nomad_protocol/client/
client.rs

1//! High-level NOMAD client API.
2//!
3//! Provides `NomadClient<S>` for connecting to a NOMAD server and synchronizing
4//! state of type `S: SyncState`.
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::time::Duration;
9
10use thiserror::Error;
11use tokio::sync::{mpsc, oneshot, RwLock};
12
13use crate::core::SyncState;
14
15/// Errors that can occur in the NOMAD client.
16#[derive(Debug, Error)]
17pub enum ClientError {
18    /// Failed to connect to server.
19    #[error("connection failed: {0}")]
20    ConnectionFailed(String),
21
22    /// Handshake failed.
23    #[error("handshake failed: {0}")]
24    HandshakeFailed(String),
25
26    /// Session terminated.
27    #[error("session terminated: {0}")]
28    SessionTerminated(String),
29
30    /// I/O error.
31    #[error("I/O error: {0}")]
32    Io(#[from] std::io::Error),
33
34    /// State synchronization error.
35    #[error("sync error: {0}")]
36    SyncError(String),
37
38    /// Client is disconnected.
39    #[error("client disconnected")]
40    Disconnected,
41
42    /// Operation timed out.
43    #[error("operation timed out")]
44    Timeout,
45}
46
47/// Client configuration.
48#[derive(Debug, Clone)]
49pub struct ClientConfig {
50    /// Server address to connect to.
51    pub server_addr: SocketAddr,
52
53    /// Server's static public key (32 bytes).
54    pub server_public_key: [u8; 32],
55
56    /// Client's static private key (optional, generated if not provided).
57    pub client_private_key: Option<[u8; 32]>,
58
59    /// Connection timeout.
60    pub connect_timeout: Duration,
61
62    /// Enable compression extension.
63    pub enable_compression: bool,
64}
65
66impl Default for ClientConfig {
67    fn default() -> Self {
68        Self {
69            server_addr: "127.0.0.1:19999".parse().unwrap(),
70            server_public_key: [0u8; 32],
71            client_private_key: None,
72            connect_timeout: Duration::from_secs(10),
73            enable_compression: true,
74        }
75    }
76}
77
78/// Builder for creating a `NomadClient`.
79#[derive(Debug)]
80pub struct NomadClientBuilder {
81    config: ClientConfig,
82}
83
84impl NomadClientBuilder {
85    /// Create a new client builder.
86    pub fn new() -> Self {
87        Self {
88            config: ClientConfig::default(),
89        }
90    }
91
92    /// Set the server address.
93    pub fn server_addr(mut self, addr: SocketAddr) -> Self {
94        self.config.server_addr = addr;
95        self
96    }
97
98    /// Set the server's public key.
99    pub fn server_public_key(mut self, key: [u8; 32]) -> Self {
100        self.config.server_public_key = key;
101        self
102    }
103
104    /// Set the client's private key.
105    pub fn client_private_key(mut self, key: [u8; 32]) -> Self {
106        self.config.client_private_key = Some(key);
107        self
108    }
109
110    /// Set the connection timeout.
111    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
112        self.config.connect_timeout = timeout;
113        self
114    }
115
116    /// Enable or disable compression.
117    pub fn compression(mut self, enabled: bool) -> Self {
118        self.config.enable_compression = enabled;
119        self
120    }
121
122    /// Build the client configuration.
123    pub fn build(self) -> ClientConfig {
124        self.config
125    }
126}
127
128impl Default for NomadClientBuilder {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134/// Internal client state.
135#[derive(Debug, Clone, Copy, PartialEq, Eq)]
136pub enum ClientState {
137    /// Not connected.
138    Disconnected,
139    /// Handshake in progress.
140    Connecting,
141    /// Connected and syncing.
142    Connected,
143    /// Connection closed gracefully.
144    Closed,
145}
146
147/// Handle for sending state updates to the server.
148pub struct StateSender<S: SyncState> {
149    tx: mpsc::Sender<S>,
150}
151
152impl<S: SyncState> StateSender<S> {
153    /// Send a state update to the server.
154    ///
155    /// This is non-blocking; the update will be queued and sent
156    /// according to the pacing algorithm.
157    pub async fn send(&self, state: S) -> Result<(), ClientError> {
158        self.tx
159            .send(state)
160            .await
161            .map_err(|_| ClientError::Disconnected)
162    }
163}
164
165impl<S: SyncState> Clone for StateSender<S> {
166    fn clone(&self) -> Self {
167        Self {
168            tx: self.tx.clone(),
169        }
170    }
171}
172
173/// Handle for receiving state updates from the server.
174pub struct StateReceiver<S: SyncState> {
175    rx: mpsc::Receiver<S>,
176}
177
178impl<S: SyncState> StateReceiver<S> {
179    /// Receive the next state update from the server.
180    ///
181    /// Returns `None` if the connection is closed.
182    pub async fn recv(&mut self) -> Option<S> {
183        self.rx.recv().await
184    }
185}
186
187/// A NOMAD protocol client.
188///
189/// Generic over state type `S` which must implement `SyncState`.
190///
191/// # Example
192///
193/// ```ignore
194/// use nomad_protocol::client::{NomadClient, NomadClientBuilder};
195///
196/// let config = NomadClientBuilder::new()
197///     .server_addr("127.0.0.1:19999".parse()?)
198///     .server_public_key(server_pubkey)
199///     .build();
200///
201/// let (client, state_rx) = NomadClient::<MyState>::connect(config, initial_state).await?;
202///
203/// // Send state updates
204/// client.update_state(new_state).await?;
205///
206/// // Receive server state updates
207/// while let Some(server_state) = state_rx.recv().await {
208///     // Handle server state
209/// }
210/// ```
211pub struct NomadClient<S: SyncState> {
212    /// Current client state.
213    state: Arc<RwLock<ClientState>>,
214
215    /// Current local state.
216    local_state: Arc<RwLock<S>>,
217
218    /// Channel for sending state updates.
219    state_tx: mpsc::Sender<S>,
220
221    /// Shutdown signal.
222    shutdown_tx: Option<oneshot::Sender<()>>,
223
224    /// Client configuration.
225    config: ClientConfig,
226}
227
228impl<S: SyncState> NomadClient<S> {
229    /// Connect to a NOMAD server.
230    ///
231    /// Returns the client handle and a receiver for server state updates.
232    pub async fn connect(
233        config: ClientConfig,
234        initial_state: S,
235    ) -> Result<(Self, StateReceiver<S>), ClientError> {
236        // Create channels for state communication
237        let (state_tx, _state_rx) = mpsc::channel::<S>(32);
238        let (server_state_tx, server_state_rx) = mpsc::channel::<S>(32);
239        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
240
241        let client_state = Arc::new(RwLock::new(ClientState::Connecting));
242        let local_state = Arc::new(RwLock::new(initial_state));
243
244        // TODO: Spawn connection task that:
245        // 1. Creates UDP socket
246        // 2. Performs Noise_IK handshake (via crypto module)
247        // 3. Starts sync engine (via sync module)
248        // 4. Handles incoming/outgoing frames (via transport module)
249
250        // For now, we set state to Connected (will be replaced with actual handshake)
251        {
252            let mut state = client_state.write().await;
253            *state = ClientState::Connected;
254        }
255
256        // Spawn the background I/O task
257        let _io_state = client_state.clone();
258        let _io_local = local_state.clone();
259        let _io_config = config.clone();
260        let _io_server_tx = server_state_tx;
261
262        tokio::spawn(async move {
263            // TODO: Implement the actual I/O loop
264            // This will be implemented when lower layers are ready
265
266            // For now, this is a placeholder that keeps the client "alive"
267            loop {
268                tokio::time::sleep(Duration::from_secs(1)).await;
269            }
270        });
271
272        let client = Self {
273            state: client_state,
274            local_state,
275            state_tx,
276            shutdown_tx: Some(shutdown_tx),
277            config,
278        };
279
280        let receiver = StateReceiver { rx: server_state_rx };
281
282        Ok((client, receiver))
283    }
284
285    /// Get the current client state.
286    pub async fn client_state(&self) -> ClientState {
287        *self.state.read().await
288    }
289
290    /// Get a copy of the current local state.
291    pub async fn local_state(&self) -> S {
292        self.local_state.read().await.clone()
293    }
294
295    /// Update the local state.
296    ///
297    /// This queues the state for synchronization with the server.
298    pub async fn update_state(&self, new_state: S) -> Result<(), ClientError> {
299        // Update local state
300        {
301            let mut state = self.local_state.write().await;
302            *state = new_state.clone();
303        }
304
305        // Queue for sending
306        self.state_tx
307            .send(new_state)
308            .await
309            .map_err(|_| ClientError::Disconnected)
310    }
311
312    /// Get a sender handle for state updates.
313    ///
314    /// This can be cloned and used from multiple tasks.
315    pub fn state_sender(&self) -> StateSender<S> {
316        StateSender {
317            tx: self.state_tx.clone(),
318        }
319    }
320
321    /// Check if the client is connected.
322    pub async fn is_connected(&self) -> bool {
323        matches!(*self.state.read().await, ClientState::Connected)
324    }
325
326    /// Gracefully disconnect from the server.
327    pub async fn disconnect(mut self) -> Result<(), ClientError> {
328        // Send shutdown signal
329        if let Some(tx) = self.shutdown_tx.take() {
330            let _ = tx.send(());
331        }
332
333        // Update state
334        {
335            let mut state = self.state.write().await;
336            *state = ClientState::Closed;
337        }
338
339        Ok(())
340    }
341
342    /// Get the server address.
343    pub fn server_addr(&self) -> SocketAddr {
344        self.config.server_addr
345    }
346}
347
348impl<S: SyncState> Drop for NomadClient<S> {
349    fn drop(&mut self) {
350        // Send shutdown signal if not already sent
351        if let Some(tx) = self.shutdown_tx.take() {
352            let _ = tx.send(());
353        }
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    // TODO: Add tests once we have a concrete SyncState implementation
360}