ap_proxy_client/protocol_client.rs
1use ap_proxy_protocol::{
2 IdentityFingerprint, IdentityKeyPair, Messages, ProxyError, RendezvousCode,
3};
4use futures_util::{SinkExt, StreamExt};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, mpsc};
8use tokio::task::JoinHandle;
9use tokio::time::Instant;
10use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::Message};
11
/// How often the write task emits a WebSocket ping frame.
const PING_INTERVAL: Duration = Duration::from_secs(30);
/// Longest tolerated gap since the last pong; once exceeded, the write task
/// gives up on the connection.
const PONG_TIMEOUT: Duration = Duration::from_secs(60);
16
17use super::config::{ClientState, IncomingMessage, ProxyClientConfig};
18
19/// Convert tungstenite errors into ProxyError (replaces the From impl that
20/// was removed from ap-proxy-protocol to keep it free of tungstenite deps).
21fn ws_err(e: tokio_tungstenite::tungstenite::Error) -> ProxyError {
22 ProxyError::WebSocket(e.to_string())
23}
24
25type WsStream = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
26type WsSink = futures_util::stream::SplitSink<WsStream, Message>;
27type WsSource = futures_util::stream::SplitStream<WsStream>;
28
29/// Client for connecting to and communicating through a ap-proxy server.
30///
31/// This is the main client API for connecting to a proxy server, authenticating,
32/// discovering peers via rendezvous codes, and sending messages.
33///
34/// # Lifecycle
35///
36/// 1. Create client with [`new()`](ProxyProtocolClient::new)
37/// 2. Connect and authenticate with [`connect()`](ProxyProtocolClient::connect)
38/// 3. Perform operations (send messages, request rendezvous codes, etc.)
39/// 4. Disconnect with [`disconnect()`](ProxyProtocolClient::disconnect)
40///
41/// # Examples
42///
43/// Basic usage:
44///
45/// ```no_run
46/// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
47///
48/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
49/// // Create and connect
50/// let config = ProxyClientConfig {
51/// proxy_url: "ws://localhost:8080".to_string(),
52/// identity_keypair: None,
53/// };
54/// let mut client = ProxyProtocolClient::new(config);
55/// let mut incoming = client.connect().await?;
56///
57/// // Handle messages
58/// tokio::spawn(async move {
59/// while let Some(msg) = incoming.recv().await {
60/// match msg {
61/// IncomingMessage::Send { source, payload, .. } => {
62/// println!("Got message from {:?}", source);
63/// }
64/// _ => {}
65/// }
66/// }
67/// });
68///
69/// // Send a message
70/// // client.send_to(target_fingerprint, b"Hello".to_vec()).await?;
71/// # Ok(())
72/// # }
73/// ```
74pub struct ProxyProtocolClient {
75 // Configuration
76 config: ProxyClientConfig,
77 identity: Arc<IdentityKeyPair>,
78
79 // Connection state
80 state: Arc<Mutex<ClientState>>,
81
82 // WebSocket components (None when disconnected)
83 outgoing_tx: Option<mpsc::UnboundedSender<Message>>,
84
85 // Task handles for cleanup
86 read_task_handle: Option<JoinHandle<()>>,
87 write_task_handle: Option<JoinHandle<()>>,
88}
89
90impl ProxyProtocolClient {
91 /// Create a new proxy client with the given configuration.
92 ///
93 /// This does not establish a connection - call [`connect()`](ProxyProtocolClient::connect)
94 /// to connect and authenticate.
95 ///
96 /// If `config.identity_keypair` is `None`, a new random identity will be generated.
97 /// Otherwise, the provided identity will be used for authentication.
98 ///
99 /// # Examples
100 ///
101 /// Create client with new identity:
102 ///
103 /// ```
104 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
105 ///
106 /// let config = ProxyClientConfig {
107 /// proxy_url: "ws://localhost:8080".to_string(),
108 /// identity_keypair: None, // Will generate new identity
109 /// };
110 /// let client = ProxyProtocolClient::new(config);
111 /// println!("Client fingerprint: {:?}", client.fingerprint());
112 /// ```
113 ///
114 /// Create client with existing identity:
115 ///
116 /// ```
117 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
118 ///
119 /// let keypair = IdentityKeyPair::generate();
120 /// let config = ProxyClientConfig {
121 /// proxy_url: "ws://localhost:8080".to_string(),
122 /// identity_keypair: Some(keypair),
123 /// };
124 /// let client = ProxyProtocolClient::new(config);
125 /// ```
126 pub fn new(mut config: ProxyClientConfig) -> Self {
127 let identity = Arc::new(
128 config
129 .identity_keypair
130 .take()
131 .unwrap_or_else(IdentityKeyPair::generate),
132 );
133
134 Self {
135 config,
136 identity,
137 state: Arc::new(Mutex::new(ClientState::Disconnected)),
138 outgoing_tx: None,
139 read_task_handle: None,
140 write_task_handle: None,
141 }
142 }
143
144 /// Connect to the proxy server and perform authentication.
145 ///
146 /// Establishes a WebSocket connection, completes the challenge-response authentication,
147 /// and returns a channel for receiving incoming messages.
148 ///
149 /// # Authentication Flow
150 ///
151 /// 1. Connect to WebSocket at the configured URL
152 /// 2. Receive authentication challenge from server
153 /// 3. Sign challenge with client's private key
154 /// 4. Send signed response to server
155 /// 5. Server verifies signature and accepts connection
156 ///
157 /// # Timeout
158 ///
159 /// Authentication must complete within 5 seconds or this method returns
160 /// [`ProxyError::AuthenticationTimeout`].
161 ///
162 /// # Errors
163 ///
164 /// - [`ProxyError::AlreadyConnected`] if already connected
165 /// - [`ProxyError::WebSocket`] if connection fails
166 /// - [`ProxyError::AuthenticationFailed`] if signature verification fails
167 /// - [`ProxyError::AuthenticationTimeout`] if authentication takes too long
168 ///
169 /// # Examples
170 ///
171 /// ```no_run
172 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
173 ///
174 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
175 /// let config = ProxyClientConfig {
176 /// proxy_url: "ws://localhost:8080".to_string(),
177 /// identity_keypair: None,
178 /// };
179 /// let mut client = ProxyProtocolClient::new(config);
180 ///
181 /// // Connect and get incoming message channel
182 /// let mut incoming = client.connect().await?;
183 ///
184 /// // Handle messages
185 /// while let Some(msg) = incoming.recv().await {
186 /// println!("Received: {:?}", msg);
187 /// }
188 /// # Ok(())
189 /// # }
190 /// ```
191 pub async fn connect(
192 &mut self,
193 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ProxyError> {
194 // Check not already connected
195 {
196 let state = self.state.lock().await;
197 if !matches!(*state, ClientState::Disconnected) {
198 return Err(ProxyError::AlreadyConnected);
199 }
200 }
201
202 // Connect WebSocket
203 let (ws_stream, _) = connect_async(&self.config.proxy_url)
204 .await
205 .map_err(ws_err)?;
206
207 // Split into read/write
208 let (ws_sink, ws_source) = ws_stream.split();
209
210 // Update state to Connected
211 *self.state.lock().await = ClientState::Connected;
212
213 // Create channels
214 let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel::<Message>();
215 let (incoming_tx, incoming_rx) = mpsc::unbounded_channel::<IncomingMessage>();
216 let (auth_tx, mut auth_rx) = mpsc::unbounded_channel::<Result<(), ProxyError>>();
217
218 // Shared pong tracker for keep-alive
219 let last_pong = Arc::new(Mutex::new(Instant::now()));
220
221 // Spawn write task
222 let write_handle = tokio::spawn(Self::write_task(
223 ws_sink,
224 outgoing_rx,
225 Arc::clone(&last_pong),
226 ));
227
228 // Spawn read task (handles auth + message routing)
229 let read_handle = tokio::spawn(Self::read_task(
230 ws_source,
231 outgoing_tx.clone(),
232 incoming_tx,
233 Arc::clone(&self.identity),
234 self.state.clone(),
235 auth_tx,
236 last_pong,
237 ));
238
239 // Wait for authentication to complete
240 match tokio::time::timeout(tokio::time::Duration::from_secs(5), auth_rx.recv()).await {
241 Ok(Some(Ok(()))) => {
242 // Authentication succeeded
243 }
244 Ok(Some(Err(e))) => {
245 // Authentication failed
246 self.read_task_handle = Some(read_handle);
247 self.write_task_handle = Some(write_handle);
248 self.disconnect().await?;
249 return Err(e);
250 }
251 Ok(None) | Err(_) => {
252 // Channel closed or timeout
253 self.read_task_handle = Some(read_handle);
254 self.write_task_handle = Some(write_handle);
255 self.disconnect().await?;
256 return Err(ProxyError::AuthenticationTimeout);
257 }
258 }
259
260 // Store handles and tx
261 self.outgoing_tx = Some(outgoing_tx);
262 self.read_task_handle = Some(read_handle);
263 self.write_task_handle = Some(write_handle);
264
265 Ok(incoming_rx)
266 }
267
268 /// Send a message to another authenticated client.
269 ///
270 /// The message is routed through the proxy server to the destination client.
271 /// The proxy validates the source identity but cannot inspect the payload.
272 ///
273 /// # Authentication Required
274 ///
275 /// This method requires an active authenticated connection. Call
276 /// [`connect()`](ProxyProtocolClient::connect) first.
277 ///
278 /// # Payload Encryption
279 ///
280 /// The proxy does not encrypt message payloads. Clients should implement
281 /// end-to-end encryption (e.g., using the Noise protocol) before calling this method.
282 ///
283 /// # Errors
284 ///
285 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
286 /// - [`ProxyError::DestinationNotFound`] if the destination client is not connected
287 /// - [`ProxyError::Serialization`] if message encoding fails
288 ///
289 /// # Examples
290 ///
291 /// ```no_run
292 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
293 ///
294 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
295 /// # let config = ProxyClientConfig {
296 /// # proxy_url: "ws://localhost:8080".to_string(),
297 /// # identity_keypair: None,
298 /// # };
299 /// let mut client = ProxyProtocolClient::new(config);
300 /// client.connect().await?;
301 ///
302 /// // Get destination fingerprint from rendezvous lookup
303 /// // let destination = ...; // from IncomingMessage::IdentityInfo
304 ///
305 /// // Send encrypted message
306 /// // let payload = encrypt_message(b"Hello!")?;
307 /// // client.send_to(destination, payload).await?;
308 /// # Ok(())
309 /// # }
310 /// ```
311 pub async fn send_to(
312 &self,
313 destination: IdentityFingerprint,
314 payload: Vec<u8>,
315 ) -> Result<(), ProxyError> {
316 // Check authenticated
317 {
318 let state = self.state.lock().await;
319 if !matches!(*state, ClientState::Authenticated { .. }) {
320 return Err(ProxyError::NotConnected);
321 }
322 }
323
324 // Create Send message without source (server will add it)
325 let msg = Messages::Send {
326 source: None,
327 destination,
328 payload,
329 };
330
331 let json = serde_json::to_string(&msg)?;
332
333 // Send via outgoing_tx channel
334 if let Some(tx) = &self.outgoing_tx {
335 tx.send(Message::Text(json))
336 .map_err(|_| ProxyError::ChannelSendFailed)?;
337 Ok(())
338 } else {
339 Err(ProxyError::NotConnected)
340 }
341 }
342
343 /// Request a rendezvous code from the server.
344 ///
345 /// The server will generate a temporary code (format: "ABC-DEF-GHI") that maps to your
346 /// identity. The code will be delivered via [`IncomingMessage::RendezvousInfo`] on the
347 /// channel returned by [`connect()`](ProxyProtocolClient::connect).
348 ///
349 /// # Rendezvous Code Properties
350 ///
351 /// - Format: 9 alphanumeric characters (e.g., "ABC-DEF-GHI")
352 /// - Lifetime: 5 minutes
353 /// - Single-use: deleted after lookup
354 /// - Enables peer discovery without sharing long-lived identifiers
355 ///
356 /// # Usage Pattern
357 ///
358 /// 1. Call this method to request a code
359 /// 2. Receive the code via [`IncomingMessage::RendezvousInfo`]
360 /// 3. Share the code with a peer (e.g., display as QR code)
361 /// 4. Peer uses [`request_identity()`](ProxyProtocolClient::request_identity) to look up your identity
362 ///
363 /// # Authentication Required
364 ///
365 /// This method requires an active authenticated connection.
366 ///
367 /// # Errors
368 ///
369 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
370 ///
371 /// # Examples
372 ///
373 /// ```no_run
374 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
375 ///
376 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
377 /// # let config = ProxyClientConfig {
378 /// # proxy_url: "ws://localhost:8080".to_string(),
379 /// # identity_keypair: None,
380 /// # };
381 /// let mut client = ProxyProtocolClient::new(config);
382 /// let mut incoming = client.connect().await?;
383 ///
384 /// // Request a code
385 /// client.request_rendezvous().await?;
386 ///
387 /// // Wait for response
388 /// if let Some(IncomingMessage::RendezvousInfo(code)) = incoming.recv().await {
389 /// println!("Share this code: {}", code.as_str());
390 /// // Display as QR code, send via messaging, etc.
391 /// }
392 /// # Ok(())
393 /// # }
394 /// ```
395 pub async fn request_rendezvous(&self) -> Result<(), ProxyError> {
396 // Check authenticated
397 {
398 let state = self.state.lock().await;
399 if !matches!(*state, ClientState::Authenticated { .. }) {
400 return Err(ProxyError::NotConnected);
401 }
402 }
403
404 // Send GetRendezvous message
405 let msg = Messages::GetRendezvous;
406 let json = serde_json::to_string(&msg)?;
407
408 // Send via outgoing_tx channel
409 if let Some(tx) = &self.outgoing_tx {
410 tx.send(Message::Text(json))
411 .map_err(|_| ProxyError::ChannelSendFailed)?;
412 Ok(())
413 } else {
414 Err(ProxyError::NotConnected)
415 }
416 }
417
418 /// Look up a peer's identity using a rendezvous code.
419 ///
420 /// Queries the server for the identity associated with a rendezvous code.
421 /// If the code is valid and hasn't expired, the server responds with
422 /// [`IncomingMessage::IdentityInfo`] containing the peer's identity and fingerprint.
423 ///
424 /// # Code Consumption
425 ///
426 /// Rendezvous codes are single-use. After successful lookup, the server deletes
427 /// the code and it cannot be used again.
428 ///
429 /// # Authentication Required
430 ///
431 /// This method requires an active authenticated connection.
432 ///
433 /// # Errors
434 ///
435 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
436 ///
437 /// The server may not respond if the code is invalid, expired, or already used.
438 /// Implement a timeout when waiting for the response.
439 ///
440 /// # Examples
441 ///
442 /// ```no_run
443 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage, RendezvousCode};
444 ///
445 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
446 /// # let config = ProxyClientConfig {
447 /// # proxy_url: "ws://localhost:8080".to_string(),
448 /// # identity_keypair: None,
449 /// # };
450 /// let mut client = ProxyProtocolClient::new(config);
451 /// let mut incoming = client.connect().await?;
452 ///
453 /// // Get code from user (e.g., QR scan, text input)
454 /// let code = RendezvousCode::from_string("ABC-DEF-GHI".to_string());
455 ///
456 /// // Look up the identity
457 /// client.request_identity(code).await?;
458 ///
459 /// // Wait for response with timeout
460 /// match tokio::time::timeout(
461 /// tokio::time::Duration::from_secs(5),
462 /// incoming.recv()
463 /// ).await {
464 /// Ok(Some(IncomingMessage::IdentityInfo { fingerprint, identity })) => {
465 /// println!("Found peer: {:?}", fingerprint);
466 /// // Now you can send messages to this peer
467 /// // client.send_to(fingerprint, payload).await?;
468 /// }
469 /// _ => {
470 /// println!("Code not found or expired");
471 /// }
472 /// }
473 /// # Ok(())
474 /// # }
475 /// ```
476 pub async fn request_identity(
477 &self,
478 rendezvous_code: RendezvousCode,
479 ) -> Result<(), ProxyError> {
480 // Check authenticated
481 {
482 let state = self.state.lock().await;
483 if !matches!(*state, ClientState::Authenticated { .. }) {
484 return Err(ProxyError::NotConnected);
485 }
486 }
487
488 // Send GetIdentity message
489 let msg = Messages::GetIdentity(rendezvous_code);
490 let json = serde_json::to_string(&msg)?;
491
492 // Send via outgoing_tx channel
493 if let Some(tx) = &self.outgoing_tx {
494 tx.send(Message::Text(json))
495 .map_err(|_| ProxyError::ChannelSendFailed)?;
496 Ok(())
497 } else {
498 Err(ProxyError::NotConnected)
499 }
500 }
501
502 /// Get this client's identity fingerprint.
503 ///
504 /// Returns the SHA256 fingerprint of the client's public key. This is the
505 /// identifier that other clients use to send messages to this client.
506 ///
507 /// The fingerprint is available immediately after creating the client, before
508 /// connecting.
509 ///
510 /// # Examples
511 ///
512 /// ```
513 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
514 ///
515 /// let config = ProxyClientConfig {
516 /// proxy_url: "ws://localhost:8080".to_string(),
517 /// identity_keypair: None,
518 /// };
519 /// let client = ProxyProtocolClient::new(config);
520 /// println!("My fingerprint: {:?}", client.fingerprint());
521 /// ```
522 pub fn fingerprint(&self) -> IdentityFingerprint {
523 self.identity.identity().fingerprint()
524 }
525
526 /// Check if the client is currently authenticated.
527 ///
528 /// Returns `true` if the client has completed authentication and can send/receive
529 /// messages. Returns `false` if disconnected or still connecting.
530 ///
531 /// # Examples
532 ///
533 /// ```no_run
534 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
535 ///
536 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
537 /// # let config = ProxyClientConfig {
538 /// # proxy_url: "ws://localhost:8080".to_string(),
539 /// # identity_keypair: None,
540 /// # };
541 /// let mut client = ProxyProtocolClient::new(config);
542 ///
543 /// assert!(!client.is_authenticated().await);
544 ///
545 /// client.connect().await?;
546 /// assert!(client.is_authenticated().await);
547 ///
548 /// client.disconnect().await?;
549 /// assert!(!client.is_authenticated().await);
550 /// # Ok(())
551 /// # }
552 /// ```
553 pub async fn is_authenticated(&self) -> bool {
554 matches!(*self.state.lock().await, ClientState::Authenticated { .. })
555 }
556
557 /// Disconnect from the proxy server and clean up resources.
558 ///
559 /// Aborts background tasks, closes the WebSocket connection, and resets state.
560 /// After disconnecting, you can call [`connect()`](ProxyProtocolClient::connect)
561 /// again to reconnect.
562 ///
563 /// This method is automatically called when the client is dropped, but calling it
564 /// explicitly allows you to handle errors.
565 ///
566 /// # Examples
567 ///
568 /// ```no_run
569 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
570 ///
571 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
572 /// # let config = ProxyClientConfig {
573 /// # proxy_url: "ws://localhost:8080".to_string(),
574 /// # identity_keypair: None,
575 /// # };
576 /// let mut client = ProxyProtocolClient::new(config);
577 /// client.connect().await?;
578 ///
579 /// // Do work...
580 ///
581 /// // Clean disconnect
582 /// client.disconnect().await?;
583 /// # Ok(())
584 /// # }
585 /// ```
586 pub async fn disconnect(&mut self) -> Result<(), ProxyError> {
587 // Abort tasks
588 if let Some(handle) = self.read_task_handle.take() {
589 handle.abort();
590 }
591 if let Some(handle) = self.write_task_handle.take() {
592 handle.abort();
593 }
594
595 // Clear state
596 *self.state.lock().await = ClientState::Disconnected;
597
598 // Close channels
599 self.outgoing_tx = None;
600
601 Ok(())
602 }
603
604 /// Write task: sends messages from channel to WebSocket, with keep-alive pings
605 async fn write_task(
606 mut ws_sink: WsSink,
607 mut outgoing_rx: mpsc::UnboundedReceiver<Message>,
608 last_pong: Arc<Mutex<Instant>>,
609 ) {
610 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
611 ping_interval.tick().await; // consume the immediate first tick
612
613 loop {
614 tokio::select! {
615 msg = outgoing_rx.recv() => {
616 match msg {
617 Some(msg) => {
618 if ws_sink.send(msg).await.is_err() {
619 break;
620 }
621 }
622 None => break,
623 }
624 }
625 _ = ping_interval.tick() => {
626 // Check if we received a pong recently
627 let elapsed = last_pong.lock().await.elapsed();
628 if elapsed > PONG_TIMEOUT {
629 tracing::warn!("No pong received in {:?}, closing connection", elapsed);
630 break;
631 }
632 if ws_sink.send(Message::Ping(vec![])).await.is_err() {
633 break;
634 }
635 }
636 }
637 }
638 }
639
640 /// Read task: handles authentication and routes messages
641 async fn read_task(
642 mut ws_source: WsSource,
643 outgoing_tx: mpsc::UnboundedSender<Message>,
644 incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
645 identity: Arc<IdentityKeyPair>,
646 state: Arc<Mutex<ClientState>>,
647 auth_tx: mpsc::UnboundedSender<Result<(), ProxyError>>,
648 last_pong: Arc<Mutex<Instant>>,
649 ) {
650 // Handle authentication
651 match Self::handle_authentication(&mut ws_source, &outgoing_tx, &identity).await {
652 Ok(fingerprint) => {
653 *state.lock().await = ClientState::Authenticated { fingerprint };
654 // Notify that authentication succeeded
655 let _ = auth_tx.send(Ok(()));
656 }
657 Err(e) => {
658 tracing::error!("Authentication failed: {}", e);
659 *state.lock().await = ClientState::Disconnected;
660 // Notify that authentication failed
661 let _ = auth_tx.send(Err(e));
662 return;
663 }
664 }
665
666 // Enter message loop
667 if let Err(e) = Self::message_loop(ws_source, incoming_tx, last_pong).await {
668 tracing::error!("Message loop error: {}", e);
669 }
670
671 *state.lock().await = ClientState::Disconnected;
672 }
673
674 /// Handle authentication challenge-response
675 async fn handle_authentication(
676 ws_source: &mut WsSource,
677 outgoing_tx: &mpsc::UnboundedSender<Message>,
678 identity: &Arc<IdentityKeyPair>,
679 ) -> Result<IdentityFingerprint, ProxyError> {
680 // Receive AuthChallenge
681 let challenge_msg = ws_source
682 .next()
683 .await
684 .ok_or(ProxyError::ConnectionClosed)?
685 .map_err(ws_err)?;
686
687 let challenge = match challenge_msg {
688 Message::Text(text) => match serde_json::from_str::<Messages>(&text)? {
689 Messages::AuthChallenge(c) => c,
690 _ => return Err(ProxyError::InvalidMessage("Expected AuthChallenge".into())),
691 },
692 _ => return Err(ProxyError::InvalidMessage("Expected text message".into())),
693 };
694
695 // Sign challenge
696 let response = challenge.sign(identity);
697 let auth_response = Messages::AuthResponse(identity.identity(), response);
698 let auth_json = serde_json::to_string(&auth_response)?;
699
700 // Send auth response
701 outgoing_tx
702 .send(Message::Text(auth_json))
703 .map_err(|_| ProxyError::ChannelSendFailed)?;
704
705 // Authentication complete - server doesn't send confirmation
706 Ok(identity.identity().fingerprint())
707 }
708
709 /// Message loop: routes incoming messages to channel
710 async fn message_loop(
711 mut ws_source: WsSource,
712 incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
713 last_pong: Arc<Mutex<Instant>>,
714 ) -> Result<(), ProxyError> {
715 while let Some(msg_result) = ws_source.next().await {
716 let msg = msg_result.map_err(ws_err)?;
717
718 match msg {
719 Message::Text(text) => {
720 let parsed: Messages = serde_json::from_str(&text)?;
721 match parsed {
722 Messages::Send {
723 source,
724 destination,
725 payload,
726 } => {
727 // Server always includes source when forwarding messages
728 if let Some(source) = source {
729 incoming_tx
730 .send(IncomingMessage::Send {
731 source,
732 destination,
733 payload,
734 })
735 .ok();
736 } else {
737 tracing::warn!("Received Send message without source");
738 }
739 }
740 Messages::RendezvousInfo(code) => {
741 incoming_tx.send(IncomingMessage::RendezvousInfo(code)).ok();
742 }
743 Messages::IdentityInfo {
744 fingerprint,
745 identity,
746 } => {
747 incoming_tx
748 .send(IncomingMessage::IdentityInfo {
749 fingerprint,
750 identity,
751 })
752 .ok();
753 }
754 Messages::GetIdentity(_) => {
755 tracing::warn!("Received GetIdentity (client should not receive this)");
756 }
757 _ => tracing::warn!("Unexpected message type: {:?}", parsed),
758 }
759 }
760 Message::Pong(_) => {
761 *last_pong.lock().await = Instant::now();
762 }
763 Message::Close(_) => break,
764 _ => {}
765 }
766 }
767
768 Ok(())
769 }
770}