hyperion_framework/network/client.rs
1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/robert-hannah/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14// http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::fmt::Debug;
25use std::sync::{
26 Arc as StdArc,
27 atomic::{AtomicUsize, Ordering},
28};
29
30// Package
31use serde::Serialize;
32use tokio::io::AsyncWriteExt;
33use tokio::net::TcpStream;
34use tokio::sync::{Notify, mpsc};
35use tokio::time::{Duration, sleep};
36
37// Local
38use crate::containerisation::container_state::ContainerState;
39use crate::network::serialiser;
40
41pub struct Client<T> {
42 pub connection_name: String,
43 server_address: String,
44 client_rx: mpsc::Receiver<T>,
45 container_state: StdArc<AtomicUsize>,
46 container_state_notify: StdArc<Notify>,
47 send_retries: u8,
48 max_send_retries: u8,
49 internal_client_state: bool,
50}
51
52// ========================================================================
53// A Client will ONLY send messages
54// ========================================================================
55
56impl<T> Client<T>
57where
58 T: Clone + Serialize + Debug,
59{
60 // ========================================================================
61 // Create new Client
62 // ========================================================================
63 pub fn new(
64 connection_name: String,
65 server_address: String,
66 client_rx: mpsc::Receiver<T>,
67 container_state: StdArc<AtomicUsize>,
68 container_state_notify: StdArc<Notify>,
69 max_send_retries: u8,
70 ) -> Self {
71 Self {
72 connection_name,
73 server_address,
74 client_rx,
75 container_state,
76 container_state_notify,
77 send_retries: 0,
78 max_send_retries,
79 internal_client_state: true, // Default to true, Client is operational
80 }
81 }
82
83 // ========================================================================
84 // Run async Client
85 // ========================================================================
86 pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
87 self.internal_client_state = true;
88 self.send_retries = 0;
89
90 loop {
91 // This check must be here or the client will just keep restarting if message send failure
92 if self.is_closing_connection() {
93 break;
94 }
95
96 match TcpStream::connect(self.server_address.to_string()).await {
97 Ok(mut stream) => {
98 log::info!(
99 "{} Client connected on {}",
100 self.connection_name,
101 self.server_address
102 );
103 self.send_retries = 0; // Reset retry counter after successful connection
104
105 loop {
106 tokio::select! {
107 // Listen for incoming messages from tokio mpsc channel
108 // Using select and tokio as messages can be sent instantly
109 Some(message) = self.client_rx.recv() => {
110 let payload = match serialiser::serialise_message(&message) {
111 Ok(payload) => payload,
112 Err(e) => {
113 log::error!("Failed to serialise message: {e:?} \n{message:?}");
114 continue; // Skip this message and try the next one
115 }
116 };
117
118 // Length-prefix the payload
119 let len = (payload.len() as u32).to_be_bytes(); // 4-byte big-endian length
120 let mut framed_msg = Vec::with_capacity(4 + payload.len());
121 framed_msg.extend_from_slice(&len);
122 framed_msg.extend_from_slice(&payload);
123
124 // Send message
125 match stream.write_all(&framed_msg).await {
126 Ok(_) => {
127 log::trace!("Message sent: {message:?}");
128 }
129 Err(e) => {
130 log::warn!("Failed to send message to {}: {e:?}", self.connection_name);
131 self.send_retries += 1;
132
133 if self.send_retries >= self.max_send_retries {
134 log::warn!("{} Client: {} failed to send message after {} retries. Closing client...",
135 self.connection_name, self.server_address, self.max_send_retries);
136 self.internal_client_state = false;
137
138 // Close stream
139 if let Err(e) = stream.shutdown().await {
140 log::warn!("Failed to shutdown stream: {e:?}");
141 }
142 break;
143 }
144
145 // Halved exponential backoff before retrying connection
146 sleep(Duration::from_secs((2u64.pow(self.send_retries as u32)) / 2)).await;
147 }
148 }
149 },
150
151 // Listen for state changes
152 _ = self.container_state_notify.notified() => {
153 // Check if state has been set to ShuttingDown
154 if self.is_closing_connection() {
155 // Close the stream
156 if let Err(e) = stream.shutdown().await {
157 log::warn!("Failed to shutdown stream: {e:?}");
158 }
159 break; // Break out of receiving loop
160 }
161 }
162 }
163 }
164 }
165 Err(e) => {
166 // Connection to Server failed - retry
167 log::warn!(
168 "{} Client: {} failed to connect: {e:?}",
169 self.connection_name,
170 self.server_address
171 );
172 self.send_retries += 1;
173
174 if self.send_retries >= self.max_send_retries {
175 log::warn!(
176 "Client {} failed to connect to Server after {} retries",
177 self.server_address,
178 self.max_send_retries
179 );
180 self.internal_client_state = false;
181 break;
182 }
183
184 // Halved exponential backoff before retrying connection
185 sleep(Duration::from_secs(
186 (2u64.pow(self.send_retries as u32)) / 2,
187 ))
188 .await;
189 }
190 }
191 }
192
193 log::info!(
194 "{} Client: {} closed gracefully",
195 self.connection_name,
196 self.server_address
197 );
198 Ok(())
199 }
200
201 fn is_closing_connection(&self) -> bool {
202 if ContainerState::from(self.container_state.load(Ordering::SeqCst))
203 == ContainerState::ShuttingDown
204 {
205 log::info!(
206 "{} Client ({}) has received closing instruction via ContainerState...",
207 self.connection_name,
208 self.server_address
209 );
210 true
211 } else if !self.internal_client_state {
212 log::info!(
213 "{} Client ({}) is closing due to connection issues...",
214 self.connection_name,
215 self.server_address
216 );
217 true
218 } else {
219 false
220 }
221 }
222}