1use crate::configuration::ClientConfiguration;
2use crate::interfaces::{setup_client_interfaces, NANOSECOND, SECOND};
3use crate::notifications::{ClientReadyMessage, Key, Notification};
4use crate::shutdown::listen_for_shutdown;
5use crate::{Error, CHANNEL_BUFFER};
6use tracing::{debug, error, info, trace, warn};
7use std::sync::{Arc, Mutex};
8use tokio::sync::watch::Receiver;
9use tokio::sync::{broadcast, mpsc, watch};
10
/// Fallback value, in seconds, handed to `listen_for_shutdown` when the caller
/// passes `None` for `wait_for_shutdown_secs`.
const DEFAULT_WAIT_FOR_SHUTDOWN_SECS: u64 = 2;
12
13pub async fn start_client(
19 client_config: ClientConfiguration,
20 notification_rx: mpsc::Receiver<ClientReadyMessage>,
21 shutdown: Option<Receiver<bool>>,
22 wait_for_shutdown_secs: Option<u64>,
23) -> Result<(), Error> {
24 let (shutdown_tx, shutdown_rx) = watch::channel(false);
25 let (interface_tx, interface_rx) = broadcast::channel(CHANNEL_BUFFER);
26 let key = client_config.key().clone();
27
28 let interfaces = client_config.interfaces();
30 setup_client_interfaces(interfaces, interface_rx, shutdown_rx.clone()).await?;
31
32 tokio::spawn(async move {
34 receive_notifications(notification_rx, interface_tx, shutdown_rx.clone(), key).await;
35 });
36
37 listen_for_shutdown(shutdown_tx, shutdown, wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS)).await;
39
40 Ok(())
41}
42
43pub async fn start_client_arc(
49 client_config: ClientConfiguration,
50 notifications: Arc<Mutex<Vec<ClientReadyMessage>>>,
51 shutdown: Option<Receiver<bool>>,
52 wait_for_shutdown_secs: Option<u64>,
53) -> Result<(), Error> {
54 let (shutdown_tx, shutdown_rx) = watch::channel(false);
55 let (interface_tx, interface_rx) = broadcast::channel(CHANNEL_BUFFER);
56 let key = client_config.key().clone();
57
58 let interfaces = client_config.interfaces();
60 setup_client_interfaces(interfaces, interface_rx, shutdown_rx.clone()).await?;
61
62 tokio::spawn(async move {
64 receive_notifications_arc(notifications, interface_tx, shutdown_rx.clone(), key).await;
65 });
66
67 listen_for_shutdown(shutdown_tx, shutdown, wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS)).await;
69
70 Ok(())
71}
72
73async fn receive_notifications(
74 mut notification_rx: mpsc::Receiver<ClientReadyMessage>,
75 interface_tx: broadcast::Sender<Notification>,
76 shutdown: Receiver<bool>,
77 key: Key,
78) {
79 info!("Client waiting for notifications");
80
81 let mut shutdown_rx = shutdown.clone();
82 loop {
83 tokio::select! {
84 msg = notification_rx.recv() => {
85 if let Some(client_ready_msg) = msg {
86 let notification = client_ready_msg.to_notification(&key);
87 debug!("Client Sending Notification: {:?}", notification);
88 match interface_tx.send(notification) {
89 Ok(ok) => debug!("Message passed to client {} interfaces", ok),
90 Err(error) => {
91 error!("Client broadcast channel send error: {}", error);
92 break;
93 },
94 }
95 }
96 }
97
98 _ = shutdown_rx.changed() => {
99 trace!("Shutdown receive_notifications");
100 break;
101 }
102
103 _ = tokio::time::sleep(SECOND) => {
104 trace!("Sleep timeout reached for receive_notifications");
105 }
106 }
107 tokio::time::sleep(NANOSECOND).await;
108 }
109}
110
111async fn receive_notifications_arc(
112 notifications: Arc<Mutex<Vec<ClientReadyMessage>>>,
113 interface_tx: broadcast::Sender<Notification>,
114 shutdown: Receiver<bool>,
115 key: Key,
116) {
117 info!("Client waiting for notifications");
118
119 let mut shutdown_rx = shutdown.clone();
120 loop {
121 tokio::select! {
122 _ = shutdown_rx.changed() => {
123 trace!("Shutdown receive_notifications_arc");
124 break;
125 }
126
127 _ = tokio::time::sleep(SECOND) => {
128 trace!("Sleep timeout reached for receive_notifications_arc");
129 }
130 }
131
132 let messages: Vec<ClientReadyMessage> = notifications.lock().unwrap().drain(0..).collect();
133
134 if !messages.is_empty() {
135 for client_ready_msg in messages {
136 let notification = client_ready_msg.to_notification(&key);
137 debug!("Client attempting to send Notification: {:?}", notification);
138
139 match interface_tx.send(notification) {
140 Ok(ok) => debug!("Message passed to client interfaces: {}", ok),
141 Err(error) => warn!("Client broadcast channel send error: {}", error),
142 }
143 }
144 }
145 tokio::time::sleep(NANOSECOND).await;
146 }
147}