use crate::configuration::ClientConfiguration;
use crate::interfaces::{setup_client_interfaces, NANOSECOND, SECOND};
use crate::notifications::{ClientReadyMessage, Key, Notification};
use crate::shutdown::listen_for_shutdown;
use crate::{Error, CHANNEL_BUFFER};
use tracing::{debug, error, info, trace, warn};
use std::sync::{Arc, Mutex};
use tokio::sync::watch::Receiver;
use tokio::sync::{broadcast, mpsc, watch};
const DEFAULT_WAIT_FOR_SHUTDOWN_SECS: u64 = 2;
pub async fn start_client(
client_config: ClientConfiguration,
notification_rx: mpsc::Receiver<ClientReadyMessage>,
shutdown: Option<Receiver<bool>>,
wait_for_shutdown_secs: Option<u64>,
) -> Result<(), Error> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (interface_tx, interface_rx) = broadcast::channel(CHANNEL_BUFFER);
let key = client_config.key().clone();
let interfaces = client_config.interfaces();
setup_client_interfaces(interfaces, interface_rx, shutdown_rx.clone()).await?;
tokio::spawn(async move {
receive_notifications(notification_rx, interface_tx, shutdown_rx.clone(), key).await;
});
listen_for_shutdown(shutdown_tx, shutdown, wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS)).await;
Ok(())
}
pub async fn start_client_arc(
client_config: ClientConfiguration,
notifications: Arc<Mutex<Vec<ClientReadyMessage>>>,
shutdown: Option<Receiver<bool>>,
wait_for_shutdown_secs: Option<u64>,
) -> Result<(), Error> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (interface_tx, interface_rx) = broadcast::channel(CHANNEL_BUFFER);
let key = client_config.key().clone();
let interfaces = client_config.interfaces();
setup_client_interfaces(interfaces, interface_rx, shutdown_rx.clone()).await?;
tokio::spawn(async move {
receive_notifications_arc(notifications, interface_tx, shutdown_rx.clone(), key).await;
});
listen_for_shutdown(shutdown_tx, shutdown, wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS)).await;
Ok(())
}
async fn receive_notifications(
mut notification_rx: mpsc::Receiver<ClientReadyMessage>,
interface_tx: broadcast::Sender<Notification>,
shutdown: Receiver<bool>,
key: Key,
) {
info!("Client waiting for notifications");
let mut shutdown_rx = shutdown.clone();
loop {
tokio::select! {
msg = notification_rx.recv() => {
if let Some(client_ready_msg) = msg {
let notification = client_ready_msg.to_notification(&key);
debug!("Client Sending Notification: {:?}", notification);
match interface_tx.send(notification) {
Ok(ok) => debug!("Message passed to client {} interfaces", ok),
Err(error) => {
error!("Client broadcast channel send error: {}", error);
break;
},
}
}
}
_ = shutdown_rx.changed() => {
trace!("Shutdown receive_notifications");
break;
}
_ = tokio::time::sleep(SECOND) => {
trace!("Sleep timeout reached for receive_notifications");
}
}
tokio::time::sleep(NANOSECOND).await;
}
}
async fn receive_notifications_arc(
notifications: Arc<Mutex<Vec<ClientReadyMessage>>>,
interface_tx: broadcast::Sender<Notification>,
shutdown: Receiver<bool>,
key: Key,
) {
info!("Client waiting for notifications");
let mut shutdown_rx = shutdown.clone();
loop {
tokio::select! {
_ = shutdown_rx.changed() => {
trace!("Shutdown receive_notifications_arc");
break;
}
_ = tokio::time::sleep(SECOND) => {
trace!("Sleep timeout reached for receive_notifications_arc");
}
}
let messages: Vec<ClientReadyMessage> = notifications.lock().unwrap().drain(0..).collect();
if !messages.is_empty() {
for client_ready_msg in messages {
let notification = client_ready_msg.to_notification(&key);
debug!("Client attempting to send Notification: {:?}", notification);
match interface_tx.send(notification) {
Ok(ok) => debug!("Message passed to client interfaces: {}", ok),
Err(error) => warn!("Client broadcast channel send error: {}", error),
}
}
}
tokio::time::sleep(NANOSECOND).await;
}
}