1use crate::event::Event;
2#[cfg(test)]
3use crate::test_utils::Utc;
4use chrono::DateTime;
5#[cfg(not(test))]
6use chrono::Utc;
7use serde::Serialize;
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::net::IpAddr;
10use std::time::Duration;
11use thiserror::Error;
12use tokio::sync::broadcast::{self, error::RecvError};
13use tokio::{select, time};
14use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
15use tracing::{debug, warn};
16
17#[cfg(feature = "health-monitor-telegram")]
18pub mod telegram;
19
/// The kind of connection lifecycle event recorded for a client.
#[derive(Debug, Serialize, Clone, Copy)]
#[serde(rename_all = "camelCase")]
pub enum ClientEventKind {
    /// The client established a connection.
    Connect,
    /// The client disconnected deliberately (marks it offline immediately).
    Disconnect,
    /// The client's connection dropped; it may still reconnect within the
    /// offline grace period.
    ConnectionLoss,
}
28
/// A single recorded connection event for one client.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(test, derive(Clone))]
pub struct ClientEvent {
    /// What happened.
    pub kind: ClientEventKind,
    /// Identifier of the connection this event belongs to.
    pub conn_id: usize,
    /// When the event was recorded by the monitor.
    pub timestamp: DateTime<chrono::Utc>,
}
41
/// Health classification of a client.
///
/// Declaration order defines the derived `Ord`
/// (`Healthy < Unstable < Offline`), which
/// `HealthReport::as_text_report` relies on to sort clients
/// best-health-first.
#[derive(Debug, Serialize, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
#[serde(rename_all = "camelCase")]
pub enum Health {
    Healthy,
    Unstable,
    Offline,
}
50
/// Tracked health state for a single client.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(test, derive(Clone))]
pub struct ClientStatus {
    /// Current health classification.
    pub health: Health,
    /// When the client entered its current `health` state.
    pub since: DateTime<chrono::Utc>,
    /// Last known local IP, set on connect; `None` until the first connect.
    pub local_ip: Option<IpAddr>,
    /// Recent events, newest first (pushed to the front; trimmed from the
    /// back once the monitor's `max_client_events` cap is exceeded).
    pub events: VecDeque<ClientEvent>,
}
65
impl Default for ClientStatus {
    /// A newly tracked client starts `Offline` with no recorded events;
    /// `since` is set to the current time.
    fn default() -> Self {
        Self {
            health: Health::Offline,
            since: Utc::now(),
            local_ip: None,
            events: VecDeque::new(),
        }
    }
}
76
/// Snapshot of every monitored client's status, keyed by client id.
#[derive(Debug, Serialize)]
#[cfg_attr(test, derive(Clone))]
pub struct HealthReport {
    pub clients: HashMap<String, ClientStatus>,
}
83
84impl HealthReport {
85 fn new(client_ids: HashSet<String>) -> Self {
86 Self {
87 clients: client_ids
88 .into_iter()
89 .map(|client_id| (client_id, ClientStatus::default()))
90 .collect(),
91 }
92 }
93
94 pub fn as_text_report(&self) -> String {
99 let mut clients = self
100 .clients
101 .iter()
102 .collect::<Vec<(&String, &ClientStatus)>>();
103
104 clients.sort_by(|&a, &b| {
105 if a.1.health == b.1.health {
106 return a.0.cmp(b.0);
107 }
108
109 a.1.health.cmp(&b.1.health)
110 });
111
112 let now = Utc::now();
113
114 clients
115 .iter()
116 .map(|(client_id, status)| {
117 let mut text = String::new();
118 let local_ip = match status.local_ip {
119 Some(local_ip) => local_ip.to_string(),
120 None => "unknown".to_string(),
121 };
122 let (emoji, label) = match status.health {
123 Health::Healthy => ('🟢', "Healthy"),
124 Health::Unstable => ('🔴', "Unhealthy"),
125 Health::Offline => ('âš«', "Offline"),
126 };
127
128 let status_duration = (now - status.since).num_minutes();
129 text.push_str(&format!(
130 "{emoji} {client_id} ({local_ip}): {label} for {status_duration} minutes"
131 ));
132
133 text
134 })
135 .collect::<Vec<String>>()
136 .join("\n")
137 }
138}
139
/// Destination for health reports (e.g. a chat integration such as the
/// optional Telegram module).
pub trait HealthReportSender {
    /// Error type returned by a failed delivery.
    type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;

