hyperion_framework/containerisation/
client_broker.rs1use std::collections::HashMap;
25use std::fmt::Debug;
26use std::sync::{Arc as StdArc, atomic::AtomicUsize};
27
28use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task::JoinSet;
32use tokio::time::{Duration, sleep};
33
34use crate::messages::client_broker_message::ClientBrokerMessage;
36use crate::network::client::Client;
37use crate::network::network_topology::{Connection, NetworkTopology};
38use crate::utilities::tx_sender::add_to_tx_with_retry;
39
40pub struct ClientBroker<T> {
41 messaging_active: bool,
42 client_senders: HashMap<String, mpsc::Sender<T>>, client_threads: JoinSet<()>,
45}
46
47impl<T> ClientBroker<T>
51where
52 T: Debug + Send + 'static + DeserializeOwned + Sync + Clone + Serialize,
53{
54 pub fn init(
55 network_topology: StdArc<NetworkTopology>,
56 container_state: StdArc<AtomicUsize>,
57 container_state_notify: StdArc<Notify>,
58 ) -> Self {
59 let connections: Vec<Connection> = network_topology
62 .client_connections
63 .client_connection_vec
64 .clone();
65
66 let mut client_senders = HashMap::new();
67 let mut client_threads = JoinSet::new();
68
69 for conn in connections {
71 let (client_tx, client_rx) = mpsc::channel::<T>(32);
72 client_senders.insert(conn.name.clone(), client_tx);
73
74 let container_state_clone = container_state.clone();
76 let container_state_notify_clone = container_state_notify.clone();
77 let client = Client::new(
78 conn.name.clone(),
79 conn.address.clone(),
80 client_rx,
81 container_state_clone,
82 container_state_notify_clone,
83 5,
84 );
85
86 client_threads.spawn(async move {
87 if let Err(e) = client.run().await {
88 log::error!("Client {} encountered an error: {e:?}", conn.name);
89 }
90 });
91 }
92
93 Self {
94 messaging_active: true,
95 client_senders,
96 client_threads,
97 }
98 }
99
100 pub async fn handle_message(&self, message: ClientBrokerMessage<T>) {
101 if self.messaging_active {
102 for target_client in &message.target_clients {
103 if let Some(sender) = self.client_senders.get(target_client) {
104 add_to_tx_with_retry(sender, &message.message, "ClientBroker", target_client)
105 .await;
106 } else {
107 log::error!("Unknown TargetClient: {target_client}");
108 }
109 }
110 } else {
111 log::trace!(
112 "Message to {} blocked by inactive ClientBroker",
113 message.target_clients.concat()
114 );
115 }
116 }
117
118 pub async fn forward_shutdown(&mut self, message: T) {
119 for (name, sender) in &self.client_senders {
121 if let Err(e) = sender.send(message.clone()).await {
122 log::error!("Failed to send shutdown message to {name}: {e:?}");
123 }
124 }
125 sleep(Duration::from_millis(500)).await;
126 self.messaging_active = false;
127 }
128
129 pub async fn shutdown(&mut self) {
130 while self.client_threads.join_next().await.is_some() {}
133 self.client_senders.clear(); log::info!("ClientBroker has closed all Clients");
135 }
136}
137
138impl<T> Drop for ClientBroker<T> {
139 fn drop(&mut self) {
140 log::info!("ClientBroker has been closed");
141 }
142}