Skip to main content

ironsbe_client/
local_builder.rs

1//! Single-threaded client builder for thread-per-core / `!Send` backends.
2//!
3//! Mirrors [`crate::ClientBuilder`] but is generic over [`LocalTransport`]
4//! instead of [`Transport`](ironsbe_transport::Transport).  Use this when
5//! the chosen backend (e.g. `tokio-uring` via the `tcp-uring` feature) is
6//! single-threaded by construction.
7//!
8//! [`LocalClient::run`] must be polled inside a single-threaded reactor
9//! that owns a Tokio `LocalSet` (typically `tokio_uring::start`).
10
11use crate::builder::{ClientCommand, ClientEvent, ClientHandle};
12use crate::error::ClientError;
13use crate::reconnect::{ReconnectConfig, ReconnectState};
14use ironsbe_channel::spsc;
15use ironsbe_transport::traits::{LocalConnection, LocalTransport};
16use std::marker::PhantomData;
17use std::net::SocketAddr;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Notify;
21
22/// Builder for [`LocalClient`].
23///
24/// Single-threaded counterpart of [`crate::ClientBuilder`]; the type
25/// parameter `T` selects a [`LocalTransport`] backend rather than the
26/// multi-threaded [`Transport`](ironsbe_transport::Transport) family.
27pub struct LocalClientBuilder<T: LocalTransport> {
28    server_addr: SocketAddr,
29    connect_config: Option<T::ConnectConfig>,
30    connect_timeout: Duration,
31    reconnect_config: ReconnectConfig,
32    channel_capacity: usize,
33    _transport: PhantomData<T>,
34}
35
36impl<T: LocalTransport> LocalClientBuilder<T> {
37    /// Creates a new local client builder targeting `server_addr`.
38    #[must_use]
39    pub fn new(server_addr: SocketAddr) -> Self {
40        Self {
41            server_addr,
42            connect_config: None,
43            connect_timeout: Duration::from_secs(5),
44            reconnect_config: ReconnectConfig::default(),
45            channel_capacity: 4096,
46            _transport: PhantomData,
47        }
48    }
49
50    /// Supplies a backend-specific connect configuration.
51    #[must_use]
52    pub fn connect_config(mut self, config: T::ConnectConfig) -> Self {
53        self.connect_config = Some(config);
54        self
55    }
56
57    /// Sets the outer connect timeout used by the reconnect loop.
58    #[must_use]
59    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
60        self.connect_timeout = timeout;
61        self
62    }
63
64    /// Enables or disables automatic reconnection.
65    #[must_use]
66    pub fn reconnect(mut self, enabled: bool) -> Self {
67        self.reconnect_config.enabled = enabled;
68        self
69    }
70
71    /// Sets the reconnection delay.
72    #[must_use]
73    pub fn reconnect_delay(mut self, delay: Duration) -> Self {
74        self.reconnect_config.initial_delay = delay;
75        self
76    }
77
78    /// Sets the maximum reconnection attempts.
79    #[must_use]
80    pub fn max_reconnect_attempts(mut self, max: usize) -> Self {
81        self.reconnect_config.max_attempts = max;
82        self
83    }
84
85    /// Sets the cmd/event channel capacity.
86    #[must_use]
87    pub fn channel_capacity(mut self, capacity: usize) -> Self {
88        self.channel_capacity = capacity;
89        self
90    }
91
92    /// Builds the client and its external handle.
93    #[must_use]
94    pub fn build(self) -> (LocalClient<T>, ClientHandle) {
95        let (cmd_tx, cmd_rx) = spsc::channel(self.channel_capacity);
96        let (event_tx, event_rx) = spsc::channel(self.channel_capacity);
97        let cmd_notify = Arc::new(Notify::new());
98        let event_notify = Arc::new(Notify::new());
99
100        let client = LocalClient {
101            server_addr: self.server_addr,
102            connect_config: Some(
103                self.connect_config
104                    .unwrap_or_else(|| T::ConnectConfig::from(self.server_addr)),
105            ),
106            connect_timeout: self.connect_timeout,
107            reconnect_state: ReconnectState::new(self.reconnect_config),
108            cmd_rx,
109            event_tx,
110            cmd_notify: Arc::clone(&cmd_notify),
111            event_notify: Arc::clone(&event_notify),
112            _transport: PhantomData,
113        };
114
115        let handle = ClientHandle::new(cmd_tx, event_rx, cmd_notify, event_notify);
116        (client, handle)
117    }
118}
119
120/// Single-threaded client instance for [`LocalTransport`] backends.
121///
122/// `LocalClient::run` **must** be polled inside a Tokio `LocalSet`
123/// (typically `tokio_uring::start(async move { client.run().await })`).
124pub struct LocalClient<T: LocalTransport> {
125    server_addr: SocketAddr,
126    connect_config: Option<T::ConnectConfig>,
127    connect_timeout: Duration,
128    reconnect_state: ReconnectState,
129    cmd_rx: spsc::SpscReceiver<ClientCommand>,
130    event_tx: spsc::SpscSender<ClientEvent>,
131    cmd_notify: Arc<Notify>,
132    event_notify: Arc<Notify>,
133    _transport: PhantomData<T>,
134}
135
136impl<T: LocalTransport> LocalClient<T> {
137    /// Runs the client, connecting and processing messages until shutdown.
138    ///
139    /// # Errors
140    /// Returns [`ClientError`] if the connection fails repeatedly or the
141    /// session encounters an unrecoverable error.
142    pub async fn run(&mut self) -> Result<(), ClientError> {
143        loop {
144            match self.connect_and_run().await {
145                Ok(()) => return Ok(()),
146                Err(e) => {
147                    tracing::error!("Local client connection error: {:?}", e);
148                    if let Some(delay) = self.reconnect_state.on_failure() {
149                        let _ = self.event_tx.send(ClientEvent::Disconnected);
150                        self.event_notify.notify_one();
151                        tracing::info!("Reconnecting in {:?}...", delay);
152                        tokio::time::sleep(delay).await;
153                    } else {
154                        tracing::error!("Max reconnect attempts reached");
155                        return Err(ClientError::MaxReconnectAttempts);
156                    }
157                }
158            }
159        }
160    }
161
162    async fn connect_and_run(&mut self) -> Result<(), ClientError> {
163        // Reconnect attempts share the same connect_config; clone on
164        // each attempt so a custom config survives across reconnects.
165        let connect_config = self
166            .connect_config
167            .clone()
168            .unwrap_or_else(|| T::ConnectConfig::from(self.server_addr));
169        let mut conn = tokio::time::timeout(self.connect_timeout, T::connect_with(connect_config))
170            .await
171            .map_err(|_| ClientError::ConnectTimeout)?
172            .map_err(|e| ClientError::Io(std::io::Error::other(e.to_string())))?;
173
174        self.reconnect_state.on_success();
175
176        let _ = self.event_tx.send(ClientEvent::Connected);
177        self.event_notify.notify_one();
178        tracing::info!("Local client connected to {}", self.server_addr);
179
180        loop {
181            tokio::select! {
182                _ = self.cmd_notify.notified() => {
183                    while let Some(cmd) = self.cmd_rx.recv() {
184                        match cmd {
185                            ClientCommand::Send(msg) => {
186                                conn.send(&msg)
187                                    .await
188                                    .map_err(|e| ClientError::Io(std::io::Error::other(e.to_string())))?;
189                            }
190                            ClientCommand::Disconnect => return Ok(()),
191                        }
192                    }
193                }
194
195                result = conn.recv() => {
196                    match result {
197                        Ok(Some(msg)) => {
198                            let _ = self.event_tx.send(ClientEvent::Message(msg.to_vec()));
199                            self.event_notify.notify_one();
200                        }
201                        Ok(None) => return Err(ClientError::ConnectionClosed),
202                        Err(e) => {
203                            return Err(ClientError::Io(std::io::Error::other(e.to_string())));
204                        }
205                    }
206                }
207            }
208        }
209    }
210}
211
#[cfg(all(test, feature = "tcp-uring", target_os = "linux"))]
mod tests {
    use super::*;
    use ironsbe_transport::tcp_uring::UringTcpTransport;

    /// Builder type under test, fixed to the `UringTcpTransport` backend.
    type UringBuilder = LocalClientBuilder<UringTcpTransport>;

    /// Address shared by every test; these tests only construct values and
    /// never call `run`.
    fn test_addr() -> SocketAddr {
        "127.0.0.1:9000".parse().expect("test addr")
    }

    /// A builder can be created from just an address.
    #[test]
    fn test_local_client_builder_new() {
        let _builder = UringBuilder::new(test_addr());
    }

    /// The connect timeout setter can be chained onto a fresh builder.
    #[test]
    fn test_local_client_builder_connect_timeout() {
        let two_seconds = Duration::from_secs(2);
        let _builder = UringBuilder::new(test_addr()).connect_timeout(two_seconds);
    }

    /// A default builder produces a client/handle pair.
    #[test]
    fn test_local_client_builder_build() {
        let (_client, _handle) = UringBuilder::new(test_addr()).build();
    }
}