ap_proxy_client/protocol_client.rs
1use ap_proxy_protocol::{
2 IdentityFingerprint, IdentityKeyPair, Messages, ProxyError, RendevouzCode,
3};
4use futures_util::{SinkExt, StreamExt};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, mpsc};
8use tokio::task::JoinHandle;
9use tokio::time::Instant;
10use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::Message};
11
/// How often the write task sends a WebSocket ping to the server.
const PING_INTERVAL: Duration = Duration::from_secs(30);
/// Longest tolerated gap since the last pong (two ping intervals) before the
/// connection is treated as dead.
const PONG_TIMEOUT: Duration = Duration::from_secs(2 * PING_INTERVAL.as_secs());
16
17use super::config::{ClientState, IncomingMessage, ProxyClientConfig};
18
19/// Convert tungstenite errors into ProxyError (replaces the From impl that
20/// was removed from ap-proxy-protocol to keep it free of tungstenite deps).
21fn ws_err(e: tokio_tungstenite::tungstenite::Error) -> ProxyError {
22 ProxyError::WebSocket(e.to_string())
23}
24
25type WsStream = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
26type WsSink = futures_util::stream::SplitSink<WsStream, Message>;
27type WsSource = futures_util::stream::SplitStream<WsStream>;
28
/// Client for connecting to and communicating through an ap-proxy server.
30///
31/// This is the main client API for connecting to a proxy server, authenticating,
32/// discovering peers via rendezvous codes, and sending messages.
33///
34/// # Lifecycle
35///
36/// 1. Create client with [`new()`](ProxyProtocolClient::new)
37/// 2. Connect and authenticate with [`connect()`](ProxyProtocolClient::connect)
38/// 3. Perform operations (send messages, request rendezvous codes, etc.)
39/// 4. Disconnect with [`disconnect()`](ProxyProtocolClient::disconnect)
40///
41/// # Examples
42///
43/// Basic usage:
44///
45/// ```no_run
46/// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
47///
48/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
49/// // Create and connect
50/// let config = ProxyClientConfig {
51/// proxy_url: "ws://localhost:8080".to_string(),
52/// identity_keypair: None,
53/// };
54/// let mut client = ProxyProtocolClient::new(config);
55/// let mut incoming = client.connect().await?;
56///
57/// // Handle messages
58/// tokio::spawn(async move {
59/// while let Some(msg) = incoming.recv().await {
60/// match msg {
61/// IncomingMessage::Send { source, payload, .. } => {
62/// println!("Got message from {:?}", source);
63/// }
64/// _ => {}
65/// }
66/// }
67/// });
68///
69/// // Send a message
70/// // client.send_to(target_fingerprint, b"Hello".to_vec()).await?;
71/// # Ok(())
72/// # }
73/// ```
74pub struct ProxyProtocolClient {
75 // Configuration
76 config: ProxyClientConfig,
77 identity: Arc<IdentityKeyPair>,
78
79 // Connection state
80 state: Arc<Mutex<ClientState>>,
81
82 // WebSocket components (None when disconnected)
83 outgoing_tx: Option<mpsc::UnboundedSender<Message>>,
84
85 // Task handles for cleanup
86 read_task_handle: Option<JoinHandle<()>>,
87 write_task_handle: Option<JoinHandle<()>>,
88}
89
90impl ProxyProtocolClient {
91 /// Create a new proxy client with the given configuration.
92 ///
93 /// This does not establish a connection - call [`connect()`](ProxyProtocolClient::connect)
94 /// to connect and authenticate.
95 ///
96 /// If `config.identity_keypair` is `None`, a new random identity will be generated.
97 /// Otherwise, the provided identity will be used for authentication.
98 ///
99 /// # Examples
100 ///
101 /// Create client with new identity:
102 ///
103 /// ```
104 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
105 ///
106 /// let config = ProxyClientConfig {
107 /// proxy_url: "ws://localhost:8080".to_string(),
108 /// identity_keypair: None, // Will generate new identity
109 /// };
110 /// let client = ProxyProtocolClient::new(config);
111 /// println!("Client fingerprint: {:?}", client.fingerprint());
112 /// ```
113 ///
114 /// Create client with existing identity:
115 ///
116 /// ```
117 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
118 ///
119 /// let keypair = IdentityKeyPair::generate();
120 /// let config = ProxyClientConfig {
121 /// proxy_url: "ws://localhost:8080".to_string(),
122 /// identity_keypair: Some(keypair),
123 /// };
124 /// let client = ProxyProtocolClient::new(config);
125 /// ```
126 pub fn new(mut config: ProxyClientConfig) -> Self {
127 let identity = Arc::new(
128 config
129 .identity_keypair
130 .take()
131 .unwrap_or_else(IdentityKeyPair::generate),
132 );
133
134 Self {
135 config,
136 identity,
137 state: Arc::new(Mutex::new(ClientState::Disconnected)),
138 outgoing_tx: None,
139 read_task_handle: None,
140 write_task_handle: None,
141 }
142 }
143
144 /// Connect to the proxy server and perform authentication.
145 ///
146 /// Establishes a WebSocket connection, completes the challenge-response authentication,
147 /// and returns a channel for receiving incoming messages.
148 ///
149 /// # Authentication Flow
150 ///
151 /// 1. Connect to WebSocket at the configured URL
152 /// 2. Receive authentication challenge from server
153 /// 3. Sign challenge with client's private key
154 /// 4. Send signed response to server
155 /// 5. Server verifies signature and accepts connection
156 ///
157 /// # Timeout
158 ///
159 /// Authentication must complete within 5 seconds or this method returns
160 /// [`ProxyError::AuthenticationTimeout`].
161 ///
162 /// # Errors
163 ///
164 /// - [`ProxyError::AlreadyConnected`] if already connected
165 /// - [`ProxyError::WebSocket`] if connection fails
166 /// - [`ProxyError::AuthenticationFailed`] if signature verification fails
167 /// - [`ProxyError::AuthenticationTimeout`] if authentication takes too long
168 ///
169 /// # Examples
170 ///
171 /// ```no_run
172 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
173 ///
174 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
175 /// let config = ProxyClientConfig {
176 /// proxy_url: "ws://localhost:8080".to_string(),
177 /// identity_keypair: None,
178 /// };
179 /// let mut client = ProxyProtocolClient::new(config);
180 ///
181 /// // Connect and get incoming message channel
182 /// let mut incoming = client.connect().await?;
183 ///
184 /// // Handle messages
185 /// while let Some(msg) = incoming.recv().await {
186 /// println!("Received: {:?}", msg);
187 /// }
188 /// # Ok(())
189 /// # }
190 /// ```
191 pub async fn connect(
192 &mut self,
193 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ProxyError> {
194 // Check not already connected
195 {
196 let state = self.state.lock().await;
197 if !matches!(*state, ClientState::Disconnected) {
198 return Err(ProxyError::AlreadyConnected);
199 }
200 }
201
202 // Connect WebSocket
203 let (ws_stream, _) = connect_async(&self.config.proxy_url)
204 .await
205 .map_err(ws_err)?;
206
207 // Split into read/write
208 let (ws_sink, ws_source) = ws_stream.split();
209
210 // Update state to Connected
211 *self.state.lock().await = ClientState::Connected;
212
213 // Create channels
214 let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel::<Message>();
215 let (incoming_tx, incoming_rx) = mpsc::unbounded_channel::<IncomingMessage>();
216 let (auth_tx, mut auth_rx) = mpsc::unbounded_channel::<Result<(), ProxyError>>();
217
218 // Shared pong tracker for keep-alive
219 let last_pong = Arc::new(Mutex::new(Instant::now()));
220
221 // Spawn write task
222 let write_handle = tokio::spawn(Self::write_task(
223 ws_sink,
224 outgoing_rx,
225 Arc::clone(&last_pong),
226 ));
227
228 // Spawn read task (handles auth + message routing)
229 let read_handle = tokio::spawn(Self::read_task(
230 ws_source,
231 outgoing_tx.clone(),
232 incoming_tx,
233 Arc::clone(&self.identity),
234 self.state.clone(),
235 auth_tx,
236 last_pong,
237 ));
238
239 // Wait for authentication to complete
240 match tokio::time::timeout(tokio::time::Duration::from_secs(5), auth_rx.recv()).await {
241 Ok(Some(Ok(()))) => {
242 // Authentication succeeded
243 }
244 Ok(Some(Err(e))) => {
245 // Authentication failed
246 self.read_task_handle = Some(read_handle);
247 self.write_task_handle = Some(write_handle);
248 self.disconnect().await?;
249 return Err(e);
250 }
251 Ok(None) | Err(_) => {
252 // Channel closed or timeout
253 self.read_task_handle = Some(read_handle);
254 self.write_task_handle = Some(write_handle);
255 self.disconnect().await?;
256 return Err(ProxyError::AuthenticationTimeout);
257 }
258 }
259
260 // Store handles and tx
261 self.outgoing_tx = Some(outgoing_tx);
262 self.read_task_handle = Some(read_handle);
263 self.write_task_handle = Some(write_handle);
264
265 Ok(incoming_rx)
266 }
267
268 /// Send a message to another authenticated client.
269 ///
270 /// The message is routed through the proxy server to the destination client.
271 /// The proxy validates the source identity but cannot inspect the payload.
272 ///
273 /// # Authentication Required
274 ///
275 /// This method requires an active authenticated connection. Call
276 /// [`connect()`](ProxyProtocolClient::connect) first.
277 ///
278 /// # Payload Encryption
279 ///
280 /// The proxy does not encrypt message payloads. Clients should implement
281 /// end-to-end encryption (e.g., using the Noise protocol) before calling this method.
282 ///
283 /// # Errors
284 ///
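    /// - [`ProxyError::NotConnected`] if not connected or not authenticated
    /// - [`ProxyError::Serialization`] if message encoding fails
    /// - [`ProxyError::ChannelSendFailed`] if the message cannot be handed to the write task
    ///
    /// Delivery is asynchronous: this method returns as soon as the message is queued, so
    /// an unreachable destination ([`ProxyError::DestinationNotFound`]) is not reported by
    /// this call.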
288 ///
289 /// # Examples
290 ///
291 /// ```no_run
292 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
293 ///
294 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
295 /// # let config = ProxyClientConfig {
296 /// # proxy_url: "ws://localhost:8080".to_string(),
297 /// # identity_keypair: None,
298 /// # };
299 /// let mut client = ProxyProtocolClient::new(config);
300 /// client.connect().await?;
301 ///
302 /// // Get destination fingerprint from rendezvous lookup
303 /// // let destination = ...; // from IncomingMessage::IdentityInfo
304 ///
305 /// // Send encrypted message
306 /// // let payload = encrypt_message(b"Hello!")?;
307 /// // client.send_to(destination, payload).await?;
308 /// # Ok(())
309 /// # }
310 /// ```
311 pub async fn send_to(
312 &self,
313 destination: IdentityFingerprint,
314 payload: Vec<u8>,
315 ) -> Result<(), ProxyError> {
316 // Check authenticated
317 {
318 let state = self.state.lock().await;
319 if !matches!(*state, ClientState::Authenticated { .. }) {
320 return Err(ProxyError::NotConnected);
321 }
322 }
323
324 // Create Send message without source (server will add it)
325 let msg = Messages::Send {
326 source: None,
327 destination,
328 payload,
329 };
330
331 let json = serde_json::to_string(&msg)?;
332
333 // Send via outgoing_tx channel
334 if let Some(tx) = &self.outgoing_tx {
335 tx.send(Message::Text(json))
336 .map_err(|_| ProxyError::ChannelSendFailed)?;
337 Ok(())
338 } else {
339 Err(ProxyError::NotConnected)
340 }
341 }
342
343 /// Request a rendezvous code from the server.
344 ///
345 /// The server will generate a temporary code (format: "ABC-DEF-GHI") that maps to your
346 /// identity. The code will be delivered via [`IncomingMessage::RendevouzInfo`] on the
347 /// channel returned by [`connect()`](ProxyProtocolClient::connect).
348 ///
349 /// # Rendezvous Code Properties
350 ///
351 /// - Format: 9 alphanumeric characters (e.g., "ABC-DEF-GHI")
352 /// - Lifetime: 5 minutes
353 /// - Single-use: deleted after lookup
354 /// - Enables peer discovery without sharing long-lived identifiers
355 ///
356 /// # Usage Pattern
357 ///
358 /// 1. Call this method to request a code
359 /// 2. Receive the code via [`IncomingMessage::RendevouzInfo`]
360 /// 3. Share the code with a peer (e.g., display as QR code)
361 /// 4. Peer uses [`request_identity()`](ProxyProtocolClient::request_identity) to look up your identity
362 ///
363 /// # Authentication Required
364 ///
365 /// This method requires an active authenticated connection.
366 ///
367 /// # Errors
368 ///
369 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
370 ///
371 /// # Examples
372 ///
373 /// ```no_run
374 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage};
375 ///
376 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
377 /// # let config = ProxyClientConfig {
378 /// # proxy_url: "ws://localhost:8080".to_string(),
379 /// # identity_keypair: None,
380 /// # };
381 /// let mut client = ProxyProtocolClient::new(config);
382 /// let mut incoming = client.connect().await?;
383 ///
384 /// // Request a code
385 /// client.request_rendezvous().await?;
386 ///
387 /// // Wait for response
388 /// if let Some(IncomingMessage::RendevouzInfo(code)) = incoming.recv().await {
389 /// println!("Share this code: {}", code.as_str());
390 /// // Display as QR code, send via messaging, etc.
391 /// }
392 /// # Ok(())
393 /// # }
394 /// ```
395 pub async fn request_rendezvous(&self) -> Result<(), ProxyError> {
396 // Check authenticated
397 {
398 let state = self.state.lock().await;
399 if !matches!(*state, ClientState::Authenticated { .. }) {
400 return Err(ProxyError::NotConnected);
401 }
402 }
403
404 // Send GetRendevouz message
405 let msg = Messages::GetRendevouz;
406 let json = serde_json::to_string(&msg)?;
407
408 // Send via outgoing_tx channel
409 if let Some(tx) = &self.outgoing_tx {
410 tx.send(Message::Text(json))
411 .map_err(|_| ProxyError::ChannelSendFailed)?;
412 Ok(())
413 } else {
414 Err(ProxyError::NotConnected)
415 }
416 }
417
418 /// Look up a peer's identity using a rendezvous code.
419 ///
420 /// Queries the server for the identity associated with a rendezvous code.
421 /// If the code is valid and hasn't expired, the server responds with
422 /// [`IncomingMessage::IdentityInfo`] containing the peer's identity and fingerprint.
423 ///
424 /// # Code Consumption
425 ///
426 /// Rendezvous codes are single-use. After successful lookup, the server deletes
427 /// the code and it cannot be used again.
428 ///
429 /// # Authentication Required
430 ///
431 /// This method requires an active authenticated connection.
432 ///
433 /// # Errors
434 ///
435 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
436 ///
437 /// The server may not respond if the code is invalid, expired, or already used.
438 /// Implement a timeout when waiting for the response.
439 ///
440 /// # Examples
441 ///
442 /// ```no_run
443 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage, RendevouzCode};
444 ///
445 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
446 /// # let config = ProxyClientConfig {
447 /// # proxy_url: "ws://localhost:8080".to_string(),
448 /// # identity_keypair: None,
449 /// # };
450 /// let mut client = ProxyProtocolClient::new(config);
451 /// let mut incoming = client.connect().await?;
452 ///
453 /// // Get code from user (e.g., QR scan, text input)
454 /// let code = RendevouzCode::from_string("ABC-DEF-GHI".to_string());
455 ///
456 /// // Look up the identity
457 /// client.request_identity(code).await?;
458 ///
459 /// // Wait for response with timeout
460 /// match tokio::time::timeout(
461 /// tokio::time::Duration::from_secs(5),
462 /// incoming.recv()
463 /// ).await {
464 /// Ok(Some(IncomingMessage::IdentityInfo { fingerprint, identity })) => {
465 /// println!("Found peer: {:?}", fingerprint);
466 /// // Now you can send messages to this peer
467 /// // client.send_to(fingerprint, payload).await?;
468 /// }
469 /// _ => {
470 /// println!("Code not found or expired");
471 /// }
472 /// }
473 /// # Ok(())
474 /// # }
475 /// ```
476 pub async fn request_identity(&self, rendezvous_code: RendevouzCode) -> Result<(), ProxyError> {
477 // Check authenticated
478 {
479 let state = self.state.lock().await;
480 if !matches!(*state, ClientState::Authenticated { .. }) {
481 return Err(ProxyError::NotConnected);
482 }
483 }
484
485 // Send GetIdentity message
486 let msg = Messages::GetIdentity(rendezvous_code);
487 let json = serde_json::to_string(&msg)?;
488
489 // Send via outgoing_tx channel
490 if let Some(tx) = &self.outgoing_tx {
491 tx.send(Message::Text(json))
492 .map_err(|_| ProxyError::ChannelSendFailed)?;
493 Ok(())
494 } else {
495 Err(ProxyError::NotConnected)
496 }
497 }
498
499 /// Get this client's identity fingerprint.
500 ///
501 /// Returns the SHA256 fingerprint of the client's public key. This is the
502 /// identifier that other clients use to send messages to this client.
503 ///
504 /// The fingerprint is available immediately after creating the client, before
505 /// connecting.
506 ///
507 /// # Examples
508 ///
509 /// ```
510 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
511 ///
512 /// let config = ProxyClientConfig {
513 /// proxy_url: "ws://localhost:8080".to_string(),
514 /// identity_keypair: None,
515 /// };
516 /// let client = ProxyProtocolClient::new(config);
517 /// println!("My fingerprint: {:?}", client.fingerprint());
518 /// ```
519 pub fn fingerprint(&self) -> IdentityFingerprint {
520 self.identity.identity().fingerprint()
521 }
522
523 /// Check if the client is currently authenticated.
524 ///
525 /// Returns `true` if the client has completed authentication and can send/receive
526 /// messages. Returns `false` if disconnected or still connecting.
527 ///
528 /// # Examples
529 ///
530 /// ```no_run
531 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
532 ///
533 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
534 /// # let config = ProxyClientConfig {
535 /// # proxy_url: "ws://localhost:8080".to_string(),
536 /// # identity_keypair: None,
537 /// # };
538 /// let mut client = ProxyProtocolClient::new(config);
539 ///
540 /// assert!(!client.is_authenticated().await);
541 ///
542 /// client.connect().await?;
543 /// assert!(client.is_authenticated().await);
544 ///
545 /// client.disconnect().await?;
546 /// assert!(!client.is_authenticated().await);
547 /// # Ok(())
548 /// # }
549 /// ```
550 pub async fn is_authenticated(&self) -> bool {
551 matches!(*self.state.lock().await, ClientState::Authenticated { .. })
552 }
553
554 /// Disconnect from the proxy server and clean up resources.
555 ///
556 /// Aborts background tasks, closes the WebSocket connection, and resets state.
557 /// After disconnecting, you can call [`connect()`](ProxyProtocolClient::connect)
558 /// again to reconnect.
559 ///
560 /// This method is automatically called when the client is dropped, but calling it
561 /// explicitly allows you to handle errors.
562 ///
563 /// # Examples
564 ///
565 /// ```no_run
566 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
567 ///
568 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
569 /// # let config = ProxyClientConfig {
570 /// # proxy_url: "ws://localhost:8080".to_string(),
571 /// # identity_keypair: None,
572 /// # };
573 /// let mut client = ProxyProtocolClient::new(config);
574 /// client.connect().await?;
575 ///
576 /// // Do work...
577 ///
578 /// // Clean disconnect
579 /// client.disconnect().await?;
580 /// # Ok(())
581 /// # }
582 /// ```
583 pub async fn disconnect(&mut self) -> Result<(), ProxyError> {
584 // Abort tasks
585 if let Some(handle) = self.read_task_handle.take() {
586 handle.abort();
587 }
588 if let Some(handle) = self.write_task_handle.take() {
589 handle.abort();
590 }
591
592 // Clear state
593 *self.state.lock().await = ClientState::Disconnected;
594
595 // Close channels
596 self.outgoing_tx = None;
597
598 Ok(())
599 }
600
601 /// Write task: sends messages from channel to WebSocket, with keep-alive pings
602 async fn write_task(
603 mut ws_sink: WsSink,
604 mut outgoing_rx: mpsc::UnboundedReceiver<Message>,
605 last_pong: Arc<Mutex<Instant>>,
606 ) {
607 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
608 ping_interval.tick().await; // consume the immediate first tick
609
610 loop {
611 tokio::select! {
612 msg = outgoing_rx.recv() => {
613 match msg {
614 Some(msg) => {
615 if ws_sink.send(msg).await.is_err() {
616 break;
617 }
618 }
619 None => break,
620 }
621 }
622 _ = ping_interval.tick() => {
623 // Check if we received a pong recently
624 let elapsed = last_pong.lock().await.elapsed();
625 if elapsed > PONG_TIMEOUT {
626 tracing::warn!("No pong received in {:?}, closing connection", elapsed);
627 break;
628 }
629 if ws_sink.send(Message::Ping(vec![])).await.is_err() {
630 break;
631 }
632 }
633 }
634 }
635 }
636
637 /// Read task: handles authentication and routes messages
638 async fn read_task(
639 mut ws_source: WsSource,
640 outgoing_tx: mpsc::UnboundedSender<Message>,
641 incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
642 identity: Arc<IdentityKeyPair>,
643 state: Arc<Mutex<ClientState>>,
644 auth_tx: mpsc::UnboundedSender<Result<(), ProxyError>>,
645 last_pong: Arc<Mutex<Instant>>,
646 ) {
647 // Handle authentication
648 match Self::handle_authentication(&mut ws_source, &outgoing_tx, &identity).await {
649 Ok(fingerprint) => {
650 *state.lock().await = ClientState::Authenticated { fingerprint };
651 // Notify that authentication succeeded
652 let _ = auth_tx.send(Ok(()));
653 }
654 Err(e) => {
655 tracing::error!("Authentication failed: {}", e);
656 *state.lock().await = ClientState::Disconnected;
657 // Notify that authentication failed
658 let _ = auth_tx.send(Err(e));
659 return;
660 }
661 }
662
663 // Enter message loop
664 if let Err(e) = Self::message_loop(ws_source, incoming_tx, last_pong).await {
665 tracing::error!("Message loop error: {}", e);
666 }
667
668 *state.lock().await = ClientState::Disconnected;
669 }
670
671 /// Handle authentication challenge-response
672 async fn handle_authentication(
673 ws_source: &mut WsSource,
674 outgoing_tx: &mpsc::UnboundedSender<Message>,
675 identity: &Arc<IdentityKeyPair>,
676 ) -> Result<IdentityFingerprint, ProxyError> {
677 // Receive AuthChallenge
678 let challenge_msg = ws_source
679 .next()
680 .await
681 .ok_or(ProxyError::ConnectionClosed)?
682 .map_err(ws_err)?;
683
684 let challenge = match challenge_msg {
685 Message::Text(text) => match serde_json::from_str::<Messages>(&text)? {
686 Messages::AuthChallenge(c) => c,
687 _ => return Err(ProxyError::InvalidMessage("Expected AuthChallenge".into())),
688 },
689 _ => return Err(ProxyError::InvalidMessage("Expected text message".into())),
690 };
691
692 // Sign challenge
693 let response = challenge.sign(identity);
694 let auth_response = Messages::AuthResponse(identity.identity(), response);
695 let auth_json = serde_json::to_string(&auth_response)?;
696
697 // Send auth response
698 outgoing_tx
699 .send(Message::Text(auth_json))
700 .map_err(|_| ProxyError::ChannelSendFailed)?;
701
702 // Authentication complete - server doesn't send confirmation
703 Ok(identity.identity().fingerprint())
704 }
705
706 /// Message loop: routes incoming messages to channel
707 async fn message_loop(
708 mut ws_source: WsSource,
709 incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
710 last_pong: Arc<Mutex<Instant>>,
711 ) -> Result<(), ProxyError> {
712 while let Some(msg_result) = ws_source.next().await {
713 let msg = msg_result.map_err(ws_err)?;
714
715 match msg {
716 Message::Text(text) => {
717 let parsed: Messages = serde_json::from_str(&text)?;
718 match parsed {
719 Messages::Send {
720 source,
721 destination,
722 payload,
723 } => {
724 // Server always includes source when forwarding messages
725 if let Some(source) = source {
726 incoming_tx
727 .send(IncomingMessage::Send {
728 source,
729 destination,
730 payload,
731 })
732 .ok();
733 } else {
734 tracing::warn!("Received Send message without source");
735 }
736 }
737 Messages::RendevouzInfo(code) => {
738 incoming_tx.send(IncomingMessage::RendevouzInfo(code)).ok();
739 }
740 Messages::IdentityInfo {
741 fingerprint,
742 identity,
743 } => {
744 incoming_tx
745 .send(IncomingMessage::IdentityInfo {
746 fingerprint,
747 identity,
748 })
749 .ok();
750 }
751 Messages::GetIdentity(_) => {
752 tracing::warn!("Received GetIdentity (client should not receive this)");
753 }
754 _ => tracing::warn!("Unexpected message type: {:?}", parsed),
755 }
756 }
757 Message::Pong(_) => {
758 *last_pong.lock().await = Instant::now();
759 }
760 Message::Close(_) => break,
761 _ => {}
762 }
763 }
764
765 Ok(())
766 }
767}