ironsbe_client/
local_builder.rs1use crate::builder::{ClientCommand, ClientEvent, ClientHandle};
12use crate::error::ClientError;
13use crate::reconnect::{ReconnectConfig, ReconnectState};
14use ironsbe_channel::spsc;
15use ironsbe_transport::traits::{LocalConnection, LocalTransport};
16use std::marker::PhantomData;
17use std::net::SocketAddr;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Notify;
21
22pub struct LocalClientBuilder<T: LocalTransport> {
28 server_addr: SocketAddr,
29 connect_config: Option<T::ConnectConfig>,
30 connect_timeout: Duration,
31 reconnect_config: ReconnectConfig,
32 channel_capacity: usize,
33 _transport: PhantomData<T>,
34}
35
36impl<T: LocalTransport> LocalClientBuilder<T> {
37 #[must_use]
39 pub fn new(server_addr: SocketAddr) -> Self {
40 Self {
41 server_addr,
42 connect_config: None,
43 connect_timeout: Duration::from_secs(5),
44 reconnect_config: ReconnectConfig::default(),
45 channel_capacity: 4096,
46 _transport: PhantomData,
47 }
48 }
49
50 #[must_use]
52 pub fn connect_config(mut self, config: T::ConnectConfig) -> Self {
53 self.connect_config = Some(config);
54 self
55 }
56
57 #[must_use]
59 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
60 self.connect_timeout = timeout;
61 self
62 }
63
64 #[must_use]
66 pub fn reconnect(mut self, enabled: bool) -> Self {
67 self.reconnect_config.enabled = enabled;
68 self
69 }
70
71 #[must_use]
73 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
74 self.reconnect_config.initial_delay = delay;
75 self
76 }
77
78 #[must_use]
80 pub fn max_reconnect_attempts(mut self, max: usize) -> Self {
81 self.reconnect_config.max_attempts = max;
82 self
83 }
84
85 #[must_use]
87 pub fn channel_capacity(mut self, capacity: usize) -> Self {
88 self.channel_capacity = capacity;
89 self
90 }
91
92 #[must_use]
94 pub fn build(self) -> (LocalClient<T>, ClientHandle) {
95 let (cmd_tx, cmd_rx) = spsc::channel(self.channel_capacity);
96 let (event_tx, event_rx) = spsc::channel(self.channel_capacity);
97 let cmd_notify = Arc::new(Notify::new());
98 let event_notify = Arc::new(Notify::new());
99
100 let client = LocalClient {
101 server_addr: self.server_addr,
102 connect_config: Some(
103 self.connect_config
104 .unwrap_or_else(|| T::ConnectConfig::from(self.server_addr)),
105 ),
106 connect_timeout: self.connect_timeout,
107 reconnect_state: ReconnectState::new(self.reconnect_config),
108 cmd_rx,
109 event_tx,
110 cmd_notify: Arc::clone(&cmd_notify),
111 event_notify: Arc::clone(&event_notify),
112 _transport: PhantomData,
113 };
114
115 let handle = ClientHandle::new(cmd_tx, event_rx, cmd_notify, event_notify);
116 (client, handle)
117 }
118}
119
120pub struct LocalClient<T: LocalTransport> {
125 server_addr: SocketAddr,
126 connect_config: Option<T::ConnectConfig>,
127 connect_timeout: Duration,
128 reconnect_state: ReconnectState,
129 cmd_rx: spsc::SpscReceiver<ClientCommand>,
130 event_tx: spsc::SpscSender<ClientEvent>,
131 cmd_notify: Arc<Notify>,
132 event_notify: Arc<Notify>,
133 _transport: PhantomData<T>,
134}
135
136impl<T: LocalTransport> LocalClient<T> {
137 pub async fn run(&mut self) -> Result<(), ClientError> {
143 loop {
144 match self.connect_and_run().await {
145 Ok(()) => return Ok(()),
146 Err(e) => {
147 tracing::error!("Local client connection error: {:?}", e);
148 if let Some(delay) = self.reconnect_state.on_failure() {
149 let _ = self.event_tx.send(ClientEvent::Disconnected);
150 self.event_notify.notify_one();
151 tracing::info!("Reconnecting in {:?}...", delay);
152 tokio::time::sleep(delay).await;
153 } else {
154 tracing::error!("Max reconnect attempts reached");
155 return Err(ClientError::MaxReconnectAttempts);
156 }
157 }
158 }
159 }
160 }
161
162 async fn connect_and_run(&mut self) -> Result<(), ClientError> {
163 let connect_config = self
166 .connect_config
167 .clone()
168 .unwrap_or_else(|| T::ConnectConfig::from(self.server_addr));
169 let mut conn = tokio::time::timeout(self.connect_timeout, T::connect_with(connect_config))
170 .await
171 .map_err(|_| ClientError::ConnectTimeout)?
172 .map_err(|e| ClientError::Io(std::io::Error::other(e.to_string())))?;
173
174 self.reconnect_state.on_success();
175
176 let _ = self.event_tx.send(ClientEvent::Connected);
177 self.event_notify.notify_one();
178 tracing::info!("Local client connected to {}", self.server_addr);
179
180 loop {
181 tokio::select! {
182 _ = self.cmd_notify.notified() => {
183 while let Some(cmd) = self.cmd_rx.recv() {
184 match cmd {
185 ClientCommand::Send(msg) => {
186 conn.send(&msg)
187 .await
188 .map_err(|e| ClientError::Io(std::io::Error::other(e.to_string())))?;
189 }
190 ClientCommand::Disconnect => return Ok(()),
191 }
192 }
193 }
194
195 result = conn.recv() => {
196 match result {
197 Ok(Some(msg)) => {
198 let _ = self.event_tx.send(ClientEvent::Message(msg.to_vec()));
199 self.event_notify.notify_one();
200 }
201 Ok(None) => return Err(ClientError::ConnectionClosed),
202 Err(e) => {
203 return Err(ClientError::Io(std::io::Error::other(e.to_string())));
204 }
205 }
206 }
207 }
208 }
209 }
210}
211
#[cfg(all(test, feature = "tcp-uring", target_os = "linux"))]
mod tests {
    use super::*;
    use ironsbe_transport::tcp_uring::UringTcpTransport;

    /// Builder type under test, pinned to `UringTcpTransport`.
    type UringBuilder = LocalClientBuilder<UringTcpTransport>;

    /// Parses the address shared by every test in this module.
    fn test_addr() -> SocketAddr {
        "127.0.0.1:9000".parse().expect("test addr")
    }

    /// Constructing a builder with defaults must not panic.
    #[test]
    fn test_local_client_builder_new() {
        let _builder = UringBuilder::new(test_addr());
    }

    /// Overriding the connect timeout must not panic.
    #[test]
    fn test_local_client_builder_connect_timeout() {
        let _builder = UringBuilder::new(test_addr()).connect_timeout(Duration::from_secs(2));
    }

    /// Building yields a client and a handle without panicking.
    #[test]
    fn test_local_client_builder_build() {
        let (_client, _handle) = UringBuilder::new(test_addr()).build();
    }
}