hyperion_framework/containerisation/
client_broker.rs1use std::collections::HashMap;
25use std::fmt::Debug;
26use std::sync::{Arc as StdArc, atomic::AtomicUsize};
27
28use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task::JoinSet;
32use tokio::time::{Duration, sleep};
33
34use crate::messages::client_broker_message::ClientBrokerMessage;
36use crate::network::client::Client;
37use crate::network::network_topology::{Connection, NetworkTopology};
38use crate::utilities::tx_sender::add_to_tx_with_retry;
39
40pub struct ClientBroker<T> {
41 messaging_active: bool,
42 client_senders: HashMap<String, mpsc::Sender<T>>,
44 client_threads: JoinSet<()>,
45}
46
47impl<T> ClientBroker<T>
50where
51 T: Debug + Send + 'static + DeserializeOwned + Sync + Clone + Serialize,
52{
53 pub fn init(
62 network_topology: StdArc<NetworkTopology>,
63 container_state: StdArc<AtomicUsize>,
64 container_state_notify: StdArc<Notify>,
65 ) -> Self {
66 let connections: Vec<Connection> = network_topology
69 .client_connections
70 .client_connection_vec
71 .clone();
72
73 let mut client_senders = HashMap::new();
74 let mut client_threads = JoinSet::new();
75
76 for conn in connections {
78 let (client_tx, client_rx) = mpsc::channel::<T>(32);
79 client_senders.insert(conn.name.clone(), client_tx);
80
81 let container_state_clone = container_state.clone();
83 let container_state_notify_clone = container_state_notify.clone();
84 let client = Client::new(
85 conn.name.clone(),
86 conn.address.clone(),
87 client_rx,
88 container_state_clone,
89 container_state_notify_clone,
90 5,
91 );
92
93 client_threads.spawn(async move {
94 if let Err(e) = client.run().await {
95 log::error!("Client {} encountered an error: {e:?}", conn.name);
96 }
97 });
98 }
99
100 Self {
101 messaging_active: true,
102 client_senders,
103 client_threads,
104 }
105 }
106
107 pub async fn handle_message(&self, message: ClientBrokerMessage<T>) {
108 if self.messaging_active {
109 for target_client in &message.target_clients {
110 if let Some(sender) = self.client_senders.get(target_client) {
111 add_to_tx_with_retry(sender, &message.message, "ClientBroker", target_client)
112 .await;
113 } else {
114 log::error!("Unknown TargetClient: {target_client}");
115 }
116 }
117 } else {
118 log::trace!(
119 "Message to {} blocked by inactive ClientBroker",
120 message.target_clients.concat()
121 );
122 }
123 }
124
125 pub async fn forward_shutdown(&mut self, message: T) {
126 for (name, sender) in &self.client_senders {
128 if let Err(e) = sender.send(message.clone()).await {
129 log::error!("Failed to send shutdown message to {name}: {e:?}");
130 }
131 }
132 sleep(Duration::from_millis(500)).await;
133 self.messaging_active = false;
134 }
135
136 pub async fn shutdown(&mut self) {
137 while self.client_threads.join_next().await.is_some() {}
140 self.client_senders.clear(); log::info!("ClientBroker has closed all Clients");
142 }
143}
144
145impl<T> Drop for ClientBroker<T> {
146 fn drop(&mut self) {
147 log::info!("ClientBroker has been closed");
148 }
149}