hyperion_framework/containerisation/
client_broker.rs

1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/Bazzz-1/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14//     http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::collections::HashMap;
25use std::fmt::Debug;
26use std::sync::{Arc as StdArc, atomic::AtomicUsize};
27
28// Package
29use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task::JoinSet;
32use tokio::time::{Duration, sleep};
33
34// Local
35use crate::messages::client_broker_message::ClientBrokerMessage;
36use crate::network::client::Client;
37use crate::network::network_topology::{Connection, NetworkTopology};
38use crate::utilities::tx_sender::add_to_tx_with_retry;
39
40pub struct ClientBroker<T> {
41    messaging_active: bool,
42    // network_topology:           StdArc<NetworkTopology>,        // TODO: Use this to restart connections?
43    client_senders: HashMap<String, mpsc::Sender<T>>, // TODO: Make this not a hash map
44    client_threads: JoinSet<()>,
45}
46
47// TODO: Check client state for restart when needed
48// TODO: Actual broker part
49
50impl<T> ClientBroker<T>
51where
52    T: Debug + Send + 'static + DeserializeOwned + Sync + Clone + Serialize,
53{
54    pub fn init(
55        network_topology: StdArc<NetworkTopology>,
56        container_state: StdArc<AtomicUsize>,
57        container_state_notify: StdArc<Notify>,
58    ) -> Self {
59        // Gather connections
60        // This must be cloned as topolgy currently doesn't live that long
61        let connections: Vec<Connection> = network_topology
62            .client_connections
63            .client_connection_vec
64            .clone();
65
66        let mut client_senders = HashMap::new();
67        let mut client_threads = JoinSet::new();
68
69        // Create clients from connections
70        for conn in connections {
71            let (client_tx, client_rx) = mpsc::channel::<T>(32);
72            client_senders.insert(conn.name.clone(), client_tx);
73
74            // Create and spawn the client on a new thread
75            let container_state_clone = container_state.clone();
76            let container_state_notify_clone = container_state_notify.clone();
77            let client = Client::new(
78                conn.name.clone(),
79                conn.address.clone(),
80                client_rx,
81                container_state_clone,
82                container_state_notify_clone,
83                5,
84            );
85
86            client_threads.spawn(async move {
87                if let Err(e) = client.run().await {
88                    log::error!("Client {} encountered an error: {e:?}", conn.name);
89                }
90            });
91        }
92
93        Self {
94            messaging_active: true,
95            client_senders,
96            client_threads,
97        }
98    }
99
100    pub async fn handle_message(&self, message: ClientBrokerMessage<T>) {
101        if self.messaging_active {
102            for target_client in &message.target_clients {
103                if let Some(sender) = self.client_senders.get(target_client) {
104                    add_to_tx_with_retry(sender, &message.message, "ClientBroker", target_client)
105                        .await;
106                } else {
107                    log::error!("Unknown TargetClient: {target_client}");
108                }
109            }
110        } else {
111            log::trace!(
112                "Message to {} blocked by inactive ClientBroker",
113                message.target_clients.concat()
114            );
115        }
116    }
117
118    pub async fn forward_shutdown(&mut self, message: T) {
119        // Send shutdown command across all clients
120        for (name, sender) in &self.client_senders {
121            if let Err(e) = sender.send(message.clone()).await {
122                log::error!("Failed to send shutdown message to {name}: {e:?}");
123            }
124        }
125        sleep(Duration::from_millis(500)).await;
126        self.messaging_active = false;
127    }
128
129    pub async fn shutdown(&mut self) {
130        // Wait for clients to finish
131        // ContainerState must be set to shutdown before this is called
132        while self.client_threads.join_next().await.is_some() {}
133        self.client_senders.clear(); // Drop all senders explicitly
134        log::info!("ClientBroker has closed all Clients");
135    }
136}
137
138impl<T> Drop for ClientBroker<T> {
139    fn drop(&mut self) {
140        log::info!("ClientBroker has been closed");
141    }
142}