Skip to main content

hyperion_framework/network/
client.rs

1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/robert-hannah/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14//     http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::fmt::Debug;
25use std::sync::{
26    Arc as StdArc,
27    atomic::{AtomicUsize, Ordering},
28};
29
30// Package
31use serde::Serialize;
32use tokio::io::AsyncWriteExt;
33use tokio::net::TcpStream;
34use tokio::sync::{Notify, mpsc};
35use tokio::time::{Duration, sleep};
36
37// Local
38use crate::containerisation::container_state::ContainerState;
39use crate::network::serialiser;
40
41pub struct Client<T> {
42    pub connection_name: String,
43    server_address: String,
44    client_rx: mpsc::Receiver<T>,
45    container_state: StdArc<AtomicUsize>,
46    container_state_notify: StdArc<Notify>,
47    send_retries: u8,
48    max_send_retries: u8,
49    internal_client_state: bool,
50}
51
52// ========================================================================
53//    A Client will ONLY send messages
54// ========================================================================
55
56impl<T> Client<T>
57where
58    T: Clone + Serialize + Debug,
59{
60    // ========================================================================
61    //    Create new Client
62    // ========================================================================
63    pub fn new(
64        connection_name: String,
65        server_address: String,
66        client_rx: mpsc::Receiver<T>,
67        container_state: StdArc<AtomicUsize>,
68        container_state_notify: StdArc<Notify>,
69        max_send_retries: u8,
70    ) -> Self {
71        Self {
72            connection_name,
73            server_address,
74            client_rx,
75            container_state,
76            container_state_notify,
77            send_retries: 0,
78            max_send_retries,
79            internal_client_state: true, // Default to true, Client is operational
80        }
81    }
82
83    // ========================================================================
84    //    Run async Client
85    // ========================================================================
86    pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
87        self.internal_client_state = true;
88        self.send_retries = 0;
89
90        loop {
91            // This check must be here or the client will just keep restarting if message send failure
92            if self.is_closing_connection() {
93                break;
94            }
95
96            match TcpStream::connect(self.server_address.to_string()).await {
97                Ok(mut stream) => {
98                    log::info!(
99                        "{} Client connected on {}",
100                        self.connection_name,
101                        self.server_address
102                    );
103                    self.send_retries = 0; // Reset retry counter after successful connection
104
105                    loop {
106                        tokio::select! {
107                            // Listen for incoming messages from tokio mpsc channel
108                            // Using select and tokio as messages can be sent instantly
109                            Some(message) = self.client_rx.recv() => {
110                                let payload = match serialiser::serialise_message(&message) {
111                                    Ok(payload) => payload,
112                                    Err(e) => {
113                                        log::error!("Failed to serialise message: {e:?} \n{message:?}");
114                                        continue; // Skip this message and try the next one
115                                    }
116                                };
117
118                                // Length-prefix the payload
119                                let len = (payload.len() as u32).to_be_bytes(); // 4-byte big-endian length
120                                let mut framed_msg = Vec::with_capacity(4 + payload.len());
121                                framed_msg.extend_from_slice(&len);
122                                framed_msg.extend_from_slice(&payload);
123
124                                // Send message
125                                match stream.write_all(&framed_msg).await {
126                                    Ok(_) => {
127                                        log::trace!("Message sent: {message:?}");
128                                    }
129                                    Err(e) => {
130                                        log::warn!("Failed to send message to {}: {e:?}", self.connection_name);
131                                        self.send_retries += 1;
132
133                                        if self.send_retries >= self.max_send_retries {
134                                            log::warn!("{} Client: {} failed to send message after {} retries. Closing client...",
135                                                self.connection_name, self.server_address, self.max_send_retries);
136                                            self.internal_client_state = false;
137
138                                            // Close stream
139                                            if let Err(e) = stream.shutdown().await {
140                                                log::warn!("Failed to shutdown stream: {e:?}");
141                                            }
142                                            break;
143                                        }
144
145                                        // Halved exponential backoff before retrying connection
146                                        sleep(Duration::from_secs((2u64.pow(self.send_retries as u32)) / 2)).await;
147                                    }
148                                }
149                            },
150
151                            // Listen for state changes
152                            _ = self.container_state_notify.notified() => {
153                                // Check if state has been set to ShuttingDown
154                                if self.is_closing_connection() {
155                                    // Close the stream
156                                    if let Err(e) = stream.shutdown().await {
157                                        log::warn!("Failed to shutdown stream: {e:?}");
158                                    }
159                                    break; // Break out of receiving loop
160                                }
161                            }
162                        }
163                    }
164                }
165                Err(e) => {
166                    // Connection to Server failed - retry
167                    log::warn!(
168                        "{} Client: {} failed to connect: {e:?}",
169                        self.connection_name,
170                        self.server_address
171                    );
172                    self.send_retries += 1;
173
174                    if self.send_retries >= self.max_send_retries {
175                        log::warn!(
176                            "Client {} failed to connect to Server after {} retries",
177                            self.server_address,
178                            self.max_send_retries
179                        );
180                        self.internal_client_state = false;
181                        break;
182                    }
183
184                    // Halved exponential backoff before retrying connection
185                    sleep(Duration::from_secs(
186                        (2u64.pow(self.send_retries as u32)) / 2,
187                    ))
188                    .await;
189                }
190            }
191        }
192
193        log::info!(
194            "{} Client: {} closed gracefully",
195            self.connection_name,
196            self.server_address
197        );
198        Ok(())
199    }
200
201    fn is_closing_connection(&self) -> bool {
202        if ContainerState::from(self.container_state.load(Ordering::SeqCst))
203            == ContainerState::ShuttingDown
204        {
205            log::info!(
206                "{} Client ({}) has received closing instruction via ContainerState...",
207                self.connection_name,
208                self.server_address
209            );
210            true
211        } else if !self.internal_client_state {
212            log::info!(
213                "{} Client ({}) is closing due to connection issues...",
214                self.connection_name,
215                self.server_address
216            );
217            true
218        } else {
219            false
220        }
221    }
222}