pass_it_on/
client.rs

1use crate::configuration::ClientConfiguration;
2use crate::interfaces::{setup_client_interfaces, NANOSECOND, SECOND};
3use crate::notifications::{ClientReadyMessage, Key, Notification};
4use crate::shutdown::listen_for_shutdown;
5use crate::{Error, CHANNEL_BUFFER};
6use tracing::{debug, error, info, trace, warn};
7use std::sync::{Arc, Mutex};
8use tokio::sync::watch::Receiver;
9use tokio::sync::{broadcast, mpsc, watch};
10
/// Value handed to `listen_for_shutdown` when the caller passes `None` for
/// `wait_for_shutdown_secs`.
const DEFAULT_WAIT_FOR_SHUTDOWN_SECS: u64 = 2;
12
13/// Start the client with provided [`ClientConfiguration`] and `Receiver<ClientReadyMessage>` channel.
14///
15/// Client listens for shutdown signals SIGTERM & SIGINT on Unix or CTRL-BREAK and CTRL-C on Windows.
16/// Also accepts a `Option<tokio::sync::watch::Receiver<bool>>` to shut down the client in addition to
17/// system signals.
18pub async fn start_client(
19    client_config: ClientConfiguration,
20    notification_rx: mpsc::Receiver<ClientReadyMessage>,
21    shutdown: Option<Receiver<bool>>,
22    wait_for_shutdown_secs: Option<u64>,
23) -> Result<(), Error> {
24    let (shutdown_tx, shutdown_rx) = watch::channel(false);
25    let (interface_tx, interface_rx) = broadcast::channel(CHANNEL_BUFFER);
26    let key = client_config.key().clone();
27
28    // Setup interfaces to send notifications to
29    let interfaces = client_config.interfaces();
30    setup_client_interfaces(interfaces, interface_rx, shutdown_rx.clone()).await?;
31
32    // Monitor for incoming notifications
33    tokio::spawn(async move {
34        receive_notifications(notification_rx, interface_tx, shutdown_rx.clone(), key).await;
35    });
36
37    // Shutdown
38    listen_for_shutdown(shutdown_tx, shutdown, wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS)).await;
39
40    Ok(())
41}
42
43/// Start the client with provided [`ClientConfiguration`] and `Arc<Mutex<Vec<ClientReadyMessage>>>`.
44///
45/// Client listens for shutdown signals SIGTERM & SIGINT  on Unix or CTRL-BREAK and CTRL-C on Windows.
46/// Also accepts a `Option<tokio::sync::watch::Receiver<bool>>` to shutdown the client in addition to
47/// system signals.
48pub async fn start_client_arc(
49    client_config: ClientConfiguration,
50    notifications: Arc<Mutex<Vec<ClientReadyMessage>>>,
51    shutdown: Option<Receiver<bool>>,
52    wait_for_shutdown_secs: Option<u64>,
53) -> Result<(), Error> {
54    let (shutdown_tx, shutdown_rx) = watch::channel(false);
55    let (interface_tx, interface_rx) = broadcast::channel(CHANNEL_BUFFER);
56    let key = client_config.key().clone();
57
58    // Setup interfaces to send notifications to
59    let interfaces = client_config.interfaces();
60    setup_client_interfaces(interfaces, interface_rx, shutdown_rx.clone()).await?;
61
62    // Monitor for incoming notifications
63    tokio::spawn(async move {
64        receive_notifications_arc(notifications, interface_tx, shutdown_rx.clone(), key).await;
65    });
66
67    // Shutdown
68    listen_for_shutdown(shutdown_tx, shutdown, wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS)).await;
69
70    Ok(())
71}
72
73async fn receive_notifications(
74    mut notification_rx: mpsc::Receiver<ClientReadyMessage>,
75    interface_tx: broadcast::Sender<Notification>,
76    shutdown: Receiver<bool>,
77    key: Key,
78) {
79    info!("Client waiting for notifications");
80
81    let mut shutdown_rx = shutdown.clone();
82    loop {
83        tokio::select! {
84            msg = notification_rx.recv() => {
85                if let Some(client_ready_msg) = msg {
86                    let notification = client_ready_msg.to_notification(&key);
87                    debug!("Client Sending Notification: {:?}", notification);
88                    match interface_tx.send(notification) {
89                        Ok(ok) => debug!("Message passed to client {} interfaces", ok),
90                        Err(error) => {
91                            error!("Client broadcast channel send error: {}", error);
92                            break;
93                        },
94                    }
95                }
96            }
97
98            _ = shutdown_rx.changed() => {
99                trace!("Shutdown receive_notifications");
100                 break;
101                }
102
103            _ = tokio::time::sleep(SECOND) => {
104                trace!("Sleep timeout reached for receive_notifications");
105            }
106        }
107        tokio::time::sleep(NANOSECOND).await;
108    }
109}
110
111async fn receive_notifications_arc(
112    notifications: Arc<Mutex<Vec<ClientReadyMessage>>>,
113    interface_tx: broadcast::Sender<Notification>,
114    shutdown: Receiver<bool>,
115    key: Key,
116) {
117    info!("Client waiting for notifications");
118
119    let mut shutdown_rx = shutdown.clone();
120    loop {
121        tokio::select! {
122            _ = shutdown_rx.changed() => {
123                trace!("Shutdown receive_notifications_arc");
124                 break;
125                }
126
127            _ = tokio::time::sleep(SECOND) => {
128                trace!("Sleep timeout reached for receive_notifications_arc");
129            }
130        }
131
132        let messages: Vec<ClientReadyMessage> = notifications.lock().unwrap().drain(0..).collect();
133
134        if !messages.is_empty() {
135            for client_ready_msg in messages {
136                let notification = client_ready_msg.to_notification(&key);
137                debug!("Client attempting to send Notification: {:?}", notification);
138
139                match interface_tx.send(notification) {
140                    Ok(ok) => debug!("Message passed to client interfaces: {}", ok),
141                    Err(error) => warn!("Client broadcast channel send error: {}", error),
142                }
143            }
144        }
145        tokio::time::sleep(NANOSECOND).await;
146    }
147}