Skip to main content

ap_proxy_client/
protocol_client.rs

1use ap_proxy_protocol::{
2    IdentityFingerprint, IdentityKeyPair, Messages, ProxyError, RendezvousCode,
3};
4use futures_util::{SinkExt, StreamExt};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, mpsc};
8use tokio::task::JoinHandle;
9use tokio::time::Instant;
10use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::Message};
11
/// How often the write task emits a WebSocket ping frame.
const PING_INTERVAL: Duration = Duration::from_secs(30);
/// Longest gap since the last observed pong that the write task tolerates
/// before it gives up on the connection.
const PONG_TIMEOUT: Duration = Duration::from_secs(60);
16
17use super::config::{ClientState, IncomingMessage, ProxyClientConfig};
18
19/// Convert tungstenite errors into ProxyError (replaces the From impl that
20/// was removed from ap-proxy-protocol to keep it free of tungstenite deps).
21fn ws_err(e: tokio_tungstenite::tungstenite::Error) -> ProxyError {
22    ProxyError::WebSocket(e.to_string())
23}
24
25type WsStream = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
26type WsSink = futures_util::stream::SplitSink<WsStream, Message>;
27type WsSource = futures_util::stream::SplitStream<WsStream>;
28
29/// Client for connecting to and communicating through a ap-proxy server.
30///
31/// This is the main client API for connecting to a proxy server, authenticating,
32/// discovering peers via rendezvous codes, and sending messages.
33///
34/// # Lifecycle
35///
36/// 1. Create client with [`new()`](ProxyProtocolClient::new)
37/// 2. Connect and authenticate with [`connect()`](ProxyProtocolClient::connect)
38/// 3. Perform operations (send messages, request rendezvous codes, etc.)
39/// 4. Disconnect with [`disconnect()`](ProxyProtocolClient::disconnect)
40///
41/// # Examples
42///
43/// Basic usage:
44///
45/// ```no_run
46/// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
47///
48/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
49/// // Create and connect
50/// let config = ProxyClientConfig {
51///     proxy_url: "ws://localhost:8080".to_string(),
52///     identity_keypair: None,
53/// };
54/// let mut client = ProxyProtocolClient::new(config);
55/// let mut incoming = client.connect().await?;
56///
57/// // Handle messages
58/// tokio::spawn(async move {
59///     while let Some(msg) = incoming.recv().await {
60///         match msg {
61///             IncomingMessage::Send { source, payload, .. } => {
62///                 println!("Got message from {:?}", source);
63///             }
64///             _ => {}
65///         }
66///     }
67/// });
68///
69/// // Send a message
70/// // client.send_to(target_fingerprint, b"Hello".to_vec()).await?;
71/// # Ok(())
72/// # }
73/// ```
74pub struct ProxyProtocolClient {
75    // Configuration
76    config: ProxyClientConfig,
77    identity: Arc<IdentityKeyPair>,
78
79    // Connection state
80    state: Arc<Mutex<ClientState>>,
81
82    // WebSocket components (None when disconnected)
83    outgoing_tx: Option<mpsc::UnboundedSender<Message>>,
84
85    // Task handles for cleanup
86    read_task_handle: Option<JoinHandle<()>>,
87    write_task_handle: Option<JoinHandle<()>>,
88}
89
90impl ProxyProtocolClient {
91    /// Create a new proxy client with the given configuration.
92    ///
93    /// This does not establish a connection - call [`connect()`](ProxyProtocolClient::connect)
94    /// to connect and authenticate.
95    ///
96    /// If `config.identity_keypair` is `None`, a new random identity will be generated.
97    /// Otherwise, the provided identity will be used for authentication.
98    ///
99    /// # Examples
100    ///
101    /// Create client with new identity:
102    ///
103    /// ```
104    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
105    ///
106    /// let config = ProxyClientConfig {
107    ///     proxy_url: "ws://localhost:8080".to_string(),
108    ///     identity_keypair: None, // Will generate new identity
109    /// };
110    /// let client = ProxyProtocolClient::new(config);
111    /// println!("Client fingerprint: {:?}", client.fingerprint());
112    /// ```
113    ///
114    /// Create client with existing identity:
115    ///
116    /// ```
117    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
118    ///
119    /// let keypair = IdentityKeyPair::generate();
120    /// let config = ProxyClientConfig {
121    ///     proxy_url: "ws://localhost:8080".to_string(),
122    ///     identity_keypair: Some(keypair),
123    /// };
124    /// let client = ProxyProtocolClient::new(config);
125    /// ```
126    pub fn new(mut config: ProxyClientConfig) -> Self {
127        let identity = Arc::new(
128            config
129                .identity_keypair
130                .take()
131                .unwrap_or_else(IdentityKeyPair::generate),
132        );
133
134        Self {
135            config,
136            identity,
137            state: Arc::new(Mutex::new(ClientState::Disconnected)),
138            outgoing_tx: None,
139            read_task_handle: None,
140            write_task_handle: None,
141        }
142    }
143
144    /// Connect to the proxy server and perform authentication.
145    ///
146    /// Establishes a WebSocket connection, completes the challenge-response authentication,
147    /// and returns a channel for receiving incoming messages.
148    ///
149    /// # Authentication Flow
150    ///
151    /// 1. Connect to WebSocket at the configured URL
152    /// 2. Receive authentication challenge from server
153    /// 3. Sign challenge with client's private key
154    /// 4. Send signed response to server
155    /// 5. Server verifies signature and accepts connection
156    ///
157    /// # Timeout
158    ///
159    /// Authentication must complete within 5 seconds or this method returns
160    /// [`ProxyError::AuthenticationTimeout`].
161    ///
162    /// # Errors
163    ///
164    /// - [`ProxyError::AlreadyConnected`] if already connected
165    /// - [`ProxyError::WebSocket`] if connection fails
166    /// - [`ProxyError::AuthenticationFailed`] if signature verification fails
167    /// - [`ProxyError::AuthenticationTimeout`] if authentication takes too long
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
173    ///
174    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
175    /// let config = ProxyClientConfig {
176    ///     proxy_url: "ws://localhost:8080".to_string(),
177    ///     identity_keypair: None,
178    /// };
179    /// let mut client = ProxyProtocolClient::new(config);
180    ///
181    /// // Connect and get incoming message channel
182    /// let mut incoming = client.connect().await?;
183    ///
184    /// // Handle messages
185    /// while let Some(msg) = incoming.recv().await {
186    ///     println!("Received: {:?}", msg);
187    /// }
188    /// # Ok(())
189    /// # }
190    /// ```
191    pub async fn connect(
192        &mut self,
193    ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ProxyError> {
194        // Check not already connected
195        {
196            let state = self.state.lock().await;
197            if !matches!(*state, ClientState::Disconnected) {
198                return Err(ProxyError::AlreadyConnected);
199            }
200        }
201
202        // Connect WebSocket
203        let (ws_stream, _) = connect_async(&self.config.proxy_url)
204            .await
205            .map_err(ws_err)?;
206
207        // Split into read/write
208        let (ws_sink, ws_source) = ws_stream.split();
209
210        // Update state to Connected
211        *self.state.lock().await = ClientState::Connected;
212
213        // Create channels
214        let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel::<Message>();
215        let (incoming_tx, incoming_rx) = mpsc::unbounded_channel::<IncomingMessage>();
216        let (auth_tx, mut auth_rx) = mpsc::unbounded_channel::<Result<(), ProxyError>>();
217
218        // Shared pong tracker for keep-alive
219        let last_pong = Arc::new(Mutex::new(Instant::now()));
220
221        // Spawn write task
222        let write_handle = tokio::spawn(Self::write_task(
223            ws_sink,
224            outgoing_rx,
225            Arc::clone(&last_pong),
226        ));
227
228        // Spawn read task (handles auth + message routing)
229        let read_handle = tokio::spawn(Self::read_task(
230            ws_source,
231            outgoing_tx.clone(),
232            incoming_tx,
233            Arc::clone(&self.identity),
234            self.state.clone(),
235            auth_tx,
236            last_pong,
237        ));
238
239        // Wait for authentication to complete
240        match tokio::time::timeout(tokio::time::Duration::from_secs(5), auth_rx.recv()).await {
241            Ok(Some(Ok(()))) => {
242                // Authentication succeeded
243            }
244            Ok(Some(Err(e))) => {
245                // Authentication failed
246                self.read_task_handle = Some(read_handle);
247                self.write_task_handle = Some(write_handle);
248                self.disconnect().await?;
249                return Err(e);
250            }
251            Ok(None) | Err(_) => {
252                // Channel closed or timeout
253                self.read_task_handle = Some(read_handle);
254                self.write_task_handle = Some(write_handle);
255                self.disconnect().await?;
256                return Err(ProxyError::AuthenticationTimeout);
257            }
258        }
259
260        // Store handles and tx
261        self.outgoing_tx = Some(outgoing_tx);
262        self.read_task_handle = Some(read_handle);
263        self.write_task_handle = Some(write_handle);
264
265        Ok(incoming_rx)
266    }
267
268    /// Send a message to another authenticated client.
269    ///
270    /// The message is routed through the proxy server to the destination client.
271    /// The proxy validates the source identity but cannot inspect the payload.
272    ///
273    /// # Authentication Required
274    ///
275    /// This method requires an active authenticated connection. Call
276    /// [`connect()`](ProxyProtocolClient::connect) first.
277    ///
278    /// # Payload Encryption
279    ///
280    /// The proxy does not encrypt message payloads. Clients should implement
281    /// end-to-end encryption (e.g., using the Noise protocol) before calling this method.
282    ///
283    /// # Errors
284    ///
285    /// - [`ProxyError::NotConnected`] if not connected or not authenticated
286    /// - [`ProxyError::DestinationNotFound`] if the destination client is not connected
287    /// - [`ProxyError::Serialization`] if message encoding fails
288    ///
289    /// # Examples
290    ///
291    /// ```no_run
292    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
293    ///
294    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
295    /// # let config = ProxyClientConfig {
296    /// #     proxy_url: "ws://localhost:8080".to_string(),
297    /// #     identity_keypair: None,
298    /// # };
299    /// let mut client = ProxyProtocolClient::new(config);
300    /// client.connect().await?;
301    ///
302    /// // Get destination fingerprint from rendezvous lookup
303    /// // let destination = ...; // from IncomingMessage::IdentityInfo
304    ///
305    /// // Send encrypted message
306    /// // let payload = encrypt_message(b"Hello!")?;
307    /// // client.send_to(destination, payload).await?;
308    /// # Ok(())
309    /// # }
310    /// ```
311    pub async fn send_to(
312        &self,
313        destination: IdentityFingerprint,
314        payload: Vec<u8>,
315    ) -> Result<(), ProxyError> {
316        // Check authenticated
317        {
318            let state = self.state.lock().await;
319            if !matches!(*state, ClientState::Authenticated { .. }) {
320                return Err(ProxyError::NotConnected);
321            }
322        }
323
324        // Create Send message without source (server will add it)
325        let msg = Messages::Send {
326            source: None,
327            destination,
328            payload,
329        };
330
331        let json = serde_json::to_string(&msg)?;
332
333        // Send via outgoing_tx channel
334        if let Some(tx) = &self.outgoing_tx {
335            tx.send(Message::Text(json))
336                .map_err(|_| ProxyError::ChannelSendFailed)?;
337            Ok(())
338        } else {
339            Err(ProxyError::NotConnected)
340        }
341    }
342
343    /// Request a rendezvous code from the server.
344    ///
345    /// The server will generate a temporary code (format: "ABC-DEF-GHI") that maps to your
346    /// identity. The code will be delivered via [`IncomingMessage::RendezvousInfo`] on the
347    /// channel returned by [`connect()`](ProxyProtocolClient::connect).
348    ///
349    /// # Rendezvous Code Properties
350    ///
351    /// - Format: 9 alphanumeric characters (e.g., "ABC-DEF-GHI")
352    /// - Lifetime: 5 minutes
353    /// - Single-use: deleted after lookup
354    /// - Enables peer discovery without sharing long-lived identifiers
355    ///
356    /// # Usage Pattern
357    ///
358    /// 1. Call this method to request a code
359    /// 2. Receive the code via [`IncomingMessage::RendezvousInfo`]
360    /// 3. Share the code with a peer (e.g., display as QR code)
361    /// 4. Peer uses [`request_identity()`](ProxyProtocolClient::request_identity) to look up your identity
362    ///
363    /// # Authentication Required
364    ///
365    /// This method requires an active authenticated connection.
366    ///
367    /// # Errors
368    ///
369    /// - [`ProxyError::NotConnected`] if not connected or not authenticated
370    ///
371    /// # Examples
372    ///
373    /// ```no_run
374    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
375    ///
376    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
377    /// # let config = ProxyClientConfig {
378    /// #     proxy_url: "ws://localhost:8080".to_string(),
379    /// #     identity_keypair: None,
380    /// # };
381    /// let mut client = ProxyProtocolClient::new(config);
382    /// let mut incoming = client.connect().await?;
383    ///
384    /// // Request a code
385    /// client.request_rendezvous().await?;
386    ///
387    /// // Wait for response
388    /// if let Some(IncomingMessage::RendezvousInfo(code)) = incoming.recv().await {
389    ///     println!("Share this code: {}", code.as_str());
390    ///     // Display as QR code, send via messaging, etc.
391    /// }
392    /// # Ok(())
393    /// # }
394    /// ```
395    pub async fn request_rendezvous(&self) -> Result<(), ProxyError> {
396        // Check authenticated
397        {
398            let state = self.state.lock().await;
399            if !matches!(*state, ClientState::Authenticated { .. }) {
400                return Err(ProxyError::NotConnected);
401            }
402        }
403
404        // Send GetRendezvous message
405        let msg = Messages::GetRendezvous;
406        let json = serde_json::to_string(&msg)?;
407
408        // Send via outgoing_tx channel
409        if let Some(tx) = &self.outgoing_tx {
410            tx.send(Message::Text(json))
411                .map_err(|_| ProxyError::ChannelSendFailed)?;
412            Ok(())
413        } else {
414            Err(ProxyError::NotConnected)
415        }
416    }
417
418    /// Look up a peer's identity using a rendezvous code.
419    ///
420    /// Queries the server for the identity associated with a rendezvous code.
421    /// If the code is valid and hasn't expired, the server responds with
422    /// [`IncomingMessage::IdentityInfo`] containing the peer's identity and fingerprint.
423    ///
424    /// # Code Consumption
425    ///
426    /// Rendezvous codes are single-use. After successful lookup, the server deletes
427    /// the code and it cannot be used again.
428    ///
429    /// # Authentication Required
430    ///
431    /// This method requires an active authenticated connection.
432    ///
433    /// # Errors
434    ///
435    /// - [`ProxyError::NotConnected`] if not connected or not authenticated
436    ///
437    /// The server may not respond if the code is invalid, expired, or already used.
438    /// Implement a timeout when waiting for the response.
439    ///
440    /// # Examples
441    ///
442    /// ```no_run
443    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage, RendezvousCode};
444    ///
445    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
446    /// # let config = ProxyClientConfig {
447    /// #     proxy_url: "ws://localhost:8080".to_string(),
448    /// #     identity_keypair: None,
449    /// # };
450    /// let mut client = ProxyProtocolClient::new(config);
451    /// let mut incoming = client.connect().await?;
452    ///
453    /// // Get code from user (e.g., QR scan, text input)
454    /// let code = RendezvousCode::from_string("ABC-DEF-GHI".to_string());
455    ///
456    /// // Look up the identity
457    /// client.request_identity(code).await?;
458    ///
459    /// // Wait for response with timeout
460    /// match tokio::time::timeout(
461    ///     tokio::time::Duration::from_secs(5),
462    ///     incoming.recv()
463    /// ).await {
464    ///     Ok(Some(IncomingMessage::IdentityInfo { fingerprint, identity })) => {
465    ///         println!("Found peer: {:?}", fingerprint);
466    ///         // Now you can send messages to this peer
467    ///         // client.send_to(fingerprint, payload).await?;
468    ///     }
469    ///     _ => {
470    ///         println!("Code not found or expired");
471    ///     }
472    /// }
473    /// # Ok(())
474    /// # }
475    /// ```
476    pub async fn request_identity(
477        &self,
478        rendezvous_code: RendezvousCode,
479    ) -> Result<(), ProxyError> {
480        // Check authenticated
481        {
482            let state = self.state.lock().await;
483            if !matches!(*state, ClientState::Authenticated { .. }) {
484                return Err(ProxyError::NotConnected);
485            }
486        }
487
488        // Send GetIdentity message
489        let msg = Messages::GetIdentity(rendezvous_code);
490        let json = serde_json::to_string(&msg)?;
491
492        // Send via outgoing_tx channel
493        if let Some(tx) = &self.outgoing_tx {
494            tx.send(Message::Text(json))
495                .map_err(|_| ProxyError::ChannelSendFailed)?;
496            Ok(())
497        } else {
498            Err(ProxyError::NotConnected)
499        }
500    }
501
502    /// Get this client's identity fingerprint.
503    ///
504    /// Returns the SHA256 fingerprint of the client's public key. This is the
505    /// identifier that other clients use to send messages to this client.
506    ///
507    /// The fingerprint is available immediately after creating the client, before
508    /// connecting.
509    ///
510    /// # Examples
511    ///
512    /// ```
513    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
514    ///
515    /// let config = ProxyClientConfig {
516    ///     proxy_url: "ws://localhost:8080".to_string(),
517    ///     identity_keypair: None,
518    /// };
519    /// let client = ProxyProtocolClient::new(config);
520    /// println!("My fingerprint: {:?}", client.fingerprint());
521    /// ```
522    pub fn fingerprint(&self) -> IdentityFingerprint {
523        self.identity.identity().fingerprint()
524    }
525
526    /// Check if the client is currently authenticated.
527    ///
528    /// Returns `true` if the client has completed authentication and can send/receive
529    /// messages. Returns `false` if disconnected or still connecting.
530    ///
531    /// # Examples
532    ///
533    /// ```no_run
534    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
535    ///
536    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
537    /// # let config = ProxyClientConfig {
538    /// #     proxy_url: "ws://localhost:8080".to_string(),
539    /// #     identity_keypair: None,
540    /// # };
541    /// let mut client = ProxyProtocolClient::new(config);
542    ///
543    /// assert!(!client.is_authenticated().await);
544    ///
545    /// client.connect().await?;
546    /// assert!(client.is_authenticated().await);
547    ///
548    /// client.disconnect().await?;
549    /// assert!(!client.is_authenticated().await);
550    /// # Ok(())
551    /// # }
552    /// ```
553    pub async fn is_authenticated(&self) -> bool {
554        matches!(*self.state.lock().await, ClientState::Authenticated { .. })
555    }
556
557    /// Disconnect from the proxy server and clean up resources.
558    ///
559    /// Aborts background tasks, closes the WebSocket connection, and resets state.
560    /// After disconnecting, you can call [`connect()`](ProxyProtocolClient::connect)
561    /// again to reconnect.
562    ///
563    /// This method is automatically called when the client is dropped, but calling it
564    /// explicitly allows you to handle errors.
565    ///
566    /// # Examples
567    ///
568    /// ```no_run
569    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
570    ///
571    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
572    /// # let config = ProxyClientConfig {
573    /// #     proxy_url: "ws://localhost:8080".to_string(),
574    /// #     identity_keypair: None,
575    /// # };
576    /// let mut client = ProxyProtocolClient::new(config);
577    /// client.connect().await?;
578    ///
579    /// // Do work...
580    ///
581    /// // Clean disconnect
582    /// client.disconnect().await?;
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub async fn disconnect(&mut self) -> Result<(), ProxyError> {
587        // Abort tasks
588        if let Some(handle) = self.read_task_handle.take() {
589            handle.abort();
590        }
591        if let Some(handle) = self.write_task_handle.take() {
592            handle.abort();
593        }
594
595        // Clear state
596        *self.state.lock().await = ClientState::Disconnected;
597
598        // Close channels
599        self.outgoing_tx = None;
600
601        Ok(())
602    }
603
604    /// Write task: sends messages from channel to WebSocket, with keep-alive pings
605    async fn write_task(
606        mut ws_sink: WsSink,
607        mut outgoing_rx: mpsc::UnboundedReceiver<Message>,
608        last_pong: Arc<Mutex<Instant>>,
609    ) {
610        let mut ping_interval = tokio::time::interval(PING_INTERVAL);
611        ping_interval.tick().await; // consume the immediate first tick
612
613        loop {
614            tokio::select! {
615                msg = outgoing_rx.recv() => {
616                    match msg {
617                        Some(msg) => {
618                            if ws_sink.send(msg).await.is_err() {
619                                break;
620                            }
621                        }
622                        None => break,
623                    }
624                }
625                _ = ping_interval.tick() => {
626                    // Check if we received a pong recently
627                    let elapsed = last_pong.lock().await.elapsed();
628                    if elapsed > PONG_TIMEOUT {
629                        tracing::warn!("No pong received in {:?}, closing connection", elapsed);
630                        break;
631                    }
632                    if ws_sink.send(Message::Ping(vec![])).await.is_err() {
633                        break;
634                    }
635                }
636            }
637        }
638    }
639
640    /// Read task: handles authentication and routes messages
641    async fn read_task(
642        mut ws_source: WsSource,
643        outgoing_tx: mpsc::UnboundedSender<Message>,
644        incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
645        identity: Arc<IdentityKeyPair>,
646        state: Arc<Mutex<ClientState>>,
647        auth_tx: mpsc::UnboundedSender<Result<(), ProxyError>>,
648        last_pong: Arc<Mutex<Instant>>,
649    ) {
650        // Handle authentication
651        match Self::handle_authentication(&mut ws_source, &outgoing_tx, &identity).await {
652            Ok(fingerprint) => {
653                *state.lock().await = ClientState::Authenticated { fingerprint };
654                // Notify that authentication succeeded
655                let _ = auth_tx.send(Ok(()));
656            }
657            Err(e) => {
658                tracing::error!("Authentication failed: {}", e);
659                *state.lock().await = ClientState::Disconnected;
660                // Notify that authentication failed
661                let _ = auth_tx.send(Err(e));
662                return;
663            }
664        }
665
666        // Enter message loop
667        if let Err(e) = Self::message_loop(ws_source, incoming_tx, last_pong).await {
668            tracing::error!("Message loop error: {}", e);
669        }
670
671        *state.lock().await = ClientState::Disconnected;
672    }
673
674    /// Handle authentication challenge-response
675    async fn handle_authentication(
676        ws_source: &mut WsSource,
677        outgoing_tx: &mpsc::UnboundedSender<Message>,
678        identity: &Arc<IdentityKeyPair>,
679    ) -> Result<IdentityFingerprint, ProxyError> {
680        // Receive AuthChallenge
681        let challenge_msg = ws_source
682            .next()
683            .await
684            .ok_or(ProxyError::ConnectionClosed)?
685            .map_err(ws_err)?;
686
687        let challenge = match challenge_msg {
688            Message::Text(text) => match serde_json::from_str::<Messages>(&text)? {
689                Messages::AuthChallenge(c) => c,
690                _ => return Err(ProxyError::InvalidMessage("Expected AuthChallenge".into())),
691            },
692            _ => return Err(ProxyError::InvalidMessage("Expected text message".into())),
693        };
694
695        // Sign challenge
696        let response = challenge.sign(identity);
697        let auth_response = Messages::AuthResponse(identity.identity(), response);
698        let auth_json = serde_json::to_string(&auth_response)?;
699
700        // Send auth response
701        outgoing_tx
702            .send(Message::Text(auth_json))
703            .map_err(|_| ProxyError::ChannelSendFailed)?;
704
705        // Authentication complete - server doesn't send confirmation
706        Ok(identity.identity().fingerprint())
707    }
708
709    /// Message loop: routes incoming messages to channel
710    async fn message_loop(
711        mut ws_source: WsSource,
712        incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
713        last_pong: Arc<Mutex<Instant>>,
714    ) -> Result<(), ProxyError> {
715        while let Some(msg_result) = ws_source.next().await {
716            let msg = msg_result.map_err(ws_err)?;
717
718            match msg {
719                Message::Text(text) => {
720                    let parsed: Messages = serde_json::from_str(&text)?;
721                    match parsed {
722                        Messages::Send {
723                            source,
724                            destination,
725                            payload,
726                        } => {
727                            // Server always includes source when forwarding messages
728                            if let Some(source) = source {
729                                incoming_tx
730                                    .send(IncomingMessage::Send {
731                                        source,
732                                        destination,
733                                        payload,
734                                    })
735                                    .ok();
736                            } else {
737                                tracing::warn!("Received Send message without source");
738                            }
739                        }
740                        Messages::RendezvousInfo(code) => {
741                            incoming_tx.send(IncomingMessage::RendezvousInfo(code)).ok();
742                        }
743                        Messages::IdentityInfo {
744                            fingerprint,
745                            identity,
746                        } => {
747                            incoming_tx
748                                .send(IncomingMessage::IdentityInfo {
749                                    fingerprint,
750                                    identity,
751                                })
752                                .ok();
753                        }
754                        Messages::GetIdentity(_) => {
755                            tracing::warn!("Received GetIdentity (client should not receive this)");
756                        }
757                        _ => tracing::warn!("Unexpected message type: {:?}", parsed),
758                    }
759                }
760                Message::Pong(_) => {
761                    *last_pong.lock().await = Instant::now();
762                }
763                Message::Close(_) => break,
764                _ => {}
765            }
766        }
767
768        Ok(())
769    }
770}