ap_proxy_client/protocol_client.rs
1use ap_proxy_protocol::{
2 IdentityFingerprint, IdentityKeyPair, Messages, ProxyError, RendezvousCode,
3};
4use futures_util::{SinkExt, StreamExt};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, mpsc};
8use tokio::task::JoinHandle;
9use tokio::time::Instant;
10use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::Message};
11
/// Interval between client-side WebSocket pings.
///
/// The write task also uses each ping tick to check how long ago the last
/// pong was recorded, so a dead connection is only noticed on a tick.
const PING_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum time without a pong before the connection is considered dead.
///
/// Measured from the shared pong timestamp, which is initialised when the
/// connection is set up and refreshed by the read task on every pong frame.
const PONG_TIMEOUT: Duration = Duration::from_secs(60);
16
17use super::config::{ClientState, IncomingMessage, ProxyClientConfig};
18
19/// Convert tungstenite errors into ProxyError (replaces the From impl that
20/// was removed from ap-proxy-protocol to keep it free of tungstenite deps).
21fn ws_err(e: tokio_tungstenite::tungstenite::Error) -> ProxyError {
22 ProxyError::WebSocket(e.to_string())
23}
24
/// Concrete WebSocket stream type this client works with: a
/// `tokio::net::TcpStream` wrapped in `MaybeTlsStream`.
type WsStream = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
/// Sending half obtained by splitting a [`WsStream`]; consumed by the write task.
type WsSink = futures_util::stream::SplitSink<WsStream, Message>;
/// Receiving half obtained by splitting a [`WsStream`]; consumed by the read task.
type WsSource = futures_util::stream::SplitStream<WsStream>;
28
29/// Client for connecting to and communicating through a ap-proxy server.
30///
31/// This is the main client API for connecting to a proxy server, authenticating,
32/// discovering peers via rendezvous codes, and sending messages.
33///
34/// # Lifecycle
35///
36/// 1. Create client with [`new()`](ProxyProtocolClient::new)
37/// 2. Connect and authenticate with [`connect()`](ProxyProtocolClient::connect)
38/// 3. Perform operations (send messages, request rendezvous codes, etc.)
39/// 4. Disconnect with [`disconnect()`](ProxyProtocolClient::disconnect)
40///
41/// # Examples
42///
43/// Basic usage:
44///
45/// ```no_run
46/// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage, IdentityKeyPair};
47///
48/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
49/// // Create and connect
50/// let config = ProxyClientConfig {
51/// proxy_url: "ws://localhost:8080".to_string(),
52/// };
53/// let mut client = ProxyProtocolClient::new(config);
54/// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
55///
56/// // Handle messages
57/// tokio::spawn(async move {
58/// while let Some(msg) = incoming.recv().await {
59/// match msg {
60/// IncomingMessage::Send { source, payload, .. } => {
61/// println!("Got message from {:?}", source);
62/// }
63/// _ => {}
64/// }
65/// }
66/// });
67///
68/// // Send a message
69/// // client.send_to(target_fingerprint, b"Hello".to_vec()).await?;
70/// # Ok(())
71/// # }
72/// ```
73pub struct ProxyProtocolClient {
74 // Configuration
75 config: ProxyClientConfig,
76 identity: Option<Arc<IdentityKeyPair>>,
77
78 // Connection state
79 state: Arc<Mutex<ClientState>>,
80
81 // WebSocket components (None when disconnected)
82 outgoing_tx: Option<mpsc::UnboundedSender<Message>>,
83
84 // Task handles for cleanup
85 read_task_handle: Option<JoinHandle<()>>,
86 write_task_handle: Option<JoinHandle<()>>,
87}
88
89impl ProxyProtocolClient {
90 /// Create a new proxy client with the given configuration.
91 ///
92 /// This does not establish a connection - call [`connect()`](ProxyProtocolClient::connect)
93 /// to connect and authenticate with an identity.
94 ///
95 /// # Examples
96 ///
97 /// ```
98 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient};
99 ///
100 /// let config = ProxyClientConfig {
101 /// proxy_url: "ws://localhost:8080".to_string(),
102 /// };
103 /// let client = ProxyProtocolClient::new(config);
104 /// ```
105 pub fn new(config: ProxyClientConfig) -> Self {
106 Self {
107 config,
108 identity: None,
109 state: Arc::new(Mutex::new(ClientState::Disconnected)),
110 outgoing_tx: None,
111 read_task_handle: None,
112 write_task_handle: None,
113 }
114 }
115
116 /// Create a new proxy client from just a URL.
117 ///
118 /// Convenience constructor equivalent to `new(ProxyClientConfig { proxy_url })`.
119 pub fn from_url(proxy_url: String) -> Self {
120 Self::new(ProxyClientConfig { proxy_url })
121 }
122
123 /// Connect to the proxy server and perform authentication.
124 ///
125 /// Establishes a WebSocket connection, completes the challenge-response authentication
126 /// using the provided identity, and returns a channel for receiving incoming messages.
127 ///
128 /// # Authentication Flow
129 ///
130 /// 1. Connect to WebSocket at the configured URL
131 /// 2. Receive authentication challenge from server
132 /// 3. Sign challenge with the provided identity's private key
133 /// 4. Send signed response to server
134 /// 5. Server verifies signature and accepts connection
135 ///
136 /// # Timeout
137 ///
138 /// Authentication must complete within 5 seconds or this method returns
139 /// [`ProxyError::AuthenticationTimeout`].
140 ///
141 /// # Errors
142 ///
143 /// - [`ProxyError::AlreadyConnected`] if already connected
144 /// - [`ProxyError::WebSocket`] if connection fails
145 /// - [`ProxyError::AuthenticationFailed`] if signature verification fails
146 /// - [`ProxyError::AuthenticationTimeout`] if authentication takes too long
147 ///
148 /// # Examples
149 ///
150 /// ```no_run
151 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IncomingMessage, IdentityKeyPair};
152 ///
153 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
154 /// let config = ProxyClientConfig {
155 /// proxy_url: "ws://localhost:8080".to_string(),
156 /// };
157 /// let mut client = ProxyProtocolClient::new(config);
158 ///
159 /// // Connect and get incoming message channel
160 /// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
161 ///
162 /// // Handle messages
163 /// while let Some(msg) = incoming.recv().await {
164 /// println!("Received: {:?}", msg);
165 /// }
166 /// # Ok(())
167 /// # }
168 /// ```
169 pub async fn connect(
170 &mut self,
171 identity: IdentityKeyPair,
172 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ProxyError> {
173 // Check not already connected
174 {
175 let state = self.state.lock().await;
176 if !matches!(*state, ClientState::Disconnected) {
177 return Err(ProxyError::AlreadyConnected);
178 }
179 }
180
181 // Store the identity
182 let identity = Arc::new(identity);
183 self.identity = Some(Arc::clone(&identity));
184
185 // Connect WebSocket
186 let (ws_stream, _) = connect_async(&self.config.proxy_url)
187 .await
188 .map_err(ws_err)?;
189
190 // Split into read/write
191 let (ws_sink, ws_source) = ws_stream.split();
192
193 // Update state to Connected
194 *self.state.lock().await = ClientState::Connected;
195
196 // Create channels
197 let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel::<Message>();
198 let (incoming_tx, incoming_rx) = mpsc::unbounded_channel::<IncomingMessage>();
199 let (auth_tx, mut auth_rx) = mpsc::unbounded_channel::<Result<(), ProxyError>>();
200
201 // Shared pong tracker for keep-alive
202 let last_pong = Arc::new(Mutex::new(Instant::now()));
203
204 // Spawn write task
205 let write_handle = tokio::spawn(Self::write_task(
206 ws_sink,
207 outgoing_rx,
208 Arc::clone(&last_pong),
209 ));
210
211 // Spawn read task (handles auth + message routing)
212 let read_handle = tokio::spawn(Self::read_task(
213 ws_source,
214 outgoing_tx.clone(),
215 incoming_tx,
216 identity,
217 self.state.clone(),
218 auth_tx,
219 last_pong,
220 ));
221
222 // Wait for authentication to complete
223 match tokio::time::timeout(tokio::time::Duration::from_secs(5), auth_rx.recv()).await {
224 Ok(Some(Ok(()))) => {
225 // Authentication succeeded
226 }
227 Ok(Some(Err(e))) => {
228 // Authentication failed
229 self.read_task_handle = Some(read_handle);
230 self.write_task_handle = Some(write_handle);
231 self.disconnect().await?;
232 return Err(e);
233 }
234 Ok(None) | Err(_) => {
235 // Channel closed or timeout
236 self.read_task_handle = Some(read_handle);
237 self.write_task_handle = Some(write_handle);
238 self.disconnect().await?;
239 return Err(ProxyError::AuthenticationTimeout);
240 }
241 }
242
243 // Store handles and tx
244 self.outgoing_tx = Some(outgoing_tx);
245 self.read_task_handle = Some(read_handle);
246 self.write_task_handle = Some(write_handle);
247
248 Ok(incoming_rx)
249 }
250
251 /// Send a message to another authenticated client.
252 ///
253 /// The message is routed through the proxy server to the destination client.
254 /// The proxy validates the source identity but cannot inspect the payload.
255 ///
256 /// # Authentication Required
257 ///
258 /// This method requires an active authenticated connection. Call
259 /// [`connect()`](ProxyProtocolClient::connect) first.
260 ///
261 /// # Payload Encryption
262 ///
263 /// The proxy does not encrypt message payloads. Clients should implement
264 /// end-to-end encryption (e.g., using the Noise protocol) before calling this method.
265 ///
266 /// # Errors
267 ///
268 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
269 /// - [`ProxyError::DestinationNotFound`] if the destination client is not connected
270 /// - [`ProxyError::Serialization`] if message encoding fails
271 ///
272 /// # Examples
273 ///
274 /// ```no_run
275 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
276 ///
277 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
278 /// # let config = ProxyClientConfig {
279 /// # proxy_url: "ws://localhost:8080".to_string(),
280 /// # };
281 /// let mut client = ProxyProtocolClient::new(config);
282 /// client.connect(IdentityKeyPair::generate()).await?;
283 ///
284 /// // Get destination fingerprint from rendezvous lookup
285 /// // let destination = ...; // from IncomingMessage::IdentityInfo
286 ///
287 /// // Send encrypted message
288 /// // let payload = encrypt_message(b"Hello!")?;
289 /// // client.send_to(destination, payload).await?;
290 /// # Ok(())
291 /// # }
292 /// ```
293 pub async fn send_to(
294 &self,
295 destination: IdentityFingerprint,
296 payload: Vec<u8>,
297 ) -> Result<(), ProxyError> {
298 // Check authenticated
299 {
300 let state = self.state.lock().await;
301 if !matches!(*state, ClientState::Authenticated { .. }) {
302 return Err(ProxyError::NotConnected);
303 }
304 }
305
306 // Create Send message without source (server will add it)
307 let msg = Messages::Send {
308 source: None,
309 destination,
310 payload,
311 };
312
313 let json = serde_json::to_string(&msg)?;
314
315 // Send via outgoing_tx channel
316 if let Some(tx) = &self.outgoing_tx {
317 tx.send(Message::Text(json))
318 .map_err(|_| ProxyError::ChannelSendFailed)?;
319 Ok(())
320 } else {
321 Err(ProxyError::NotConnected)
322 }
323 }
324
325 /// Request a rendezvous code from the server.
326 ///
327 /// The server will generate a temporary code (format: "ABC-DEF-GHI") that maps to your
328 /// identity. The code will be delivered via [`IncomingMessage::RendezvousInfo`] on the
329 /// channel returned by [`connect()`](ProxyProtocolClient::connect).
330 ///
331 /// # Rendezvous Code Properties
332 ///
333 /// - Format: 9 alphanumeric characters (e.g., "ABC-DEF-GHI")
334 /// - Lifetime: 5 minutes
335 /// - Single-use: deleted after lookup
336 /// - Enables peer discovery without sharing long-lived identifiers
337 ///
338 /// # Usage Pattern
339 ///
340 /// 1. Call this method to request a code
341 /// 2. Receive the code via [`IncomingMessage::RendezvousInfo`]
342 /// 3. Share the code with a peer (e.g., display as QR code)
343 /// 4. Peer uses [`request_identity()`](ProxyProtocolClient::request_identity) to look up your identity
344 ///
345 /// # Authentication Required
346 ///
347 /// This method requires an active authenticated connection.
348 ///
349 /// # Errors
350 ///
351 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
352 ///
353 /// # Examples
354 ///
355 /// ```no_run
356 /// use ap_proxy_client::{ProxyProtocolClient, IncomingMessage, IdentityKeyPair};
357 ///
358 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
359 /// let mut client = ProxyProtocolClient::from_url("ws://localhost:8080".to_string());
360 /// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
361 ///
362 /// // Request a code
363 /// client.request_rendezvous().await?;
364 ///
365 /// // Wait for response
366 /// if let Some(IncomingMessage::RendezvousInfo(code)) = incoming.recv().await {
367 /// println!("Share this code: {}", code.as_str());
368 /// // Display as QR code, send via messaging, etc.
369 /// }
370 /// # Ok(())
371 /// # }
372 /// ```
373 pub async fn request_rendezvous(&self) -> Result<(), ProxyError> {
374 // Check authenticated
375 {
376 let state = self.state.lock().await;
377 if !matches!(*state, ClientState::Authenticated { .. }) {
378 return Err(ProxyError::NotConnected);
379 }
380 }
381
382 // Send GetRendezvous message
383 let msg = Messages::GetRendezvous;
384 let json = serde_json::to_string(&msg)?;
385
386 // Send via outgoing_tx channel
387 if let Some(tx) = &self.outgoing_tx {
388 tx.send(Message::Text(json))
389 .map_err(|_| ProxyError::ChannelSendFailed)?;
390 Ok(())
391 } else {
392 Err(ProxyError::NotConnected)
393 }
394 }
395
396 /// Look up a peer's identity using a rendezvous code.
397 ///
398 /// Queries the server for the identity associated with a rendezvous code.
399 /// If the code is valid and hasn't expired, the server responds with
400 /// [`IncomingMessage::IdentityInfo`] containing the peer's identity and fingerprint.
401 ///
402 /// # Code Consumption
403 ///
404 /// Rendezvous codes are single-use. After successful lookup, the server deletes
405 /// the code and it cannot be used again.
406 ///
407 /// # Authentication Required
408 ///
409 /// This method requires an active authenticated connection.
410 ///
411 /// # Errors
412 ///
413 /// - [`ProxyError::NotConnected`] if not connected or not authenticated
414 ///
415 /// The server may not respond if the code is invalid, expired, or already used.
416 /// Implement a timeout when waiting for the response.
417 ///
418 /// # Examples
419 ///
420 /// ```no_run
421 /// use ap_proxy_client::{ProxyProtocolClient, IncomingMessage, IdentityKeyPair, RendezvousCode};
422 ///
423 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
424 /// let mut client = ProxyProtocolClient::from_url("ws://localhost:8080".to_string());
425 /// let mut incoming = client.connect(IdentityKeyPair::generate()).await?;
426 ///
427 /// // Get code from user (e.g., QR scan, text input)
428 /// let code = RendezvousCode::from_string("ABC-DEF-GHI".to_string());
429 ///
430 /// // Look up the identity
431 /// client.request_identity(code).await?;
432 ///
433 /// // Wait for response with timeout
434 /// match tokio::time::timeout(
435 /// tokio::time::Duration::from_secs(5),
436 /// incoming.recv()
437 /// ).await {
438 /// Ok(Some(IncomingMessage::IdentityInfo { fingerprint, identity })) => {
439 /// println!("Found peer: {:?}", fingerprint);
440 /// // Now you can send messages to this peer
441 /// // client.send_to(fingerprint, payload).await?;
442 /// }
443 /// _ => {
444 /// println!("Code not found or expired");
445 /// }
446 /// }
447 /// # Ok(())
448 /// # }
449 /// ```
450 pub async fn request_identity(
451 &self,
452 rendezvous_code: RendezvousCode,
453 ) -> Result<(), ProxyError> {
454 // Check authenticated
455 {
456 let state = self.state.lock().await;
457 if !matches!(*state, ClientState::Authenticated { .. }) {
458 return Err(ProxyError::NotConnected);
459 }
460 }
461
462 // Send GetIdentity message
463 let msg = Messages::GetIdentity(rendezvous_code);
464 let json = serde_json::to_string(&msg)?;
465
466 // Send via outgoing_tx channel
467 if let Some(tx) = &self.outgoing_tx {
468 tx.send(Message::Text(json))
469 .map_err(|_| ProxyError::ChannelSendFailed)?;
470 Ok(())
471 } else {
472 Err(ProxyError::NotConnected)
473 }
474 }
475
476 /// Get this client's identity fingerprint.
477 ///
478 /// Returns the SHA256 fingerprint of the client's public key. This is the
479 /// identifier that other clients use to send messages to this client.
480 ///
481 /// The fingerprint is available immediately after creating the client, before
482 /// connecting.
483 ///
484 /// # Examples
485 ///
486 /// ```no_run
487 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
488 ///
489 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
490 /// let config = ProxyClientConfig {
491 /// proxy_url: "ws://localhost:8080".to_string(),
492 /// };
493 /// let mut client = ProxyProtocolClient::new(config);
494 /// client.connect(IdentityKeyPair::generate()).await?;
495 /// println!("My fingerprint: {:?}", client.fingerprint());
496 /// # Ok(())
497 /// # }
498 /// ```
499 pub fn fingerprint(&self) -> IdentityFingerprint {
500 self.identity
501 .as_ref()
502 .expect("fingerprint() requires a prior connect()")
503 .identity()
504 .fingerprint()
505 }
506
507 /// Check if the client is currently authenticated.
508 ///
509 /// Returns `true` if the client has completed authentication and can send/receive
510 /// messages. Returns `false` if disconnected or still connecting.
511 ///
512 /// # Examples
513 ///
514 /// ```no_run
515 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
516 ///
517 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
518 /// let config = ProxyClientConfig {
519 /// proxy_url: "ws://localhost:8080".to_string(),
520 /// };
521 /// let mut client = ProxyProtocolClient::new(config);
522 ///
523 /// assert!(!client.is_authenticated().await);
524 ///
525 /// client.connect(IdentityKeyPair::generate()).await?;
526 /// assert!(client.is_authenticated().await);
527 ///
528 /// client.disconnect().await?;
529 /// assert!(!client.is_authenticated().await);
530 /// # Ok(())
531 /// # }
532 /// ```
533 pub async fn is_authenticated(&self) -> bool {
534 matches!(*self.state.lock().await, ClientState::Authenticated { .. })
535 }
536
537 /// Disconnect from the proxy server and clean up resources.
538 ///
539 /// Aborts background tasks, closes the WebSocket connection, and resets state.
540 /// After disconnecting, you can call [`connect()`](ProxyProtocolClient::connect)
541 /// again to reconnect.
542 ///
543 /// This method is automatically called when the client is dropped, but calling it
544 /// explicitly allows you to handle errors.
545 ///
546 /// # Examples
547 ///
548 /// ```no_run
549 /// use ap_proxy_client::{ProxyClientConfig, ProxyProtocolClient, IdentityKeyPair};
550 ///
551 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
552 /// # let config = ProxyClientConfig {
553 /// # proxy_url: "ws://localhost:8080".to_string(),
554 /// # };
555 /// let mut client = ProxyProtocolClient::new(config);
556 /// client.connect(IdentityKeyPair::generate()).await?;
557 ///
558 /// // Do work...
559 ///
560 /// // Clean disconnect
561 /// client.disconnect().await?;
562 /// # Ok(())
563 /// # }
564 /// ```
565 pub async fn disconnect(&mut self) -> Result<(), ProxyError> {
566 // Abort tasks
567 if let Some(handle) = self.read_task_handle.take() {
568 handle.abort();
569 }
570 if let Some(handle) = self.write_task_handle.take() {
571 handle.abort();
572 }
573
574 // Clear state
575 *self.state.lock().await = ClientState::Disconnected;
576
577 // Close channels
578 self.outgoing_tx = None;
579
580 Ok(())
581 }
582
583 /// Write task: sends messages from channel to WebSocket, with keep-alive pings
584 async fn write_task(
585 mut ws_sink: WsSink,
586 mut outgoing_rx: mpsc::UnboundedReceiver<Message>,
587 last_pong: Arc<Mutex<Instant>>,
588 ) {
589 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
590 ping_interval.tick().await; // consume the immediate first tick
591
592 loop {
593 tokio::select! {
594 msg = outgoing_rx.recv() => {
595 match msg {
596 Some(msg) => {
597 if ws_sink.send(msg).await.is_err() {
598 break;
599 }
600 }
601 None => break,
602 }
603 }
604 _ = ping_interval.tick() => {
605 // Check if we received a pong recently
606 let elapsed = last_pong.lock().await.elapsed();
607 if elapsed > PONG_TIMEOUT {
608 tracing::warn!("No pong received in {:?}, closing connection", elapsed);
609 break;
610 }
611 if ws_sink.send(Message::Ping(vec![])).await.is_err() {
612 break;
613 }
614 }
615 }
616 }
617 }
618
619 /// Read task: handles authentication and routes messages
620 async fn read_task(
621 mut ws_source: WsSource,
622 outgoing_tx: mpsc::UnboundedSender<Message>,
623 incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
624 identity: Arc<IdentityKeyPair>,
625 state: Arc<Mutex<ClientState>>,
626 auth_tx: mpsc::UnboundedSender<Result<(), ProxyError>>,
627 last_pong: Arc<Mutex<Instant>>,
628 ) {
629 // Handle authentication
630 match Self::handle_authentication(&mut ws_source, &outgoing_tx, &identity).await {
631 Ok(fingerprint) => {
632 *state.lock().await = ClientState::Authenticated { fingerprint };
633 // Notify that authentication succeeded
634 let _ = auth_tx.send(Ok(()));
635 }
636 Err(e) => {
637 tracing::error!("Authentication failed: {}", e);
638 *state.lock().await = ClientState::Disconnected;
639 // Notify that authentication failed
640 let _ = auth_tx.send(Err(e));
641 return;
642 }
643 }
644
645 // Enter message loop
646 if let Err(e) = Self::message_loop(ws_source, incoming_tx, last_pong).await {
647 tracing::error!("Message loop error: {}", e);
648 }
649
650 *state.lock().await = ClientState::Disconnected;
651 }
652
653 /// Handle authentication challenge-response
654 async fn handle_authentication(
655 ws_source: &mut WsSource,
656 outgoing_tx: &mpsc::UnboundedSender<Message>,
657 identity: &Arc<IdentityKeyPair>,
658 ) -> Result<IdentityFingerprint, ProxyError> {
659 // Receive AuthChallenge
660 let challenge_msg = ws_source
661 .next()
662 .await
663 .ok_or(ProxyError::ConnectionClosed)?
664 .map_err(ws_err)?;
665
666 let challenge = match challenge_msg {
667 Message::Text(text) => match serde_json::from_str::<Messages>(&text)? {
668 Messages::AuthChallenge(c) => c,
669 _ => return Err(ProxyError::InvalidMessage("Expected AuthChallenge".into())),
670 },
671 _ => return Err(ProxyError::InvalidMessage("Expected text message".into())),
672 };
673
674 // Sign challenge
675 let response = challenge.sign(identity);
676 let auth_response = Messages::AuthResponse(identity.identity(), response);
677 let auth_json = serde_json::to_string(&auth_response)?;
678
679 // Send auth response
680 outgoing_tx
681 .send(Message::Text(auth_json))
682 .map_err(|_| ProxyError::ChannelSendFailed)?;
683
684 // Authentication complete - server doesn't send confirmation
685 Ok(identity.identity().fingerprint())
686 }
687
688 /// Message loop: routes incoming messages to channel
689 async fn message_loop(
690 mut ws_source: WsSource,
691 incoming_tx: mpsc::UnboundedSender<IncomingMessage>,
692 last_pong: Arc<Mutex<Instant>>,
693 ) -> Result<(), ProxyError> {
694 while let Some(msg_result) = ws_source.next().await {
695 let msg = msg_result.map_err(ws_err)?;
696
697 match msg {
698 Message::Text(text) => {
699 let parsed: Messages = serde_json::from_str(&text)?;
700 match parsed {
701 Messages::Send {
702 source,
703 destination,
704 payload,
705 } => {
706 // Server always includes source when forwarding messages
707 if let Some(source) = source {
708 incoming_tx
709 .send(IncomingMessage::Send {
710 source,
711 destination,
712 payload,
713 })
714 .ok();
715 } else {
716 tracing::warn!("Received Send message without source");
717 }
718 }
719 Messages::RendezvousInfo(code) => {
720 incoming_tx.send(IncomingMessage::RendezvousInfo(code)).ok();
721 }
722 Messages::IdentityInfo {
723 fingerprint,
724 identity,
725 } => {
726 incoming_tx
727 .send(IncomingMessage::IdentityInfo {
728 fingerprint,
729 identity,
730 })
731 .ok();
732 }
733 Messages::GetIdentity(_) => {
734 tracing::warn!("Received GetIdentity (client should not receive this)");
735 }
736 _ => tracing::warn!("Unexpected message type: {:?}", parsed),
737 }
738 }
739 Message::Pong(_) => {
740 *last_pong.lock().await = Instant::now();
741 }
742 Message::Close(_) => break,
743 _ => {}
744 }
745 }
746
747 Ok(())
748 }
749}