//! Client side of the Nomad protocol (`nomad_protocol/client/client.rs`).

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock};

use crate::core::SyncState;
14
15#[derive(Debug, Error)]
17pub enum ClientError {
18 #[error("connection failed: {0}")]
20 ConnectionFailed(String),
21
22 #[error("handshake failed: {0}")]
24 HandshakeFailed(String),
25
26 #[error("session terminated: {0}")]
28 SessionTerminated(String),
29
30 #[error("I/O error: {0}")]
32 Io(#[from] std::io::Error),
33
34 #[error("sync error: {0}")]
36 SyncError(String),
37
38 #[error("client disconnected")]
40 Disconnected,
41
42 #[error("operation timed out")]
44 Timeout,
45}
46
/// Connection settings for a Nomad client.
///
/// `Debug` is implemented by hand rather than derived so that the client's
/// private key never ends up in logs or panic messages.
#[derive(Clone)]
pub struct ClientConfig {
    /// Socket address of the server.
    pub server_addr: SocketAddr,

    /// The server's 32-byte public key.
    pub server_public_key: [u8; 32],

    /// The client's 32-byte private key, if one has been configured.
    pub client_private_key: Option<[u8; 32]>,

    /// Time limit for establishing a connection.
    pub connect_timeout: Duration,

    /// Whether compression is requested.
    pub enable_compression: bool,
}

impl std::fmt::Debug for ClientConfig {
    /// Formats every field except the private key, which is shown only as
    /// `Some("<redacted>")` or `None`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible without exposing key bytes.
        let client_private_key = self.client_private_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("ClientConfig")
            .field("server_addr", &self.server_addr)
            .field("server_public_key", &self.server_public_key)
            .field("client_private_key", &client_private_key)
            .field("connect_timeout", &self.connect_timeout)
            .field("enable_compression", &self.enable_compression)
            .finish()
    }
}
65
66impl Default for ClientConfig {
67 fn default() -> Self {
68 Self {
69 server_addr: "127.0.0.1:19999".parse().unwrap(),
70 server_public_key: [0u8; 32],
71 client_private_key: None,
72 connect_timeout: Duration::from_secs(10),
73 enable_compression: true,
74 }
75 }
76}
77
78#[derive(Debug)]
80pub struct NomadClientBuilder {
81 config: ClientConfig,
82}
83
84impl NomadClientBuilder {
85 pub fn new() -> Self {
87 Self {
88 config: ClientConfig::default(),
89 }
90 }
91
92 pub fn server_addr(mut self, addr: SocketAddr) -> Self {
94 self.config.server_addr = addr;
95 self
96 }
97
98 pub fn server_public_key(mut self, key: [u8; 32]) -> Self {
100 self.config.server_public_key = key;
101 self
102 }
103
104 pub fn client_private_key(mut self, key: [u8; 32]) -> Self {
106 self.config.client_private_key = Some(key);
107 self
108 }
109
110 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
112 self.config.connect_timeout = timeout;
113 self
114 }
115
116 pub fn compression(mut self, enabled: bool) -> Self {
118 self.config.enable_compression = enabled;
119 self
120 }
121
122 pub fn build(self) -> ClientConfig {
124 self.config
125 }
126}
127
128impl Default for NomadClientBuilder {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
/// Lifecycle state of a client connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientState {
    /// No connection is established.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The client is connected.
    Connected,
    /// The client has been shut down.
    Closed,
}
146
147pub struct StateSender<S: SyncState> {
149 tx: mpsc::Sender<S>,
150}
151
152impl<S: SyncState> StateSender<S> {
153 pub async fn send(&self, state: S) -> Result<(), ClientError> {
158 self.tx
159 .send(state)
160 .await
161 .map_err(|_| ClientError::Disconnected)
162 }
163}
164
165impl<S: SyncState> Clone for StateSender<S> {
166 fn clone(&self) -> Self {
167 Self {
168 tx: self.tx.clone(),
169 }
170 }
171}
172
173pub struct StateReceiver<S: SyncState> {
175 rx: mpsc::Receiver<S>,
176}
177
178impl<S: SyncState> StateReceiver<S> {
179 pub async fn recv(&mut self) -> Option<S> {
183 self.rx.recv().await
184 }
185}
186
187pub struct NomadClient<S: SyncState> {
212 state: Arc<RwLock<ClientState>>,
214
215 local_state: Arc<RwLock<S>>,
217
218 state_tx: mpsc::Sender<S>,
220
221 shutdown_tx: Option<oneshot::Sender<()>>,
223
224 config: ClientConfig,
226}
227
228impl<S: SyncState> NomadClient<S> {
229 pub async fn connect(
233 config: ClientConfig,
234 initial_state: S,
235 ) -> Result<(Self, StateReceiver<S>), ClientError> {
236 let (state_tx, _state_rx) = mpsc::channel::<S>(32);
238 let (server_state_tx, server_state_rx) = mpsc::channel::<S>(32);
239 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
240
241 let client_state = Arc::new(RwLock::new(ClientState::Connecting));
242 let local_state = Arc::new(RwLock::new(initial_state));
243
244 {
252 let mut state = client_state.write().await;
253 *state = ClientState::Connected;
254 }
255
256 let _io_state = client_state.clone();
258 let _io_local = local_state.clone();
259 let _io_config = config.clone();
260 let _io_server_tx = server_state_tx;
261
262 tokio::spawn(async move {
263 loop {
268 tokio::time::sleep(Duration::from_secs(1)).await;
269 }
270 });
271
272 let client = Self {
273 state: client_state,
274 local_state,
275 state_tx,
276 shutdown_tx: Some(shutdown_tx),
277 config,
278 };
279
280 let receiver = StateReceiver { rx: server_state_rx };
281
282 Ok((client, receiver))
283 }
284
285 pub async fn client_state(&self) -> ClientState {
287 *self.state.read().await
288 }
289
290 pub async fn local_state(&self) -> S {
292 self.local_state.read().await.clone()
293 }
294
295 pub async fn update_state(&self, new_state: S) -> Result<(), ClientError> {
299 {
301 let mut state = self.local_state.write().await;
302 *state = new_state.clone();
303 }
304
305 self.state_tx
307 .send(new_state)
308 .await
309 .map_err(|_| ClientError::Disconnected)
310 }
311
312 pub fn state_sender(&self) -> StateSender<S> {
316 StateSender {
317 tx: self.state_tx.clone(),
318 }
319 }
320
321 pub async fn is_connected(&self) -> bool {
323 matches!(*self.state.read().await, ClientState::Connected)
324 }
325
326 pub async fn disconnect(mut self) -> Result<(), ClientError> {
328 if let Some(tx) = self.shutdown_tx.take() {
330 let _ = tx.send(());
331 }
332
333 {
335 let mut state = self.state.write().await;
336 *state = ClientState::Closed;
337 }
338
339 Ok(())
340 }
341
342 pub fn server_addr(&self) -> SocketAddr {
344 self.config.server_addr
345 }
346}
347
348impl<S: SyncState> Drop for NomadClient<S> {
349 fn drop(&mut self) {
350 if let Some(tx) = self.shutdown_tx.take() {
352 let _ = tx.send(());
353 }
354 }
355}
356
#[cfg(test)]
mod tests {
    // No unit tests are defined in this module yet.
}