Skip to main content

ap_proxy_client/
protocol_client.rs

1use ap_proxy_protocol::{
2    IdentityFingerprint, IdentityKeyPair, Messages, ProxyError, RendezvousCode,
3};
4use futures_util::{SinkExt, StreamExt};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, mpsc};
8use tokio::task::JoinHandle;
9use tokio::time::Instant;
10use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::Message};
11
/// Interval between client-side WebSocket pings.
///
/// The write task sends one `Ping` frame on every tick of this interval.
const PING_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum time without a pong before the connection is considered dead.
///
/// The write task only compares against this value on a ping tick, so detection of a
/// silent peer can lag the timeout by up to one [`PING_INTERVAL`].
const PONG_TIMEOUT: Duration = Duration::from_secs(60);
16
17use super::config::{ClientState, IncomingMessage, ProxyClientConfig};
18
19/// Convert tungstenite errors into ProxyError (replaces the From impl that
20/// was removed from ap-proxy-protocol to keep it free of tungstenite deps).
21fn ws_err(e: tokio_tungstenite::tungstenite::Error) -> ProxyError {
22    ProxyError::WebSocket(e.to_string())
23}
24
/// WebSocket stream over a tokio TCP socket that may or may not be wrapped in TLS.
type WsStream = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
/// Write half of a split [`WsStream`]; it accepts outgoing [`Message`] frames.
type WsSink = futures_util::stream::SplitSink<WsStream, Message>;
/// Read half of a split [`WsStream`].
type WsSource = futures_util::stream::SplitStream<WsStream>;
28
/// Client for connecting to and communicating through an ap-proxy server.
///
/// This is the main client API for connecting to a proxy server, authenticating,
/// discovering peers via rendezvous codes, and sending messages.
///
/// # Lifecycle
///
/// 1. Create client with [`new()`](ProxyProtocolClient::new)
/// 2. Connect and authenticate with [`connect()`](ProxyProtocolClient::connect)
/// 3. Perform operations (send messages, request rendezvous codes, etc.)
/// 4. Disconnect with [`disconnect()`](ProxyProtocolClient::disconnect)
///
/// # Examples
///
/// Basic usage:
///
/// ```no_run
/// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage, IdentityKeyPair};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Create and connect
/// let config = ProxyClientConfig {
///     proxy_url: "ws://localhost:8080".to_string(),
/// };
/// let mut client = ProxyProtocolClient::new(config);
/// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
///
/// // Handle messages
/// tokio::spawn(async move {
///     while let Some(msg) = incoming.recv().await {
///         match msg {
///             IncomingMessage::Send { source, payload, .. } => {
///                 println!("Got message from {:?}", source);
///             }
///             _ => {}
///         }
///     }
/// });
///
/// // Send a message
/// // client.send_to(target_fingerprint, b"Hello".to_vec()).await?;
/// # Ok(())
/// # }
/// ```
pub struct ProxyProtocolClient {
    // Configuration: where to connect, and the identity handed to the most recent
    // `connect()` call (None until `connect()` has been called once).
    config: ProxyClientConfig,
    identity: Option<Arc<IdentityKeyPair>>,

    // Connection state, shared with the background read task, which also updates it.
    state: Arc<Mutex<ClientState>>,

    // Sender feeding the write task (None when disconnected).
    outgoing_tx: Option<mpsc::UnboundedSender<Message>>,

