hyperion_framework/network/client.rs
1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/Bazzz-1/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14// http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::fmt::Debug;
25use std::sync::{
26 Arc as StdArc,
27 atomic::{AtomicUsize, Ordering},
28};
29
30// Package
31use serde::Serialize;
32use tokio::io::AsyncWriteExt;
33use tokio::net::TcpStream;
34use tokio::sync::{Notify, mpsc};
35use tokio::time::{Duration, sleep};
36
37// Local
38use crate::containerisation::container_state::ContainerState;
39use crate::network::serialiser;
40
41pub struct Client<T> {
42 pub connection_name: String,
43 server_address: String,
44 client_rx: mpsc::Receiver<T>,
45 container_state: StdArc<AtomicUsize>,
46 container_state_notify: StdArc<Notify>,
47 send_retries: u8,
48 max_send_retries: u8,
49 internal_client_state: bool,
50}
51
52// ========================================================================
53// A Client will ONLY send messages
54// ========================================================================
55
56impl<T> Client<T>
57where
58 T: Clone + Serialize + Debug,
59{
60 // ========================================================================
61 // Create new Client
62 // ========================================================================
63 pub fn new(
64 connection_name: String,
65 server_address: String,
66 client_rx: mpsc::Receiver<T>,
67 container_state: StdArc<AtomicUsize>,
68 container_state_notify: StdArc<Notify>,
69 max_send_retries: u8,
70 ) -> Self {
71 Self {
72 connection_name,
73 server_address,
74 client_rx,
75 container_state,
76 container_state_notify,
77 send_retries: 0,
78 max_send_retries,
79 internal_client_state: true, // Default to true, Client is operational
80 }
81 }
82
83 // ========================================================================
84 // Run async Client
85 // ========================================================================
86 pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
87 self.internal_client_state = true;
88 self.send_retries = 0;
89
90 loop {
91 // This check must be here or the client will just keep restarting if message send failure
92 if self.is_closing_connection() {
93 break;
94 }
95
96 match TcpStream::connect(self.server_address.to_string()).await {
97 Ok(mut stream) => {
98 log::info!(
99 "{} Client connected on {}",
100 self.connection_name,
101 self.server_address
102 );
103 self.send_retries = 0; // Reset retry counter after successful connection
104
105 loop {
106 tokio::select! {
107 // Listen for incoming messages from tokio mpsc channel
108 // Using select and tokio as messages can be sent instantly
109 Some(message) = self.client_rx.recv() => {
110 let payload = match serialiser::serialise_message(&message) {
111 Ok(payload) => payload,
112 Err(e) => {
113 log::error!("Failed to serialise message: {e:?} \n{message:?}");
114 continue; // Skip this message and try the next one
115 }
116 };
117
118 // Length-prefix the payload
119 let len = (payload.len() as u32).to_be_bytes(); // 4-byte big-endian length
120 let mut framed_msg = Vec::with_capacity(4 + payload.len());
121 framed_msg.extend_from_slice(&len);
122 framed_msg.extend_from_slice(&payload);
123
124 // Send message
125 match stream.write_all(&framed_msg).await {
126 Ok(_) => {
127 // Optional debug
128 // log::debug!("Message sent: {message:?}");
129 }
130 Err(e) => {
131 log::warn!("Failed to send message to {}: {e:?}", self.connection_name);
132 self.send_retries += 1;
133
134 if self.send_retries >= self.max_send_retries {
135 log::warn!("{} Client: {} failed to send message after {} retries. Closing client...",
136 self.connection_name, self.server_address, self.max_send_retries);
137 self.internal_client_state = false;
138
139 // Close stream
140 if let Err(e) = stream.shutdown().await {
141 log::warn!("Failed to shutdown stream: {e:?}");
142 }
143 break;
144 }
145
146 // Halved exponential backoff before retrying connection
147 sleep(Duration::from_secs((2u64.pow(self.send_retries as u32)) / 2)).await;
148 }
149 }
150 },
151
152 // Listen for state changes
153 _ = self.container_state_notify.notified() => {
154 // Check if state has been set to ShuttingDown
155 if self.is_closing_connection() {
156 // Close the stream
157 if let Err(e) = stream.shutdown().await {
158 log::warn!("Failed to shutdown stream: {e:?}");
159 }
160 break; // Break out of receiving loop
161 }
162 }
163 }
164 }
165 }
166 Err(e) => {
167 // Connection to Server failed - retry
168 log::warn!(
169 "{} Client: {} failed to connect: {e:?}",
170 self.connection_name,
171 self.server_address
172 );
173 self.send_retries += 1;
174
175 if self.send_retries >= self.max_send_retries {
176 log::warn!(
177 "Client {} failed to connect to Server after {} retries",
178 self.server_address,
179 self.max_send_retries
180 );
181 self.internal_client_state = false;
182 break;
183 }
184
185 // Halved exponential backoff before retrying connection
186 sleep(Duration::from_secs(
187 (2u64.pow(self.send_retries as u32)) / 2,
188 ))
189 .await;
190 }
191 }
192 }
193
194 log::info!(
195 "{} Client: {} closed gracefully",
196 self.connection_name,
197 self.server_address
198 );
199 Ok(())
200 }
201
202 fn is_closing_connection(&self) -> bool {
203 if ContainerState::from(self.container_state.load(Ordering::SeqCst))
204 == ContainerState::ShuttingDown
205 {
206 log::info!(
207 "{} Client ({}) has received closing instruction via ContainerState...",
208 self.connection_name,
209 self.server_address
210 );
211 true
212 } else if !self.internal_client_state {
213 log::info!(
214 "{} Client ({}) is closing due to connection issues...",
215 self.connection_name,
216 self.server_address
217 );
218 true
219 } else {
220 false
221 }
222 }
223}