Skip to main content

ironsbe_client/
builder.rs

1//! Client builder and main client implementation.
2
3use crate::error::ClientError;
4use crate::reconnect::{ReconnectConfig, ReconnectState};
5use crate::session::ClientSession;
6use ironsbe_channel::spsc;
7use std::net::SocketAddr;
8use std::time::Duration;
9
10/// Builder for configuring and creating a client.
11pub struct ClientBuilder {
12    server_addr: SocketAddr,
13    connect_timeout: Duration,
14    reconnect_config: ReconnectConfig,
15    channel_capacity: usize,
16}
17
18impl ClientBuilder {
19    /// Creates a new client builder for the specified server address.
20    #[must_use]
21    pub fn new(server_addr: SocketAddr) -> Self {
22        Self {
23            server_addr,
24            connect_timeout: Duration::from_secs(5),
25            reconnect_config: ReconnectConfig::default(),
26            channel_capacity: 4096,
27        }
28    }
29
30    /// Sets the connection timeout.
31    #[must_use]
32    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
33        self.connect_timeout = timeout;
34        self
35    }
36
37    /// Enables or disables automatic reconnection.
38    #[must_use]
39    pub fn reconnect(mut self, enabled: bool) -> Self {
40        self.reconnect_config.enabled = enabled;
41        self
42    }
43
44    /// Sets the reconnection delay.
45    #[must_use]
46    pub fn reconnect_delay(mut self, delay: Duration) -> Self {
47        self.reconnect_config.initial_delay = delay;
48        self
49    }
50
51    /// Sets the maximum reconnection attempts.
52    #[must_use]
53    pub fn max_reconnect_attempts(mut self, max: usize) -> Self {
54        self.reconnect_config.max_attempts = max;
55        self
56    }
57
58    /// Sets the channel capacity.
59    #[must_use]
60    pub fn channel_capacity(mut self, capacity: usize) -> Self {
61        self.channel_capacity = capacity;
62        self
63    }
64
65    /// Builds the client and handle.
66    #[must_use]
67    pub fn build(self) -> (Client, ClientHandle) {
68        let (cmd_tx, cmd_rx) = spsc::channel(self.channel_capacity);
69        let (event_tx, event_rx) = spsc::channel(self.channel_capacity);
70
71        let client = Client {
72            server_addr: self.server_addr,
73            connect_timeout: self.connect_timeout,
74            reconnect_state: ReconnectState::new(self.reconnect_config),
75            cmd_rx,
76            event_tx,
77        };
78
79        let handle = ClientHandle { cmd_tx, event_rx };
80
81        (client, handle)
82    }
83}
84
85/// The main client instance.
86pub struct Client {
87    server_addr: SocketAddr,
88    connect_timeout: Duration,
89    reconnect_state: ReconnectState,
90    cmd_rx: spsc::SpscReceiver<ClientCommand>,
91    event_tx: spsc::SpscSender<ClientEvent>,
92}
93
94impl Client {
95    /// Runs the client, connecting to the server and processing messages.
96    ///
97    /// # Errors
98    /// Returns `ClientError` if the client fails to connect or encounters an error.
99    pub async fn run(&mut self) -> Result<(), ClientError> {
100        loop {
101            match self.connect_and_run().await {
102                Ok(()) => {
103                    // Normal shutdown
104                    return Ok(());
105                }
106                Err(e) => {
107                    tracing::error!("Connection error: {:?}", e);
108
109                    if let Some(delay) = self.reconnect_state.on_failure() {
110                        let _ = self.event_tx.send(ClientEvent::Disconnected);
111                        tracing::info!("Reconnecting in {:?}...", delay);
112                        tokio::time::sleep(delay).await;
113                    } else {
114                        tracing::error!("Max reconnect attempts reached");
115                        return Err(ClientError::MaxReconnectAttempts);
116                    }
117                }
118            }
119        }
120    }
121
122    async fn connect_and_run(&mut self) -> Result<(), ClientError> {
123        let stream = tokio::time::timeout(
124            self.connect_timeout,
125            tokio::net::TcpStream::connect(self.server_addr),
126        )
127        .await
128        .map_err(|_| ClientError::ConnectTimeout)?
129        .map_err(ClientError::Io)?;
130
131        stream.set_nodelay(true)?;
132        self.reconnect_state.on_success();
133
134        let _ = self.event_tx.send(ClientEvent::Connected);
135        tracing::info!("Connected to {}", self.server_addr);
136
137        let mut session = ClientSession::new(stream);
138
139        loop {
140            tokio::select! {
141                cmd = async { self.cmd_rx.recv() } => {
142                    match cmd {
143                        Some(ClientCommand::Send(msg)) => {
144                            session.send(&msg).await?;
145                        }
146                        Some(ClientCommand::Disconnect) => {
147                            return Ok(());
148                        }
149                        None => {
150                            // Channel closed, check again
151                            tokio::task::yield_now().await;
152                        }
153                    }
154                }
155
156                result = session.recv() => {
157                    match result {
158                        Ok(Some(msg)) => {
159                            let _ = self.event_tx.send(ClientEvent::Message(msg.to_vec()));
160                        }
161                        Ok(None) => {
162                            return Err(ClientError::ConnectionClosed);
163                        }
164                        Err(e) => {
165                            return Err(ClientError::Io(e));
166                        }
167                    }
168                }
169            }
170        }
171    }
172}
173
174/// Handle for sending messages and receiving events.
175pub struct ClientHandle {
176    cmd_tx: spsc::SpscSender<ClientCommand>,
177    event_rx: spsc::SpscReceiver<ClientEvent>,
178}
179
180impl ClientHandle {
181    /// Sends an SBE message to the server (non-blocking).
182    ///
183    /// # Errors
184    /// Returns error if the channel is disconnected.
185    #[inline]
186    pub fn send(&mut self, message: Vec<u8>) -> Result<(), ClientError> {
187        self.cmd_tx
188            .send(ClientCommand::Send(message))
189            .map_err(|_| ClientError::Channel)
190    }
191
192    /// Disconnects from the server.
193    pub fn disconnect(&mut self) {
194        let _ = self.cmd_tx.send(ClientCommand::Disconnect);
195    }
196
197    /// Polls for events (non-blocking).
198    #[inline]
199    pub fn poll(&mut self) -> Option<ClientEvent> {
200        self.event_rx.recv()
201    }
202
203    /// Busy-poll for next event (for hot path).
204    #[inline]
205    pub fn poll_spin(&mut self) -> ClientEvent {
206        self.event_rx.recv_spin()
207    }
208
209    /// Drains all available events.
210    pub fn drain(&mut self) -> impl Iterator<Item = ClientEvent> + '_ {
211        self.event_rx.drain()
212    }
213}
214
/// Commands that can be sent to the client.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so commands
/// can be duplicated and compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientCommand {
    /// Send a message to the server; the payload is the raw message bytes.
    Send(Vec<u8>),
    /// Disconnect from the server.
    Disconnect,
}
223
/// Events emitted by the client.
///
/// Derives `PartialEq` and `Eq` in addition to `Debug` and `Clone` so events
/// can be compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientEvent {
    /// Connected to the server.
    Connected,
    /// Disconnected from the server.
    Disconnected,
    /// Received a message from the server; the payload is an owned copy of its bytes.
    Message(Vec<u8>),
    /// An error occurred; the payload is a textual description.
    Error(String),
}