    /// Delivers `report` to the configured destination.
    ///
    /// `silent` hints that the delivery should not actively notify the
    /// recipient (used for non-alerting updates such as recoveries);
    /// interpretation is up to the implementation.
    fn send(
        &mut self,
        report: &HealthReport,
        silent: bool,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
156
/// Errors returned by [`HealthMonitorBuilder::build`].
#[derive(Debug, Error)]
pub enum BuilderError {
    /// A required builder field was never set.
    #[error("missing field: {0}")]
    MissingField(&'static str),

    /// A field was set to an out-of-range value.
    #[error("invalid value: {0}")]
    InvalidValue(&'static str),
}
165
/// Builder for [`HealthMonitor`]; `new()` pre-populates sensible defaults.
///
/// NOTE(review): the derived `Default` impl (available when `T: Default`)
/// zero-initialises every field and does NOT match `new()`'s defaults —
/// confirm whether `derive(Default)` is actually wanted here.
#[derive(Debug, Default)]
pub struct HealthMonitorBuilder<T: HealthReportSender> {
    // Report destination (required).
    sender: Option<T>,
    // Broadcast receiver of client events (required).
    event_rx: Option<broadcast::Receiver<Event>>,
    // Clients to pre-register in the report.
    client_ids: HashSet<String>,
    // Per-client cap on retained event history.
    max_client_events: usize,
    // Losses within `connection_loss_window` before a healthy client is
    // marked unstable.
    connection_loss_limit: usize,
    // How often client health is re-evaluated.
    check_interval: Duration,
    // How often reports are re-sent while any client is unhealthy.
    reminder_interval: Duration,
    // Time after a connection loss before a client is considered offline.
    offline_grace_period: Duration,
    // Sliding window over which connection losses are counted.
    connection_loss_window: Duration,
    // Stable time after reconnect before an unstable client is healthy again.
    recovery_grace_period: Duration,
}
218
219impl<T: HealthReportSender> HealthMonitorBuilder<T> {
220 pub fn new() -> Self {
225 Self {
226 sender: None,
227 event_rx: None,
228 client_ids: HashSet::new(),
229 max_client_events: 50,
230 connection_loss_limit: 3,
231 check_interval: Duration::from_secs(30),
232 reminder_interval: Duration::from_secs(1800),
233 offline_grace_period: Duration::from_secs(120),
234 connection_loss_window: Duration::from_secs(600),
235 recovery_grace_period: Duration::from_secs(1200),
236 }
237 }
238
239 pub fn sender(mut self, sender: T) -> Self {
244 self.sender = Some(sender);
245 self
246 }
247
248 pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
253 self.event_rx = Some(event_rx);
254 self
255 }
256
257 pub fn client_ids(mut self, client_ids: HashSet<String>) -> Self {
261 self.client_ids = client_ids;
262 self
263 }
264
265 pub fn max_client_events(mut self, max_client_events: usize) -> Self {
269 self.max_client_events = max_client_events;
270 self
271 }
272
273 pub fn connection_loss_limit(mut self, connection_loss_limit: usize) -> Self {
278 self.connection_loss_limit = connection_loss_limit;
279 self
280 }
281
282 pub fn check_interval(mut self, check_interval: Duration) -> Self {
286 self.check_interval = check_interval;
287 self
288 }
289
290 pub fn reminder_interval(mut self, reminder_interval: Duration) -> Self {
295 self.reminder_interval = reminder_interval;
296 self
297 }
298
299 pub fn offline_grace_period(mut self, offline_grace_period: Duration) -> Self {
304 self.offline_grace_period = offline_grace_period;
305 self
306 }
307
308 pub fn connection_loss_window(mut self, connection_loss_window: Duration) -> Self {
312 self.connection_loss_window = connection_loss_window;
313 self
314 }
315
316 pub fn recovery_grace_period(mut self, recovery_grace_period: Duration) -> Self {
321 self.recovery_grace_period = recovery_grace_period;
322 self
323 }
324
325 pub fn build(self) -> Result<HealthMonitor<T>, BuilderError> {
330 let sender = self.sender.ok_or(BuilderError::MissingField("sender"))?;
331 let event_rx = self.event_rx.ok_or(BuilderError::MissingField("event_r"))?;
332
333 if self.max_client_events < 2 {
334 return Err(BuilderError::InvalidValue(
335 "max_client_events must be at least 2",
336 ));
337 }
338
339 if self.connection_loss_limit < 1 {
340 return Err(BuilderError::InvalidValue(
341 "connection_loss_limit must be at least 1",
342 ));
343 }
344
345 Ok(HealthMonitor {
346 sender,
347 event_rx,
348 report: HealthReport::new(self.client_ids),
349 max_client_events: self.max_client_events,
350 connection_loss_limit: self.connection_loss_limit,
351 check_interval: self.check_interval,
352 reminder_interval: self.reminder_interval,
353 offline_grace_period: self.offline_grace_period,
354 connection_loss_window: self.connection_loss_window,
355 recovery_grace_period: self.recovery_grace_period,
356 #[cfg(test)]
357 test_notify_event_processed: std::sync::Arc::new(tokio::sync::Notify::new()),
358 })
359 }
360}
361
/// Tracks client connection health from a broadcast event stream and pushes
/// [`HealthReport`]s through a [`HealthReportSender`].
///
/// Construct via [`HealthMonitorBuilder`] and drive with
/// [`listen`](HealthMonitor::listen).
#[derive(Debug)]
pub struct HealthMonitor<T: HealthReportSender> {
    // Report destination.
    sender: T,
    // Incoming client lifecycle events.
    event_rx: broadcast::Receiver<Event>,
    // Current per-client status snapshot.
    report: HealthReport,
    // Per-client cap on retained event history.
    max_client_events: usize,
    // Losses within the window before a healthy client becomes unstable.
    connection_loss_limit: usize,
    // Period between health re-evaluations.
    check_interval: Duration,
    // Period between reminder reports while any client is unhealthy.
    reminder_interval: Duration,
    // Time after a connection loss before a client is considered offline.
    offline_grace_period: Duration,
    // Sliding window over which connection losses are counted.
    connection_loss_window: Duration,
    // Stable time after reconnect before an unstable client recovers.
    recovery_grace_period: Duration,
    // Test-only hook: notified after each client event has been applied.
    #[cfg(test)]
    pub test_notify_event_processed: std::sync::Arc<tokio::sync::Notify>,
}
383
384impl<T: HealthReportSender> HealthMonitor<T> {
385 pub async fn listen(&mut self) {
402 let _ = self.sender.send(&self.report, false).await;
403 let mut check_interval = time::interval(self.check_interval);
404 let mut reminder_interval = time::interval(self.reminder_interval);
405
406 check_interval.tick().await;
407 reminder_interval.tick().await;
408
409 loop {
410 let should_continue = select! {
411 _ = check_interval.tick() => {
412 self.check_clients().await;
413 true
414 }
415 _ = reminder_interval.tick() => {
416 self.send_reminder().await;
417 true
418 }
419 result = self.event_rx.recv() => {
420 self.handle_event_recv(result).await
421 }
422 };
423
424 if !should_continue {
425 break;
426 }
427 }
428 }
429
430 async fn check_clients(&mut self) {
431 let now = Utc::now();
432 let offline_cutoff = now - self.offline_grace_period;
433 let connection_loss_cutoff = now - self.connection_loss_window;
434 let recovery_cutoff = now - self.recovery_grace_period;
435
436 let mut silent_updates: usize = 0;
437 let mut alert_updates: usize = 0;
438
439 for client in self.report.clients.values_mut() {
440 if matches!(client.health, Health::Offline) {
441 continue;
442 }
443
444 let mut recent_events: Vec<_> = client.events.iter().take(2).collect();
445 recent_events.sort_by(|a, b| b.conn_id.cmp(&a.conn_id));
446
447 let Some(latest_event) = recent_events.first() else {
448 continue;
449 };
450
451 match (client.health, latest_event.kind) {
452 (Health::Healthy | Health::Unstable, ClientEventKind::ConnectionLoss)
453 if latest_event.timestamp <= offline_cutoff =>
454 {
455 client.health = Health::Offline;
456 client.since = now;
457 alert_updates += 1;
458 }
459 (Health::Unstable, ClientEventKind::Connect)
460 if latest_event.timestamp <= recovery_cutoff =>
461 {
462 client.health = Health::Healthy;
463 client.since = now;
464 silent_updates += 1;
465 }
466 (Health::Healthy, _) => {
467 let mut connection_loss_count: usize = 0;
468
469 for event in client.events.iter() {
470 if event.timestamp < connection_loss_cutoff {
471 break;
472 }
473
474 if matches!(event.kind, ClientEventKind::ConnectionLoss) {
475 connection_loss_count += 1;
476 }
477 }
478
479 if connection_loss_count >= self.connection_loss_limit {
480 client.health = Health::Unstable;
481 client.since = now;
482 alert_updates += 1;
483 }
484 }
485 (_, _) => {}
486 }
487 }
488
489 if alert_updates > 0 {
490 self.send_report(false).await;
491 } else if silent_updates > 0 {
492 self.send_report(true).await;
493 }
494 }
495
496 async fn send_reminder(&mut self) {
497 if self
498 .report
499 .clients
500 .values()
501 .any(|client| !matches!(client.health, Health::Healthy))
502 {
503 self.send_report(false).await;
504 }
505 }
506
507 async fn handle_event_recv(&mut self, result: Result<Event, RecvError>) -> bool {
508 match result {
509 Ok(Event::ClientConnect {
510 client_id,
511 local_ip,
512 conn_id,
513 }) => {
514 let client = self.report.clients.entry(client_id).or_default();
515 client.local_ip = Some(local_ip);
516 let mut should_send_report = false;
517
518 if matches!(client.health, Health::Offline) {
519 client.health = Health::Healthy;
520 client.since = Utc::now();
521 should_send_report = true;
522 }
523
524 client.events.push_front(ClientEvent {
525 kind: ClientEventKind::Connect,
526 conn_id,
527 timestamp: Utc::now(),
528 });
529 println!("connect");
530 if client.events.len() > self.max_client_events {
531 client.events.pop_back();
532 }
533
534 if should_send_report {
535 self.send_report(true).await;
536 }
537
538 #[cfg(test)]
539 self.test_notify_event_processed.notify_one();
540
541 true
542 }
543 Ok(Event::ClientDisconnect { client_id, conn_id }) => {
544 let client = self.report.clients.entry(client_id).or_default();
545 client.health = Health::Offline;
546 client.since = Utc::now();
547 client.events.push_front(ClientEvent {
548 kind: ClientEventKind::Disconnect,
549 conn_id,
550 timestamp: Utc::now(),
551 });
552
553 if client.events.len() > self.max_client_events {
554 client.events.pop_back();
555 }
556
557 #[cfg(test)]
558 self.test_notify_event_processed.notify_one();
559
560 self.send_report(true).await;
561 true
562 }
563 Ok(Event::ClientConnectionLoss { client_id, conn_id }) => {
564 let client = self.report.clients.entry(client_id).or_default();
565 client.events.push_front(ClientEvent {
566 kind: ClientEventKind::ConnectionLoss,
567 conn_id,
568 timestamp: Utc::now(),
569 });
570
571 if client.events.len() > self.max_client_events {
572 client.events.pop_back();
573 }
574
575 #[cfg(test)]
576 self.test_notify_event_processed.notify_one();
577
578 true
579 }
580 Ok(_) => true,
581 Err(RecvError::Lagged(n)) => {
582 warn!("HealthMonitor lagged by {n} messages, some events were missed");
583 true
584 }
585 Err(RecvError::Closed) => {
586 debug!("HealthMonitor event stream closed, exiting event loop");
587 false
588 }
589 }
590 }
591
592 async fn send_report(&mut self, silent: bool) {
593 if let Err(err) = self.sender.send(&self.report, silent).await {
594 warn!("Failed to send health report: {:?}", err);
595 }
596 }
597}
598
/// Uninhabited error type: the subsystem's `run` can never fail.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
602
#[cfg(feature = "tokio-graceful-shutdown")]
impl<T: HealthReportSender + Send + Sync + 'static> IntoSubsystem<NeverError> for HealthMonitor<T> {
    /// Runs the monitor as a subsystem, stopping when shutdown is requested.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        // Cancellation on shutdown short-circuits `listen`; its result is
        // irrelevant either way.
        let _ = self.listen().cancel_on_shutdown(subsys).await;

        Ok(())
    }
}
611
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use ntest::timeout;
    use std::convert::Infallible;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use tokio::sync::broadcast;
    use tokio::time::advance;