    // Handles of the spawned read/write tasks, kept so `disconnect()` can abort them.
    read_task_handle: Option<JoinHandle<()>>,
    write_task_handle: Option<JoinHandle<()>>,
}
88
89impl ProxyProtocolClient {
90    /// Create a new proxy client with the given configuration.
91    ///
92    /// This does not establish a connection - call [`connect()`](ProxyProtocolClient::connect)
93    /// to connect and authenticate with an identity.
94    ///
95    /// # Examples
96    ///
97    /// ```
98    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
99    ///
100    /// let config = ProxyClientConfig {
101    ///     proxy_url: "ws://localhost:8080".to_string(),
102    /// };
103    /// let client = ProxyProtocolClient::new(config);
104    /// ```
105    pub fn new(config: ProxyClientConfig) -> Self {
106        Self {
107            config,
108            identity: None,
109            state: Arc::new(Mutex::new(ClientState::Disconnected)),
110            outgoing_tx: None,
111            read_task_handle: None,
112            write_task_handle: None,
113        }
114    }
115
116    /// Create a new proxy client from just a URL.
117    ///
118    /// Convenience constructor equivalent to `new(ProxyClientConfig { proxy_url })`.
119    pub fn from_url(proxy_url: String) -> Self {
120        Self::new(ProxyClientConfig { proxy_url })
121    }
122
123    /// Connect to the proxy server and perform authentication.
124    ///
125    /// Establishes a WebSocket connection, completes the challenge-response authentication
126    /// using the provided identity, and returns a channel for receiving incoming messages.
127    ///
128    /// # Authentication Flow
129    ///
130    /// 1. Connect to WebSocket at the configured URL
131    /// 2. Receive authentication challenge from server
132    /// 3. Sign challenge with the provided identity's private key
133    /// 4. Send signed response to server
134    /// 5. Server verifies signature and accepts connection
135    ///
136    /// # Timeout
137    ///
138    /// Authentication must complete within 5 seconds or this method returns
139    /// [`ProxyError::AuthenticationTimeout`].
140    ///
141    /// # Errors
142    ///
143    /// - [`ProxyError::AlreadyConnected`] if already connected
144    /// - [`ProxyError::WebSocket`] if connection fails
145    /// - [`ProxyError::AuthenticationFailed`] if signature verification fails
146    /// - [`ProxyError::AuthenticationTimeout`] if authentication takes too long
147    ///
148    /// # Examples
149    ///
150    /// ```no_run
151    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage, IdentityKeyPair};
152    ///
153    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
154    /// let config = ProxyClientConfig {
155    ///     proxy_url: "ws://localhost:8080".to_string(),
156    /// };
157    /// let mut client = ProxyProtocolClient::new(config);
158    ///
159    /// // Connect and get incoming message channel
160    /// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
161    ///
162    /// // Handle messages
163    /// while let Some(msg) = incoming.recv().await {
164    ///     println!("Received: {:?}", msg);
165    /// }
166    /// # Ok(())
167    /// # }
168    /// ```
169    pub async fn connect(
170        &mut self,
171        identity: IdentityKeyPair,
172    ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ProxyError> {
173        // Check not already connected
174        {
175            let state = self.state.lock().await;
176            if !matches!(*state, ClientState::Disconnected) {
177                return Err(ProxyError::AlreadyConnected);
178            }
179        }
180
181        // Store the identity
182        let identity = Arc::new(identity);
183        self.identity = Some(Arc::clone(&identity));
184
185        // Connect WebSocket
186        let (ws_stream, _) = connect_async(&self.config.proxy_url)
187            .await
188            .map_err(ws_err)?;
189
190        // Split into read/write
191        let (ws_sink, ws_source) = ws_stream.split();
192
193        // Update state to Connected
194        *self.state.lock().await = ClientState::Connected;
195
196        // Create channels
197        let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel::<Message>();
198        let (incoming_tx, incoming_rx) = mpsc::unbounded_channel::<IncomingMessage>();
199        let (auth_tx, mut auth_rx) = mpsc::unbounded_channel::<Result<(), ProxyError>>();
200
201        // Shared pong tracker for keep-alive
202        let last_pong = Arc::new(Mutex::new(Instant::now()));
203
204        // Spawn write task
205        let write_handle = tokio::spawn(Self::write_task(
206            ws_sink,
207            outgoing_rx,
208            Arc::clone(&last_pong),
209        ));
210
211        // Spawn read task (handles auth + message routing)
212        let read_handle = tokio::spawn(Self::read_task(
213            ws_source,
214            outgoing_tx.clone(),
215            incoming_tx,
216            identity,
217            self.state.clone(),
218            auth_tx,
219            last_pong,
220        ));
221
222        // Wait for authentication to complete
223        match tokio::time::timeout(tokio::time::Duration::from_secs(5), auth_rx.recv()).await {
224            Ok(Some(Ok(()))) => {
225                // Authentication succeeded
226            }
227            Ok(Some(Err(e))) => {
228                // Authentication failed
229                self.read_task_handle = Some(read_handle);
230                self.write_task_handle = Some(write_handle);
231                self.disconnect().await?;
232                return Err(e);
233            }
234            Ok(None) | Err(_) => {
235                // Channel closed or timeout
236                self.read_task_handle = Some(read_handle);
237                self.write_task_handle = Some(write_handle);
238                self.disconnect().await?;
239                return Err(ProxyError::AuthenticationTimeout);
240            }
241        }
242
243        // Store handles and tx
244        self.outgoing_tx = Some(outgoing_tx);
245        self.read_task_handle = Some(read_handle);
246        self.write_task_handle = Some(write_handle);
247
248        Ok(incoming_rx)
249    }
250
251    /// Send a message to another authenticated client.
252    ///
253    /// The message is routed through the proxy server to the destination client.
254    /// The proxy validates the source identity but cannot inspect the payload.
255    ///
256    /// # Authentication Required
257    ///
258    /// This method requires an active authenticated connection. Call
259    /// [`connect()`](ProxyProtocolClient::connect) first.
260    ///
261    /// # Payload Encryption
262    ///
263    /// The proxy does not encrypt message payloads. Clients should implement
264    /// end-to-end encryption (e.g., using the Noise protocol) before calling this method.
265    ///
266    /// # Errors
267    ///
268    /// - [`ProxyError::NotConnected`] if not connected or not authenticated
269    /// - [`ProxyError::DestinationNotFound`] if the destination client is not connected
270    /// - [`ProxyError::Serialization`] if message encoding fails
271    ///
272    /// # Examples
273    ///
274    /// ```no_run
275    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
276    ///
277    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
278    /// # let config = ProxyClientConfig {
279    /// #     proxy_url: "ws://localhost:8080".to_string(),
280    /// # };
281    /// let mut client = ProxyProtocolClient::new(config);
282    /// client.connect(IdentityKeyPair::generate()).await?;
283    ///
284    /// // Get destination fingerprint from rendezvous lookup
285    /// // let destination = ...; // from IncomingMessage::IdentityInfo
286    ///
287    /// // Send encrypted message
288    /// // let payload = encrypt_message(b"Hello!")?;
289    /// // client.send_to(destination, payload).await?;
290    /// # Ok(())
291    /// # }
292    /// ```
293    pub async fn send_to(
294        &self,
295        destination: IdentityFingerprint,
296        payload: Vec<u8>,
297    ) -> Result<(), ProxyError> {
298        // Check authenticated
299        {
300            let state = self.state.lock().await;
301            if !matches!(*state, ClientState::Authenticated { .. }) {
302                return Err(ProxyError::NotConnected);
303            }
304        }
305
306        // Create Send message without source (server will add it)
307        let msg = Messages::Send {
308            source: None,
309            destination,
310            payload,
311        };
312
313        let json = serde_json::to_string(&msg)?;
314
315        // Send via outgoing_tx channel
316        if let Some(tx) = &self.outgoing_tx {
317            tx.send(Message::Text(json))
318                .map_err(|_| ProxyError::ChannelSendFailed)?;
319            Ok(())
320        } else {
321            Err(ProxyError::NotConnected)
322        }
323    }
324
325    /// Request a rendezvous code from the server.
326    ///
327    /// The server will generate a temporary code (format: "ABC-DEF-GHI") that maps to your
328    /// identity. The code will be delivered via [`IncomingMessage::RendezvousInfo`] on the
329    /// channel returned by [`connect()`](ProxyProtocolClient::connect).
330    ///
331    /// # Rendezvous Code Properties
332    ///
333    /// - Format: 9 alphanumeric characters (e.g., "ABC-DEF-GHI")
334    /// - Lifetime: 5 minutes
335    /// - Single-use: deleted after lookup
336    /// - Enables peer discovery without sharing long-lived identifiers
337    ///
338    /// # Usage Pattern
339    ///
340    /// 1. Call this method to request a code
341    /// 2. Receive the code via [`IncomingMessage::RendezvousInfo`]
342    /// 3. Share the code with a peer (e.g., display as QR code)
343    /// 4. Peer uses [`request_identity()`](ProxyProtocolClient::request_identity) to look up your identity
344    ///
345    /// # Authentication Required
346    ///
347    /// This method requires an active authenticated connection.
348    ///
349    /// # Errors
350    ///
351    /// - [`ProxyError::NotConnected`] if not connected or not authenticated
352    ///
353    /// # Examples
354    ///
355    /// ```no_run
356    /// use ap_proxy_client::{ProxyProtocolClient, IncomingMessage, IdentityKeyPair};
357    ///
358    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
359    /// let mut client = ProxyProtocolClient::from_url("ws://localhost:8080".to_string());
360    /// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
361    ///
362    /// // Request a code
363    /// client.request_rendezvous().await?;
364    ///
365    /// // Wait for response
366    /// if let Some(IncomingMessage::RendezvousInfo(code)) = incoming.recv().await {
367    ///     println!("Share this code: {}", code.as_str());
368    ///     // Display as QR code, send via messaging, etc.
369    /// }
370    /// # Ok(())
371    /// # }
372    /// ```
373    pub async fn request_rendezvous(&self) -> Result<(), ProxyError> {
374        // Check authenticated
375        {
376            let state = self.state.lock().await;
377            if !matches!(*state, ClientState::Authenticated { .. }) {
378                return Err(ProxyError::NotConnected);
379            }
380        }
381
382        // Send GetRendezvous message
383        let msg = Messages::GetRendezvous;
384        let json = serde_json::to_string(&msg)?;
385
386        // Send via outgoing_tx channel
387        if let Some(tx) = &self.outgoing_tx {
388            tx.send(Message::Text(json))
389                .map_err(|_| ProxyError::ChannelSendFailed)?;
390            Ok(())
391        } else {
392            Err(ProxyError::NotConnected)
393        }
394    }
395
396    /// Look up a peer's identity using a rendezvous code.
397    ///
398    /// Queries the server for the identity associated with a rendezvous code.
399    /// If the code is valid and hasn't expired, the server responds with
400    /// [`IncomingMessage::IdentityInfo`] containing the peer's identity and fingerprint.
401    ///
402    /// # Code Consumption
403    ///
404    /// Rendezvous codes are single-use. After successful lookup, the server deletes
405    /// the code and it cannot be used again.
406    ///
407    /// # Authentication Required
408    ///
409    /// This method requires an active authenticated connection.
410    ///
411    /// # Errors
412    ///
413    /// - [`ProxyError::NotConnected`] if not connected or not authenticated
414    ///
415    /// The server may not respond if the code is invalid, expired, or already used.
416    /// Implement a timeout when waiting for the response.
417    ///
418    /// # Examples
419    ///
420    /// ```no_run
421    /// use ap_proxy_client::{ProxyProtocolClient, IncomingMessage, IdentityKeyPair, RendezvousCode};
422    ///
423    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
424    /// let mut client = ProxyProtocolClient::from_url("ws://localhost:8080".to_string());
425    /// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
426    ///
427    /// // Get code from user (e.g., QR scan, text input)
428    /// let code = RendezvousCode::from_string("ABC-DEF-GHI".to_string());
429    ///
430    /// // Look up the identity
431    /// client.request_identity(code).await?;
432    ///
433    /// // Wait for response with timeout
434    /// match tokio::time::timeout(
435    ///     tokio::time::Duration::from_secs(5),
436    ///     incoming.recv()
437    /// ).await {
438    ///     Ok(Some(IncomingMessage::IdentityInfo { fingerprint, identity })) => {
439    ///         println!("Found peer: {:?}", fingerprint);
440    ///         // Now you can send messages to this peer
441    ///         // client.send_to(fingerprint, payload).await?;
442    ///     }
443    ///     _ => {
444    ///         println!("Code not found or expired");
445    ///     }
446    /// }
447    /// # Ok(())
448    /// # }
449    /// ```
450    pub async fn request_identity(
451        &self,
452        rendezvous_code: RendezvousCode,
453    ) -> Result<(), ProxyError> {
454        // Check authenticated
455        {
456            let state = self.state.lock().await;
457            if !matches!(*state, ClientState::Authenticated { .. }) {
458                return Err(ProxyError::NotConnected);
459            }
460        }
461
462        // Send GetIdentity message
463        let msg = Messages::GetIdentity(rendezvous_code);
464        let json = serde_json::to_string(&msg)?;
465
466        // Send via outgoing_tx channel
467        if let Some(tx) = &self.outgoing_tx {
468            tx.send(Message::Text(json))
469                .map_err(|_| ProxyError::ChannelSendFailed)?;
470            Ok(())
471        } else {
472            Err(ProxyError::NotConnected)
473        }
474    }
475
476    /// Get this client's identity fingerprint.
477    ///
478    /// Returns the SHA256 fingerprint of the client's public key. This is the
479    /// identifier that other clients use to send messages to this client.
480    ///
481    /// The fingerprint is available immediately after creating the client, before
482    /// connecting.
483    ///
484    /// # Examples
485    ///
486    /// ```no_run
487    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
488    ///
489    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
490    /// let config = ProxyClientConfig {
491    ///     proxy_url: "ws://localhost:8080".to_string(),
492    /// };
493    /// let mut client = ProxyProtocolClient::new(config);
494    /// client.connect(IdentityKeyPair::generate()).await?;
495    /// println!("My fingerprint: {:?}", client.fingerprint());
496    /// # Ok(())
497    /// # }
498    /// ```
499    pub fn fingerprint(&self) -> IdentityFingerprint {
500        self.identity
501            .as_ref()
502            .expect("fingerprint() requires a prior connect()")
503            .identity()
504            .fingerprint()
505    }
506
507    /// Check if the client is currently authenticated.
508    ///
509    /// Returns `true` if the client has completed authentication and can send/receive
510    /// messages. Returns `false` if disconnected or still connecting.
511    ///
512    /// # Examples
513    ///
514    /// ```no_run
515    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
516    ///
517    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
518    /// let config = ProxyClientConfig {
519    ///     proxy_url: "ws://localhost:8080".to_string(),
520    /// };
521    /// let mut client = ProxyProtocolClient::new(config);
522    ///
523    /// assert!(!client.is_authenticated().await);
524    ///
525    /// client.connect(IdentityKeyPair::generate()).await?;
526    /// assert!(client.is_authenticated().await);
527    ///
528    /// client.disconnect().await?;
529    /// assert!(!client.is_authenticated().await);
530    /// # Ok(())
531    /// # }
532    /// ```
533    pub async fn is_authenticated(&self) -> bool {
534        matches!(*self.state.lock().await, ClientState::Authenticated { .. })
535    }
536
537    /// Disconnect from the proxy server and clean up resources.
538    ///
539    /// Aborts background tasks, closes the WebSocket connection, and resets state.
540    /// After disconnecting, you can call [`connect()`](ProxyProtocolClient::connect)
541    /// again to reconnect.
542    ///
543    /// This method is automatically called when the client is dropped, but calling it
544    /// explicitly allows you to handle errors.
545    ///
546    /// # Examples
547    ///
548    /// ```no_run
549    /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
550    ///
551    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
552    /// # let config = ProxyClientConfig {
553    /// #     proxy_url: "ws://localhost:8080".to_string(),
554    /// # };
555    /// let mut client = ProxyProtocolClient::new(config);
556    /// client.connect(IdentityKeyPair::generate()).await?;
557    ///
558    /// // Do work...
559    ///
560    /// // Clean disconnect
561    /// client.disconnect().await?;
562    /// # Ok(())
563    /// # }
564    /// ```
565    pub async fn disconnect(&mut self) -> Result<(), ProxyError> {
566        // Abort tasks
567        if let Some(handle) = self.read_task_handle.take() {
568            handle.abort();
569        }
570        if let Some(handle) = self.write_task_handle.take() {
571            handle.abort();
572        }
573
574        // Clear state
575        *self.state.lock().await = ClientState::Disconnected;
576
577        // Close channels
578        self.outgoing_tx = None;
579
580        Ok(())
581    }
582
583    /// Write task: sends messages from channel to WebSocket, with keep-alive pings
584    async fn write_task(
585        mut ws_sink: WsSink,
586        mut outgoing_rx: mpsc::UnboundedReceiver<Message>,
587        last_pong: Arc<Mutex<Instant>>,
588    ) {
589        let mut ping_interval = tokio::time::interval(PING_INTERVAL);
590        ping_interval.tick().await; // consume the immediate first tick
591
592        loop {
593            tokio::select! {
594                msg = outgoing_rx.recv() => {
595                    match msg {
596                        Some(msg) => {
597                            if ws_sink.send(msg).await.is_err() {
598                                break;
599                            }
600                        }
601                        None => break,
602                    }
603                }
604                _ = ping_interval.tick() => {
605                    // Check if we received a pong recently
606                    let elapsed = last_pong.lock().await.elapsed();
607                    if elapsed > PONG_TIMEOUT {
608                        tracing::warn!("No pong received in {:?}, closing connection", elapsed);
609                        break;
610                    }
611                    if ws_sink.send(Message::Ping(vec![])).await.is_err() {
612                        break;
613                    }
614                }
615            }
616        }
617    }
618
619    /// Read task: handles authentication and routes messages
620    async fn read_task(
621        mut ws_source: WsSource,
622        outgoing_tx: mpsc::UnboundedSender<Message>,
623        incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
624        identity: Arc<IdentityKeyPair>,
625        state: Arc<Mutex<ClientState>>,
626        auth_tx: mpsc::UnboundedSender<Result<(), ProxyError>>,
627        last_pong: Arc<Mutex<Instant>>,
628    ) {
629        // Handle authentication
630        match Self::handle_authentication(&mut ws_source, &outgoing_tx, &identity).await {
631            Ok(fingerprint) => {
632                *state.lock().await = ClientState::Authenticated { fingerprint };
633                // Notify that authentication succeeded
634                let _ = auth_tx.send(Ok(()));
635            }
636            Err(e) => {
637                tracing::error!("Authentication failed: {}", e);
638                *state.lock().await = ClientState::Disconnected;
639                // Notify that authentication failed
640                let _ = auth_tx.send(Err(e));
641                return;
642            }
643        }
644
645        // Enter message loop
646        if let Err(e) = Self::message_loop(ws_source, incoming_tx, last_pong).await {
647            tracing::error!("Message loop error: {}", e);
648        }
649
650        *state.lock().await = ClientState::Disconnected;
651    }
652
653    /// Handle authentication challenge-response
654    async fn handle_authentication(
655        ws_source: &mut WsSource,
656        outgoing_tx: &mpsc::UnboundedSender<Message>,
657        identity: &Arc<IdentityKeyPair>,
658    ) -> Result<IdentityFingerprint, ProxyError> {
659        // Receive AuthChallenge
660        let challenge_msg = ws_source
661            .next()
662            .await
663            .ok_or(ProxyError::ConnectionClosed)?
664            .map_err(ws_err)?;
665
666        let challenge = match challenge_msg {
667            Message::Text(text) => match serde_json::from_str::<Messages>(&text)? {
668                Messages::AuthChallenge(c) => c,
669                _ => return Err(ProxyError::InvalidMessage("Expected AuthChallenge".into())),
670            },
671            _ => return Err(ProxyError::InvalidMessage("Expected text message".into())),
672        };
673
674        // Sign challenge
675        let response = challenge.sign(identity);
676        let auth_response = Messages::AuthResponse(identity.identity(), response);
677        let auth_json = serde_json::to_string(&auth_response)?;
678
679        // Send auth response
680        outgoing_tx
681            .send(Message::Text(auth_json))
682            .map_err(|_| ProxyError::ChannelSendFailed)?;
683
684        // Authentication complete - server doesn't send confirmation
685        Ok(identity.identity().fingerprint())
686    }
687
688    /// Message loop: routes incoming messages to channel
689    async fn message_loop(
690        mut ws_source: WsSource,
691        incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
692        last_pong: Arc<Mutex<Instant>>,
693    ) -> Result<(), ProxyError> {
694        while let Some(msg_result) = ws_source.next().await {
695            let msg = msg_result.map_err(ws_err)?;
696
697            match msg {
698                Message::Text(text) => {
699                    let parsed: Messages = serde_json::from_str(&text)?;
700                    match parsed {
701                        Messages::Send {
702                            source,
703                            destination,
704                            payload,
705                        } => {
706                            // Server always includes source when forwarding messages
707                            if let Some(source) = source {
708                                incoming_tx
709                                    .send(IncomingMessage::Send {
710                                        source,
711                                        destination,
712                                        payload,
713                                    })
714                                    .ok();
715                            } else {
716                                tracing::warn!("Received Send message without source");
717                            }
718                        }
719                        Messages::RendezvousInfo(code) => {
720                            incoming_tx.send(IncomingMessage::RendezvousInfo(code)).ok();
721                        }
722                        Messages::IdentityInfo {
723                            fingerprint,
724                            identity,
725                        } => {
726                            incoming_tx
727                                .send(IncomingMessage::IdentityInfo {
728                                    fingerprint,
729                                    identity,
730                                })
731                                .ok();
732                        }
733                        Messages::GetIdentity(_) => {
734                            tracing::warn!("Received GetIdentity (client should not receive this)");
735                        }
736                        _ => tracing::warn!("Unexpected message type: {:?}", parsed),
737                    }
738                }
739                Message::Pong(_) => {
740                    *last_pong.lock().await = Instant::now();
741                }
742                Message::Close(_) => break,
743                _ => {}
744            }
745        }
746
747        Ok(())
748    }
749}