ironsbe_client/
builder.rs1use crate::error::ClientError;
4use crate::reconnect::{ReconnectConfig, ReconnectState};
5use crate::session::ClientSession;
6use ironsbe_channel::spsc;
7use std::net::SocketAddr;
8use std::time::Duration;
9
10pub struct ClientBuilder {
12 server_addr: SocketAddr,
13 connect_timeout: Duration,
14 reconnect_config: ReconnectConfig,
15 channel_capacity: usize,
16}
17
18impl ClientBuilder {
19 #[must_use]
21 pub fn new(server_addr: SocketAddr) -> Self {
22 Self {
23 server_addr,
24 connect_timeout: Duration::from_secs(5),
25 reconnect_config: ReconnectConfig::default(),
26 channel_capacity: 4096,
27 }
28 }
29
30 #[must_use]
32 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
33 self.connect_timeout = timeout;
34 self
35 }
36
37 #[must_use]
39 pub fn reconnect(mut self, enabled: bool) -> Self {
40 self.reconnect_config.enabled = enabled;
41 self
42 }
43
44 #[must_use]
46 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
47 self.reconnect_config.initial_delay = delay;
48 self
49 }
50
51 #[must_use]
53 pub fn max_reconnect_attempts(mut self, max: usize) -> Self {
54 self.reconnect_config.max_attempts = max;
55 self
56 }
57
58 #[must_use]
60 pub fn channel_capacity(mut self, capacity: usize) -> Self {
61 self.channel_capacity = capacity;
62 self
63 }
64
65 #[must_use]
67 pub fn build(self) -> (Client, ClientHandle) {
68 let (cmd_tx, cmd_rx) = spsc::channel(self.channel_capacity);
69 let (event_tx, event_rx) = spsc::channel(self.channel_capacity);
70
71 let client = Client {
72 server_addr: self.server_addr,
73 connect_timeout: self.connect_timeout,
74 reconnect_state: ReconnectState::new(self.reconnect_config),
75 cmd_rx,
76 event_tx,
77 };
78
79 let handle = ClientHandle { cmd_tx, event_rx };
80
81 (client, handle)
82 }
83}
84
85pub struct Client {
87 server_addr: SocketAddr,
88 connect_timeout: Duration,
89 reconnect_state: ReconnectState,
90 cmd_rx: spsc::SpscReceiver<ClientCommand>,
91 event_tx: spsc::SpscSender<ClientEvent>,
92}
93
94impl Client {
95 pub async fn run(&mut self) -> Result<(), ClientError> {
100 loop {
101 match self.connect_and_run().await {
102 Ok(()) => {
103 return Ok(());
105 }
106 Err(e) => {
107 tracing::error!("Connection error: {:?}", e);
108
109 if let Some(delay) = self.reconnect_state.on_failure() {
110 let _ = self.event_tx.send(ClientEvent::Disconnected);
111 tracing::info!("Reconnecting in {:?}...", delay);
112 tokio::time::sleep(delay).await;
113 } else {
114 tracing::error!("Max reconnect attempts reached");
115 return Err(ClientError::MaxReconnectAttempts);
116 }
117 }
118 }
119 }
120 }
121
122 async fn connect_and_run(&mut self) -> Result<(), ClientError> {
123 let stream = tokio::time::timeout(
124 self.connect_timeout,
125 tokio::net::TcpStream::connect(self.server_addr),
126 )
127 .await
128 .map_err(|_| ClientError::ConnectTimeout)?
129 .map_err(ClientError::Io)?;
130
131 stream.set_nodelay(true)?;
132 self.reconnect_state.on_success();
133
134 let _ = self.event_tx.send(ClientEvent::Connected);
135 tracing::info!("Connected to {}", self.server_addr);
136
137 let mut session = ClientSession::new(stream);
138
139 loop {
140 tokio::select! {
141 cmd = async { self.cmd_rx.recv() } => {
142 match cmd {
143 Some(ClientCommand::Send(msg)) => {
144 session.send(&msg).await?;
145 }
146 Some(ClientCommand::Disconnect) => {
147 return Ok(());
148 }
149 None => {
150 tokio::task::yield_now().await;
152 }
153 }
154 }
155
156 result = session.recv() => {
157 match result {
158 Ok(Some(msg)) => {
159 let _ = self.event_tx.send(ClientEvent::Message(msg.to_vec()));
160 }
161 Ok(None) => {
162 return Err(ClientError::ConnectionClosed);
163 }
164 Err(e) => {
165 return Err(ClientError::Io(e));
166 }
167 }
168 }
169 }
170 }
171 }
172}
173
174pub struct ClientHandle {
176 cmd_tx: spsc::SpscSender<ClientCommand>,
177 event_rx: spsc::SpscReceiver<ClientEvent>,
178}
179
180impl ClientHandle {
181 #[inline]
186 pub fn send(&mut self, message: Vec<u8>) -> Result<(), ClientError> {
187 self.cmd_tx
188 .send(ClientCommand::Send(message))
189 .map_err(|_| ClientError::Channel)
190 }
191
192 pub fn disconnect(&mut self) {
194 let _ = self.cmd_tx.send(ClientCommand::Disconnect);
195 }
196
197 #[inline]
199 pub fn poll(&mut self) -> Option<ClientEvent> {
200 self.event_rx.recv()
201 }
202
203 #[inline]
205 pub fn poll_spin(&mut self) -> ClientEvent {
206 self.event_rx.recv_spin()
207 }
208
209 pub fn drain(&mut self) -> impl Iterator<Item = ClientEvent> + '_ {
211 self.event_rx.drain()
212 }
213}
214
/// Instruction sent from a `ClientHandle` to its `Client`.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so commands
/// can be duplicated and compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientCommand {
    /// Transmit the contained bytes over the current session.
    Send(Vec<u8>),
    /// End the current session; the client's run loop then returns `Ok`.
    Disconnect,
}
223
/// Notification published by a `Client` for its `ClientHandle`.
///
/// Derives `PartialEq` and `Eq` in addition to `Debug` and `Clone` so events
/// can be compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientEvent {
    /// A connection to the server was established.
    Connected,
    /// A connection attempt or an established session ended with an error.
    Disconnected,
    /// Bytes of one message received from the server.
    Message(Vec<u8>),
    /// An error, described as text.
    Error(String),
}
236
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Address shared by every test; nothing ever connects to it.
    fn test_addr() -> SocketAddr {
        "127.0.0.1:9000".parse().unwrap()
    }

    #[test]
    fn test_client_builder_new() {
        let addr = test_addr();
        let builder = ClientBuilder::new(addr);

        // The constructor must store the address and apply the defaults.
        assert_eq!(builder.server_addr, addr);
        assert_eq!(builder.connect_timeout, Duration::from_secs(5));
        assert_eq!(builder.channel_capacity, 4096);
    }

    #[test]
    fn test_client_builder_connect_timeout() {
        let builder = ClientBuilder::new(test_addr()).connect_timeout(Duration::from_secs(10));
        assert_eq!(builder.connect_timeout, Duration::from_secs(10));
    }

    #[test]
    fn test_client_builder_reconnect() {
        // Check both values so the test does not depend on the default.
        let enabled = ClientBuilder::new(test_addr()).reconnect(true);
        assert!(enabled.reconnect_config.enabled);

        let disabled = ClientBuilder::new(test_addr()).reconnect(false);
        assert!(!disabled.reconnect_config.enabled);
    }

    #[test]
    fn test_client_builder_reconnect_delay() {
        let builder = ClientBuilder::new(test_addr()).reconnect_delay(Duration::from_millis(500));
        assert_eq!(
            builder.reconnect_config.initial_delay,
            Duration::from_millis(500)
        );
    }

    #[test]
    fn test_client_builder_max_reconnect_attempts() {
        let builder = ClientBuilder::new(test_addr()).max_reconnect_attempts(5);
        assert_eq!(builder.reconnect_config.max_attempts, 5);
    }

    #[test]
    fn test_client_builder_channel_capacity() {
        let builder = ClientBuilder::new(test_addr()).channel_capacity(8192);
        assert_eq!(builder.channel_capacity, 8192);
    }

    #[test]
    fn test_client_builder_build() {
        let addr = test_addr();
        let (client, _handle) = ClientBuilder::new(addr)
            .connect_timeout(Duration::from_secs(7))
            .build();

        // `build` must carry the configured values over to the client.
        assert_eq!(client.server_addr, addr);
        assert_eq!(client.connect_timeout, Duration::from_secs(7));
    }

    #[test]
    fn test_client_command_debug() {
        let cmd = ClientCommand::Send(vec![1, 2, 3]);
        let debug_str = format!("{:?}", cmd);
        assert!(debug_str.contains("Send"));

        let cmd2 = ClientCommand::Disconnect;
        let debug_str2 = format!("{:?}", cmd2);
        assert!(debug_str2.contains("Disconnect"));
    }

    #[test]
    fn test_client_event_clone_debug() {
        let event = ClientEvent::Connected;
        let cloned = event.clone();
        assert!(matches!(cloned, ClientEvent::Connected));

        let debug_str = format!("{:?}", event);
        assert!(debug_str.contains("Connected"));

        let event2 = ClientEvent::Message(vec![1, 2, 3]);
        let debug_str2 = format!("{:?}", event2);
        assert!(debug_str2.contains("Message"));

        let event3 = ClientEvent::Error("test error".to_string());
        let debug_str3 = format!("{:?}", event3);
        assert!(debug_str3.contains("Error"));
    }

    #[test]
    fn test_client_handle_disconnect() {
        let (mut client, mut handle) = ClientBuilder::new(test_addr()).build();
        handle.disconnect();

        // The command must reach the client's end of the command channel.
        assert!(matches!(
            client.cmd_rx.recv(),
            Some(ClientCommand::Disconnect)
        ));
    }

    #[test]
    fn test_client_handle_poll() {
        let (_client, mut handle) = ClientBuilder::new(test_addr()).build();
        // Nothing has been published yet, so polling yields no event.
        assert!(handle.poll().is_none());
    }
}