    /// Test sender that records the most recent report and its `silent` flag.
    #[derive(Default, Clone)]
    struct MockSender {
        last: Arc<Mutex<Option<(HealthReport, bool)>>>,
    }

    impl HealthReportSender for MockSender {
        type Error = Infallible;

        async fn send(&mut self, report: &HealthReport, silent: bool) -> Result<(), Self::Error> {
            let mut last = self.last.lock().unwrap();
            *last = Some((report.clone(), silent));
            Ok(())
        }
    }

    /// Spin-waits (yielding to the runtime each pass) until the mock sender
    /// has captured a report, then takes and returns it.
    // NOTE(review): relies on the single-threaded test runtime with paused
    // time so that yielding lets the monitor task make progress.
    async fn wait_for_report(sender: &MockSender) -> (HealthReport, bool) {
        loop {
            if let Some(report) = sender.last.lock().unwrap().take() {
                return report;
            }

            tokio::task::yield_now().await;
        }
    }

    /// `listen` must emit a non-silent baseline report on startup.
    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn sends_initial_report_on_start() {
        let (_tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(Default::default())
            .build()
            .unwrap();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });

        let (_, silent) = wait_for_report(&sender).await;
        assert!(!silent, "Initial report should not be silent");

        // NOTE(review): dropping a JoinHandle detaches the task rather than
        // aborting it; the other tests drop `tx` and await the handle.
        drop(handle);
    }

    /// A connect event moves an offline client to `Healthy`.
    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_connect_sets_healthy() {
        let (tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(["client1".to_string()].into_iter().collect())
            .build()
            .unwrap();
        let notify = monitor.test_notify_event_processed.clone();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });
        // Consume the initial baseline report first.
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnect {
            client_id: "client1".to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;

        let (report, _) = wait_for_report(&sender).await;
        assert_eq!(report.clients["client1"].health, Health::Healthy);

        // Closing the channel lets `listen` exit cleanly.
        drop(tx);
        handle.await.unwrap();
    }

    /// A deliberate disconnect marks the client offline with no grace period.
    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_disconnect_sets_offline_immediately() {
        let (tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(["client1".to_string()].into_iter().collect())
            .build()
            .unwrap();
        let notify = monitor.test_notify_event_processed.clone();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnect {
            client_id: "client1".to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;
        wait_for_report(&sender).await;

        tx.send(Event::ClientDisconnect {
            client_id: "client1".to_string(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;

        let (report, _) = wait_for_report(&sender).await;
        assert_eq!(report.clients["client1"].health, Health::Offline);

        drop(tx);
        handle.await.unwrap();
    }

    /// Hitting the loss limit marks the client unstable; staying lost past
    /// the offline grace period then marks it offline.
    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_connection_loss_triggers_unstable_then_offline() {
        let (tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(["client1".to_string()].into_iter().collect())
            .build()
            .unwrap();
        let notify = monitor.test_notify_event_processed.clone();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnect {
            client_id: "client1".to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;
        wait_for_report(&sender).await;

        // Default connection_loss_limit is 3.
        for i in 1..=3 {
            tx.send(Event::ClientConnectionLoss {
                client_id: "client1".to_string(),
                conn_id: i,
            })
            .unwrap();
            notify.notified().await;
        }

        // Advance past one check_interval (30s) to trigger a health check.
        advance(Duration::from_secs(30)).await;
        let (report, _) = wait_for_report(&sender).await;
        assert_eq!(report.clients["client1"].health, Health::Unstable);

        // Advance past the offline_grace_period (120s).
        advance(Duration::from_secs(120)).await;
        let (report, _) = wait_for_report(&sender).await;
        assert_eq!(report.clients["client1"].health, Health::Offline);

        drop(tx);
        handle.await.unwrap();
    }

    /// An unstable client that reconnects becomes healthy again only after
    /// the full recovery grace period (1200s default) has elapsed.
    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_recovers_from_unstable_after_recovery_grace_period() {
        let (tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(["client1".to_string()].into_iter().collect())
            .connection_loss_limit(1)
            .build()
            .unwrap();
        let notify = monitor.test_notify_event_processed.clone();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnect {
            client_id: "client1".to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnectionLoss {
            client_id: "client1".to_string(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;
        advance(Duration::from_secs(30)).await;
        let (report, _) = wait_for_report(&sender).await;
        assert_eq!(report.clients["client1"].health, Health::Unstable);

        // Reconnect with a higher conn_id so it outranks the loss event.
        tx.send(Event::ClientConnect {
            client_id: "client1".to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id: 2,
        })
        .unwrap();
        notify.notified().await;

        // One second short of the recovery grace period: no report yet.
        advance(Duration::from_secs(1199)).await;
        assert!(sender.last.lock().unwrap().is_none());

        advance(Duration::from_secs(1)).await;
        let (report, _) = wait_for_report(&sender).await;
        assert_eq!(report.clients["client1"].health, Health::Healthy);

        drop(tx);
        handle.await.unwrap();
    }

    /// The reminder interval re-sends a report while any client is unhealthy.
    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn sends_reminder_reports_if_any_unhealthy() {
        let (tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(["client1".to_string()].into_iter().collect())
            .connection_loss_limit(1)
            .build()
            .unwrap();
        let notify = monitor.test_notify_event_processed.clone();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnect {
            client_id: "client1".to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnectionLoss {
            client_id: "client1".to_string(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;
        advance(Duration::from_secs(30)).await;
        let (report, _) = wait_for_report(&sender).await;
        assert_eq!(report.clients["client1"].health, Health::Unstable);

        // Advance a full reminder_interval (1800s default).
        advance(Duration::from_secs(1800)).await;
        let (report, _) = wait_for_report(&sender).await;
        assert_ne!(report.clients["client1"].health, Health::Healthy);

        drop(tx);
        handle.await.unwrap();
    }

    /// A single connection loss with no reconnect takes the client offline
    /// once the offline grace period (120s default) has passed.
    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_goes_offline_after_grace_period() {
        let (tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(["client1".to_string()].into_iter().collect())
            .build()
            .unwrap();
        let notify = monitor.test_notify_event_processed.clone();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });
        wait_for_report(&sender).await;

        tx.send(Event::ClientConnect {
            client_id: "client1".to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;

        let report = wait_for_report(&sender).await;
        assert_eq!(report.0.clients["client1"].health, Health::Healthy);

        tx.send(Event::ClientConnectionLoss {
            client_id: "client1".to_string(),
            conn_id: 1,
        })
        .unwrap();
        notify.notified().await;

        advance(Duration::from_secs(120)).await;
        let report = wait_for_report(&sender).await;
        assert_eq!(report.0.clients["client1"].health, Health::Offline);

        drop(tx);
        handle.await.unwrap();
    }
}