ironsbe_client/
builder.rs1use crate::error::ClientError;
4use crate::reconnect::{ReconnectConfig, ReconnectState};
5use crate::session::ClientSession;
6use ironsbe_channel::spsc;
7use std::net::SocketAddr;
8use std::time::Duration;
9
10pub struct ClientBuilder {
12 server_addr: SocketAddr,
13 connect_timeout: Duration,
14 reconnect_config: ReconnectConfig,
15 channel_capacity: usize,
16}
17
18impl ClientBuilder {
19 #[must_use]
21 pub fn new(server_addr: SocketAddr) -> Self {
22 Self {
23 server_addr,
24 connect_timeout: Duration::from_secs(5),
25 reconnect_config: ReconnectConfig::default(),
26 channel_capacity: 4096,
27 }
28 }
29
30 #[must_use]
32 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
33 self.connect_timeout = timeout;
34 self
35 }
36
37 #[must_use]
39 pub fn reconnect(mut self, enabled: bool) -> Self {
40 self.reconnect_config.enabled = enabled;
41 self
42 }
43
44 #[must_use]
46 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
47 self.reconnect_config.initial_delay = delay;
48 self
49 }
50
51 #[must_use]
53 pub fn max_reconnect_attempts(mut self, max: usize) -> Self {
54 self.reconnect_config.max_attempts = max;
55 self
56 }
57
58 #[must_use]
60 pub fn channel_capacity(mut self, capacity: usize) -> Self {
61 self.channel_capacity = capacity;
62 self
63 }
64
65 #[must_use]
67 pub fn build(self) -> (Client, ClientHandle) {
68 let (cmd_tx, cmd_rx) = spsc::channel(self.channel_capacity);
69 let (event_tx, event_rx) = spsc::channel(self.channel_capacity);
70
71 let client = Client {
72 server_addr: self.server_addr,
73 connect_timeout: self.connect_timeout,
74 reconnect_state: ReconnectState::new(self.reconnect_config),
75 cmd_rx,
76 event_tx,
77 };
78
79 let handle = ClientHandle { cmd_tx, event_rx };
80
81 (client, handle)
82 }
83}
84
85pub struct Client {
87 server_addr: SocketAddr,
88 connect_timeout: Duration,
89 reconnect_state: ReconnectState,
90 cmd_rx: spsc::SpscReceiver<ClientCommand>,
91 event_tx: spsc::SpscSender<ClientEvent>,
92}
93
94impl Client {
95 pub async fn run(&mut self) -> Result<(), ClientError> {
100 loop {
101 match self.connect_and_run().await {
102 Ok(()) => {
103 return Ok(());
105 }
106 Err(e) => {
107 tracing::error!("Connection error: {:?}", e);
108
109 if let Some(delay) = self.reconnect_state.on_failure() {
110 let _ = self.event_tx.send(ClientEvent::Disconnected);
111 tracing::info!("Reconnecting in {:?}...", delay);
112 tokio::time::sleep(delay).await;
113 } else {
114 tracing::error!("Max reconnect attempts reached");
115 return Err(ClientError::MaxReconnectAttempts);
116 }
117 }
118 }
119 }
120 }
121
122 async fn connect_and_run(&mut self) -> Result<(), ClientError> {
123 let stream = tokio::time::timeout(
124 self.connect_timeout,
125 tokio::net::TcpStream::connect(self.server_addr),
126 )
127 .await
128 .map_err(|_| ClientError::ConnectTimeout)?
129 .map_err(ClientError::Io)?;
130
131 stream.set_nodelay(true)?;
132 self.reconnect_state.on_success();
133
134 let _ = self.event_tx.send(ClientEvent::Connected);
135 tracing::info!("Connected to {}", self.server_addr);
136
137 let mut session = ClientSession::new(stream);
138
139 loop {
140 tokio::select! {
141 cmd = async { self.cmd_rx.recv() } => {
142 match cmd {
143 Some(ClientCommand::Send(msg)) => {
144 session.send(&msg).await?;
145 }
146 Some(ClientCommand::Disconnect) => {
147 return Ok(());
148 }
149 None => {
150 tokio::task::yield_now().await;
152 }
153 }
154 }
155
156 result = session.recv() => {
157 match result {
158 Ok(Some(msg)) => {
159 let _ = self.event_tx.send(ClientEvent::Message(msg.to_vec()));
160 }
161 Ok(None) => {
162 return Err(ClientError::ConnectionClosed);
163 }
164 Err(e) => {
165 return Err(ClientError::Io(e));
166 }
167 }
168 }
169 }
170 }
171 }
172}
173
174pub struct ClientHandle {
176 cmd_tx: spsc::SpscSender<ClientCommand>,
177 event_rx: spsc::SpscReceiver<ClientEvent>,
178}
179
180impl ClientHandle {
181 #[inline]
186 pub fn send(&mut self, message: Vec<u8>) -> Result<(), ClientError> {
187 self.cmd_tx
188 .send(ClientCommand::Send(message))
189 .map_err(|_| ClientError::Channel)
190 }
191
192 pub fn disconnect(&mut self) {
194 let _ = self.cmd_tx.send(ClientCommand::Disconnect);
195 }
196
197 #[inline]
199 pub fn poll(&mut self) -> Option<ClientEvent> {
200 self.event_rx.recv()
201 }
202
203 #[inline]
205 pub fn poll_spin(&mut self) -> ClientEvent {
206 self.event_rx.recv_spin()
207 }
208
209 pub fn drain(&mut self) -> impl Iterator<Item = ClientEvent> + '_ {
211 self.event_rx.drain()
212 }
213}
214
215#[derive(Debug)]
217pub enum ClientCommand {
218 Send(Vec<u8>),
220 Disconnect,
222}
223
224#[derive(Debug, Clone)]
226pub enum ClientEvent {
227 Connected,
229 Disconnected,
231 Message(Vec<u8>),
233 Error(String),
235}