Skip to main content

hyperion_framework/containerisation/
client_broker.rs

1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/robert-hannah/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14//     http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::collections::HashMap;
25use std::fmt::Debug;
26use std::sync::{Arc as StdArc, atomic::AtomicUsize};
27
28// Package
29use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task::JoinSet;
32use tokio::time::{Duration, sleep};
33
34// Local
35use crate::messages::client_broker_message::ClientBrokerMessage;
36use crate::network::client::Client;
37use crate::network::network_topology::{Connection, NetworkTopology};
38use crate::utilities::tx_sender::add_to_tx_with_retry;
39
40pub struct ClientBroker<T> {
41    messaging_active: bool,
42    // network_topology:           StdArc<NetworkTopology>,        // TODO: Use this to restart connections
43    client_senders: HashMap<String, mpsc::Sender<T>>,
44    client_threads: JoinSet<()>,
45}
46
47// TODO: Check client state for restart when needed
48
49impl<T> ClientBroker<T>
50where
51    T: Debug + Send + 'static + DeserializeOwned + Sync + Clone + Serialize,
52{
53    /// Initializes the `ClientBroker` with the network topology, container state,
54    /// and state notification mechanism.
55    ///
56    /// # Arguments
57    ///
58    /// * `network_topology` - A reference-counted arc for network connections.
59    /// * `container_state` - Atomic state of the container lifecycle.
60    /// * `container_state_notify` - State notification for component updates.
61    pub fn init(
62        network_topology: StdArc<NetworkTopology>,
63        container_state: StdArc<AtomicUsize>,
64        container_state_notify: StdArc<Notify>,
65    ) -> Self {
66        // Gather connections
67        // Clone topology to prolong life
68        let connections: Vec<Connection> = network_topology
69            .client_connections
70            .client_connection_vec
71            .clone();
72
73        let mut client_senders = HashMap::new();
74        let mut client_threads = JoinSet::new();
75
76        // Create clients from connections
77        for conn in connections {
78            let (client_tx, client_rx) = mpsc::channel::<T>(32);
79            client_senders.insert(conn.name.clone(), client_tx);
80
81            // Create and spawn the client on a new thread
82            let container_state_clone = container_state.clone();
83            let container_state_notify_clone = container_state_notify.clone();
84            let client = Client::new(
85                conn.name.clone(),
86                conn.address.clone(),
87                client_rx,
88                container_state_clone,
89                container_state_notify_clone,
90                5,
91            );
92
93            client_threads.spawn(async move {
94                if let Err(e) = client.run().await {
95                    log::error!("Client {} encountered an error: {e:?}", conn.name);
96                }
97            });
98        }
99
100        Self {
101            messaging_active: true,
102            client_senders,
103            client_threads,
104        }
105    }
106
107    pub async fn handle_message(&self, message: ClientBrokerMessage<T>) {
108        if self.messaging_active {
109            for target_client in &message.target_clients {
110                if let Some(sender) = self.client_senders.get(target_client) {
111                    add_to_tx_with_retry(sender, &message.message, "ClientBroker", target_client)
112                        .await;
113                } else {
114                    log::error!("Unknown TargetClient: {target_client}");
115                }
116            }
117        } else {
118            log::trace!(
119                "Message to {} blocked by inactive ClientBroker",
120                message.target_clients.concat()
121            );
122        }
123    }
124
125    pub async fn forward_shutdown(&mut self, message: T) {
126        // Send shutdown command across all clients
127        for (name, sender) in &self.client_senders {
128            if let Err(e) = sender.send(message.clone()).await {
129                log::error!("Failed to send shutdown message to {name}: {e:?}");
130            }
131        }
132        sleep(Duration::from_millis(500)).await;
133        self.messaging_active = false;
134    }
135
136    pub async fn shutdown(&mut self) {
137        // Wait for clients to finish
138        // ContainerState must be set to shutdown before this is called
139        while self.client_threads.join_next().await.is_some() {}
140        self.client_senders.clear(); // Drop all senders explicitly
141        log::info!("ClientBroker has closed all Clients");
142    }
143}
144
145impl<T> Drop for ClientBroker<T> {
146    fn drop(&mut self) {
147        log::info!("ClientBroker has been closed");
148    }
149}