hyperion_framework/network/
client.rs

1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/Bazzz-1/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14//     http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::fmt::Debug;
25use std::sync::{
26    Arc as StdArc,
27    atomic::{AtomicUsize, Ordering},
28};
29
30// Package
31use serde::Serialize;
32use tokio::io::AsyncWriteExt;
33use tokio::net::TcpStream;
34use tokio::sync::{Notify, mpsc};
35use tokio::time::{Duration, sleep};
36
37// Local
38use crate::containerisation::container_state::ContainerState;
39use crate::network::serialiser;
40
41pub struct Client<T> {
42    pub connection_name: String,
43    server_address: String,
44    client_rx: mpsc::Receiver<T>,
45    container_state: StdArc<AtomicUsize>,
46    container_state_notify: StdArc<Notify>,
47    send_retries: u8,
48    max_send_retries: u8,
49    internal_client_state: bool,
50}
51
52// ========================================================================
53//    A Client will ONLY send messages
54// ========================================================================
55
56impl<T> Client<T>
57where
58    T: Clone + Serialize + Debug,
59{
60    // ========================================================================
61    //    Create new Client
62    // ========================================================================
63    pub fn new(
64        connection_name: String,
65        server_address: String,
66        client_rx: mpsc::Receiver<T>,
67        container_state: StdArc<AtomicUsize>,
68        container_state_notify: StdArc<Notify>,
69        max_send_retries: u8,
70    ) -> Self {
71        Self {
72            connection_name,
73            server_address,
74            client_rx,
75            container_state,
76            container_state_notify,
77            send_retries: 0,
78            max_send_retries,
79            internal_client_state: true, // Default to true, Client is operational
80        }
81    }
82
83    // ========================================================================
84    //    Run async Client
85    // ========================================================================
86    pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
87        self.internal_client_state = true;
88        self.send_retries = 0;
89
90        loop {
91            // This check must be here or the client will just keep restarting if message send failure
92            if self.is_closing_connection() {
93                break;
94            }
95
96            match TcpStream::connect(self.server_address.to_string()).await {
97                Ok(mut stream) => {
98                    log::info!(
99                        "{} Client connected on {}",
100                        self.connection_name,
101                        self.server_address
102                    );
103                    self.send_retries = 0; // Reset retry counter after successful connection
104
105                    loop {
106                        tokio::select! {
107                            // Listen for incoming messages from tokio mpsc channel
108                            // Using select and tokio as messages can be sent instantly
109                            Some(message) = self.client_rx.recv() => {
110                                let payload = match serialiser::serialise_message(&message) {
111                                    Ok(payload) => payload,
112                                    Err(e) => {
113                                        log::error!("Failed to serialise message: {e:?} \n{message:?}");
114                                        continue; // Skip this message and try the next one
115                                    }
116                                };
117
118                                // Length-prefix the payload
119                                let len = (payload.len() as u32).to_be_bytes(); // 4-byte big-endian length
120                                let mut framed_msg = Vec::with_capacity(4 + payload.len());
121                                framed_msg.extend_from_slice(&len);
122                                framed_msg.extend_from_slice(&payload);
123
124                                // Send message
125                                match stream.write_all(&framed_msg).await {
126                                    Ok(_) => {
127                                        // Optional debug
128                                        // log::debug!("Message sent: {message:?}");
129                                    }
130                                    Err(e) => {
131                                        log::warn!("Failed to send message to {}: {e:?}", self.connection_name);
132                                        self.send_retries += 1;
133
134                                        if self.send_retries >= self.max_send_retries {
135                                            log::warn!("{} Client: {} failed to send message after {} retries. Closing client...",
136                                                self.connection_name, self.server_address, self.max_send_retries);
137                                            self.internal_client_state = false;
138
139                                            // Close stream
140                                            if let Err(e) = stream.shutdown().await {
141                                                log::warn!("Failed to shutdown stream: {e:?}");
142                                            }
143                                            break;
144                                        }
145
146                                        // Halved exponential backoff before retrying connection
147                                        sleep(Duration::from_secs((2u64.pow(self.send_retries as u32)) / 2)).await;
148                                    }
149                                }
150                            },
151
152                            // Listen for state changes
153                            _ = self.container_state_notify.notified() => {
154                                // Check if state has been set to ShuttingDown
155                                if self.is_closing_connection() {
156                                    // Close the stream
157                                    if let Err(e) = stream.shutdown().await {
158                                        log::warn!("Failed to shutdown stream: {e:?}");
159                                    }
160                                    break; // Break out of receiving loop
161                                }
162                            }
163                        }
164                    }
165                }
166                Err(e) => {
167                    // Connection to Server failed - retry
168                    log::warn!(
169                        "{} Client: {} failed to connect: {e:?}",
170                        self.connection_name,
171                        self.server_address
172                    );
173                    self.send_retries += 1;
174
175                    if self.send_retries >= self.max_send_retries {
176                        log::warn!(
177                            "Client {} failed to connect to Server after {} retries",
178                            self.server_address,
179                            self.max_send_retries
180                        );
181                        self.internal_client_state = false;
182                        break;
183                    }
184
185                    // Halved exponential backoff before retrying connection
186                    sleep(Duration::from_secs(
187                        (2u64.pow(self.send_retries as u32)) / 2,
188                    ))
189                    .await;
190                }
191            }
192        }
193
194        log::info!(
195            "{} Client: {} closed gracefully",
196            self.connection_name,
197            self.server_address
198        );
199        Ok(())
200    }
201
202    fn is_closing_connection(&self) -> bool {
203        if ContainerState::from(self.container_state.load(Ordering::SeqCst))
204            == ContainerState::ShuttingDown
205        {
206            log::info!(
207                "{} Client ({}) has received closing instruction via ContainerState...",
208                self.connection_name,
209                self.server_address
210            );
211            true
212        } else if !self.internal_client_state {
213            log::info!(
214                "{} Client ({}) is closing due to connection issues...",
215                self.connection_name,
216                self.server_address
217            );
218            true
219        } else {
220            false
221        }
222    }
223}