Skip to main content

ap_client/clients/
remote_client.rs

1use std::time::Duration;
2
3use ap_noise::{InitiatorHandshake, MultiDeviceTransport, Psk};
4use ap_proxy_client::IncomingMessage;
5use ap_proxy_protocol::{IdentityFingerprint, RendezvousCode};
6use base64::{Engine, engine::general_purpose::STANDARD};
7use rand::RngCore;
8
9use crate::compat::{now_millis, timeout};
10use crate::proxy::ProxyClient;
11use tokio::sync::{mpsc, oneshot};
12use tracing::{debug, warn};
13
14use super::notify;
15use crate::traits::{IdentityProvider, SessionStore};
16use crate::{
17    error::ClientError,
18    types::{
19        CredentialData, CredentialQuery, CredentialRequestPayload, CredentialResponsePayload,
20        ProtocolMessage,
21    },
22};
23
24const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
25
26// =============================================================================
27// Public types: Notifications (fire-and-forget) and Requests (with reply)
28// =============================================================================
29
/// Fire-and-forget status updates emitted by the remote client.
///
/// These are delivered on the `notifications` receiver of
/// [`RemoteClientHandle`]. No variant carries a reply channel, so the consumer
/// is never expected to answer them.
#[derive(Debug, Clone)]
pub enum RemoteClientNotification {
    /// Connecting to the proxy server
    Connecting,
    /// Successfully connected to the proxy
    Connected {
        /// This device's own identity fingerprint, as reported by the identity provider
        fingerprint: IdentityFingerprint,
    },
    /// Reconnecting to an existing (cached) session
    ReconnectingToSession {
        /// The fingerprint being reconnected to
        fingerprint: IdentityFingerprint,
    },
    /// Rendezvous code resolution starting
    RendezvousResolving {
        /// The rendezvous code being resolved
        code: String,
    },
    /// Rendezvous code resolved to fingerprint
    RendezvousResolved {
        /// The resolved identity fingerprint
        fingerprint: IdentityFingerprint,
    },
    /// Using PSK mode for connection
    PskMode {
        /// The fingerprint being connected to
        fingerprint: IdentityFingerprint,
    },
    /// Noise handshake starting
    HandshakeStart,
    /// Noise handshake progress
    HandshakeProgress {
        /// Progress message
        message: String,
    },
    /// Noise handshake complete (also emitted when a cached session is loaded)
    HandshakeComplete,
    /// Handshake fingerprint (informational — for PSK or non-verified connections)
    HandshakeFingerprint {
        /// The 6-character hex fingerprint
        fingerprint: String,
    },
    /// The fingerprint is considered verified.
    ///
    /// Emitted after the user approves a verification request, and also on the
    /// PSK and cached-session paths, where verification is skipped.
    FingerprintVerified,
    /// User rejected the fingerprint
    FingerprintRejected {
        /// Reason for rejection
        reason: String,
    },
    /// Client is ready for credential requests
    Ready {
        /// Whether credentials can be requested
        can_request_credentials: bool,
    },
    /// Credential request was sent
    CredentialRequestSent {
        /// The query used for the request
        query: CredentialQuery,
    },
    /// Credential was received
    CredentialReceived {
        /// The credential data
        credential: CredentialData,
    },
    /// An error occurred
    Error {
        /// Error message
        message: String,
        /// Context where error occurred
        context: Option<String>,
    },
    /// Client was disconnected
    Disconnected {
        /// Reason for disconnection
        reason: Option<String>,
    },
}
109
/// Reply for fingerprint verification requests.
///
/// Sent back through the `reply` channel carried by
/// [`RemoteClientRequest::VerifyFingerprint`].
#[derive(Debug)]
pub struct RemoteClientFingerprintReply {
    /// Whether user approved the fingerprint. A `false` reply makes the
    /// pending pairing fail with `ClientError::FingerprintRejected`.
    pub approved: bool,
}
116
/// Requests that require a caller response, carrying a oneshot reply channel.
///
/// Delivered on the `requests` receiver of [`RemoteClientHandle`].
#[derive(Debug)]
pub enum RemoteClientRequest {
    /// Handshake fingerprint requires verification.
    ///
    /// The pairing in progress waits up to 60 seconds for the reply; dropping
    /// `reply` without answering fails the pairing with
    /// `ClientError::ChannelClosed`.
    VerifyFingerprint {
        /// The 6-character hex fingerprint for visual verification
        fingerprint: String,
        /// Channel to send the verification reply
        reply: oneshot::Sender<RemoteClientFingerprintReply>,
    },
}
128
129// =============================================================================
130// Command channel for RemoteClient handle → event loop communication
131// =============================================================================
132
/// Type alias matching `SessionStore::list_sessions()` return type.
///
/// NOTE(review): the meaning of the `Option<String>` and the two `u64` fields
/// is defined by `SessionStore`, not by this file — confirm there before
/// relying on a particular interpretation.
type SessionList = Vec<(IdentityFingerprint, Option<String>, u64, u64)>;
135
/// Commands sent from a `RemoteClient` handle to the running event loop.
///
/// Every variant carries a oneshot `reply` sender; the event loop sends the
/// outcome of the command back through it.
enum RemoteClientCommand {
    /// Pair via a rendezvous code, optionally asking the user to verify the
    /// handshake fingerprint. Replies with the resolved remote fingerprint.
    PairWithHandshake {
        rendezvous_code: String,
        verify_fingerprint: bool,
        reply: oneshot::Sender<Result<IdentityFingerprint, ClientError>>,
    },
    /// Pair with a known remote fingerprint using a pre-shared key.
    PairWithPsk {
        psk: Psk,
        remote_fingerprint: IdentityFingerprint,
        reply: oneshot::Sender<Result<(), ClientError>>,
    },
    /// Restore the transport for a session already present in the session store.
    LoadCachedSession {
        remote_fingerprint: IdentityFingerprint,
        reply: oneshot::Sender<Result<(), ClientError>>,
    },
    /// Send a credential request to the paired device and wait for its response.
    RequestCredential {
        query: CredentialQuery,
        reply: oneshot::Sender<Result<CredentialData, ClientError>>,
    },
    /// Return the session store's list of cached sessions.
    ListSessions {
        reply: oneshot::Sender<SessionList>,
    },
    /// Report whether the session store has a session for `fingerprint`.
    HasSession {
        fingerprint: IdentityFingerprint,
        reply: oneshot::Sender<bool>,
    },
}
164
165// =============================================================================
166// Handle — cloneable, Send, all methods take &self
167// =============================================================================
168
/// Handle returned by [`RemoteClient::connect()`] containing the client and its
/// notification/request channels.
///
/// This struct itself does not derive `Clone` (it owns the receiving ends of
/// the channels); clone the inner [`RemoteClient`] to share command access.
pub struct RemoteClientHandle {
    /// Cloneable client used to send commands to the event loop.
    pub client: RemoteClient,
    /// Receiving end for fire-and-forget status notifications.
    pub notifications: mpsc::Receiver<RemoteClientNotification>,
    /// Receiving end for requests that must be answered by the caller.
    pub requests: mpsc::Receiver<RemoteClientRequest>,
}

/// A cloneable handle for controlling the remote client.
///
/// Obtained from [`RemoteClient::connect()`], which authenticates with the proxy,
/// spawns the event loop internally, and returns this client inside a
/// [`RemoteClientHandle`]. All methods communicate with the event loop through
/// an internal command channel.
///
/// `Clone` and `Send` — share freely across tasks and threads.
/// Dropping all clones shuts down the event loop and disconnects from the proxy.
#[derive(Clone)]
pub struct RemoteClient {
    // Sending end of the command channel consumed by the spawned event loop.
    command_tx: mpsc::Sender<RemoteClientCommand>,
}
189
190impl RemoteClient {
191    /// Connect to the proxy server, spawn the event loop, and return a handle.
192    ///
193    /// This is the single entry point. After `connect()` returns, the client is
194    /// authenticated with the proxy and ready for pairing. Use one of the pairing
195    /// methods to establish a secure channel:
196    /// - [`pair_with_handshake()`](Self::pair_with_handshake) for rendezvous-based pairing
197    /// - [`pair_with_psk()`](Self::pair_with_psk) for PSK-based pairing
198    /// - [`load_cached_session()`](Self::load_cached_session) for reconnecting with a cached session
199    pub async fn connect(
200        identity_provider: Box<dyn IdentityProvider>,
201        session_store: Box<dyn SessionStore>,
202        mut proxy_client: Box<dyn ProxyClient>,
203    ) -> Result<RemoteClientHandle, ClientError> {
204        let own_fingerprint = identity_provider.fingerprint().await;
205
206        debug!("Connecting to proxy with identity {:?}", own_fingerprint);
207
208        let (notification_tx, notification_rx) = mpsc::channel(32);
209        let (request_tx, request_rx) = mpsc::channel(32);
210
211        notify!(notification_tx, RemoteClientNotification::Connecting);
212
213        let incoming_rx = proxy_client.connect().await?;
214
215        notify!(
216            notification_tx,
217            RemoteClientNotification::Connected {
218                fingerprint: own_fingerprint,
219            }
220        );
221
222        debug!("Connected to proxy successfully");
223
224        // Create command channel
225        let (command_tx, command_rx) = mpsc::channel(32);
226
227        // Build inner state
228        let inner = RemoteClientInner {
229            session_store,
230            proxy_client,
231            transport: None,
232            remote_fingerprint: None,
233        };
234
235        // Spawn the event loop — use spawn_local on WASM (no Tokio runtime)
236        #[cfg(target_arch = "wasm32")]
237        wasm_bindgen_futures::spawn_local(inner.run_event_loop(
238            incoming_rx,
239            command_rx,
240            notification_tx,
241            request_tx,
242        ));
243        #[cfg(not(target_arch = "wasm32"))]
244        tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
245
246        Ok(RemoteClientHandle {
247            client: Self { command_tx },
248            notifications: notification_rx,
249            requests: request_rx,
250        })
251    }
252
253    /// Pair with a remote device using a rendezvous code.
254    ///
255    /// Resolves the rendezvous code to a fingerprint, performs the Noise handshake,
256    /// and optionally waits for user fingerprint verification. If `verify_fingerprint`
257    /// is true, a [`RemoteClientRequest::VerifyFingerprint`] will be sent on the
258    /// request channel and must be answered before this method returns.
259    pub async fn pair_with_handshake(
260        &self,
261        rendezvous_code: String,
262        verify_fingerprint: bool,
263    ) -> Result<IdentityFingerprint, ClientError> {
264        let (tx, rx) = oneshot::channel();
265        self.command_tx
266            .send(RemoteClientCommand::PairWithHandshake {
267                rendezvous_code,
268                verify_fingerprint,
269                reply: tx,
270            })
271            .await
272            .map_err(|_| ClientError::ChannelClosed)?;
273        rx.await.map_err(|_| ClientError::ChannelClosed)?
274    }
275
276    /// Pair with a remote device using a pre-shared key.
277    ///
278    /// Uses the PSK for authentication, skipping fingerprint verification
279    /// since trust is established through the PSK.
280    pub async fn pair_with_psk(
281        &self,
282        psk: Psk,
283        remote_fingerprint: IdentityFingerprint,
284    ) -> Result<(), ClientError> {
285        let (tx, rx) = oneshot::channel();
286        self.command_tx
287            .send(RemoteClientCommand::PairWithPsk {
288                psk,
289                remote_fingerprint,
290                reply: tx,
291            })
292            .await
293            .map_err(|_| ClientError::ChannelClosed)?;
294        rx.await.map_err(|_| ClientError::ChannelClosed)?
295    }
296
297    /// Reconnect to a remote device using a cached session.
298    ///
299    /// Verifies the session exists in the session store and reconnects
300    /// without requiring fingerprint verification.
301    pub async fn load_cached_session(
302        &self,
303        remote_fingerprint: IdentityFingerprint,
304    ) -> Result<(), ClientError> {
305        let (tx, rx) = oneshot::channel();
306        self.command_tx
307            .send(RemoteClientCommand::LoadCachedSession {
308                remote_fingerprint,
309                reply: tx,
310            })
311            .await
312            .map_err(|_| ClientError::ChannelClosed)?;
313        rx.await.map_err(|_| ClientError::ChannelClosed)?
314    }
315
316    /// Request a credential over the secure channel.
317    pub async fn request_credential(
318        &self,
319        query: &CredentialQuery,
320    ) -> Result<CredentialData, ClientError> {
321        let (tx, rx) = oneshot::channel();
322        self.command_tx
323            .send(RemoteClientCommand::RequestCredential {
324                query: query.clone(),
325                reply: tx,
326            })
327            .await
328            .map_err(|_| ClientError::ChannelClosed)?;
329        rx.await.map_err(|_| ClientError::ChannelClosed)?
330    }
331
332    /// List all cached sessions.
333    pub async fn list_sessions(
334        &self,
335    ) -> Result<Vec<(IdentityFingerprint, Option<String>, u64, u64)>, ClientError> {
336        let (tx, rx) = oneshot::channel();
337        self.command_tx
338            .send(RemoteClientCommand::ListSessions { reply: tx })
339            .await
340            .map_err(|_| ClientError::ChannelClosed)?;
341        rx.await.map_err(|_| ClientError::ChannelClosed)
342    }
343
344    /// Check if a session exists for a fingerprint.
345    pub async fn has_session(&self, fingerprint: IdentityFingerprint) -> Result<bool, ClientError> {
346        let (tx, rx) = oneshot::channel();
347        self.command_tx
348            .send(RemoteClientCommand::HasSession {
349                fingerprint,
350                reply: tx,
351            })
352            .await
353            .map_err(|_| ClientError::ChannelClosed)?;
354        rx.await.map_err(|_| ClientError::ChannelClosed)
355    }
356}
357
358// =============================================================================
359// Internal state — lives inside the spawned event loop task
360// =============================================================================
361
/// All mutable state for the remote client, owned by the spawned event loop task.
struct RemoteClientInner {
    /// Store used to cache sessions and persist transport state.
    session_store: Box<dyn SessionStore>,
    /// Connection to the proxy used to reach the remote device.
    proxy_client: Box<dyn ProxyClient>,
    /// Encrypted transport; `None` until a pairing or cached-session load succeeds.
    transport: Option<MultiDeviceTransport>,
    /// Fingerprint of the paired remote device; `None` until pairing succeeds.
    remote_fingerprint: Option<IdentityFingerprint>,
}
369
370impl RemoteClientInner {
371    /// Run the main event loop (consumes self).
372    async fn run_event_loop(
373        mut self,
374        mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
375        mut command_rx: mpsc::Receiver<RemoteClientCommand>,
376        notification_tx: mpsc::Sender<RemoteClientNotification>,
377        request_tx: mpsc::Sender<RemoteClientRequest>,
378    ) {
379        loop {
380            tokio::select! {
381                msg = incoming_rx.recv() => {
382                    match msg {
383                        Some(_) => {
384                            // Stray message while idle — ignore
385                            debug!("Received message while idle");
386                        }
387                        None => {
388                            // Proxy disconnected
389                            notify!(notification_tx, RemoteClientNotification::Disconnected {
390                                reason: Some("Proxy connection closed".to_string()),
391                            });
392                            return;
393                        }
394                    }
395                }
396                cmd = command_rx.recv() => {
397                    match cmd {
398                        Some(cmd) => {
399                            self.handle_command(
400                                cmd,
401                                &mut incoming_rx,
402                                &notification_tx,
403                                &request_tx,
404                            ).await;
405                        }
406                        None => {
407                            // All handles dropped — shut down
408                            debug!("All RemoteClient handles dropped, shutting down event loop");
409                            self.proxy_client.disconnect().await.ok();
410                            return;
411                        }
412                    }
413                }
414            }
415        }
416    }
417
418    /// Dispatch a command from the handle.
419    async fn handle_command(
420        &mut self,
421        cmd: RemoteClientCommand,
422        incoming_rx: &mut mpsc::UnboundedReceiver<IncomingMessage>,
423        notification_tx: &mpsc::Sender<RemoteClientNotification>,
424        request_tx: &mpsc::Sender<RemoteClientRequest>,
425    ) {
426        match cmd {
427            RemoteClientCommand::PairWithHandshake {
428                rendezvous_code,
429                verify_fingerprint,
430                reply,
431            } => {
432                let result = self
433                    .do_pair_with_handshake(
434                        rendezvous_code,
435                        verify_fingerprint,
436                        incoming_rx,
437                        notification_tx,
438                        request_tx,
439                    )
440                    .await;
441                let _ = reply.send(result);
442            }
443            RemoteClientCommand::PairWithPsk {
444                psk,
445                remote_fingerprint,
446                reply,
447            } => {
448                let result = self
449                    .do_pair_with_psk(psk, remote_fingerprint, incoming_rx, notification_tx)
450                    .await;
451                let _ = reply.send(result);
452            }
453            RemoteClientCommand::LoadCachedSession {
454                remote_fingerprint,
455                reply,
456            } => {
457                let result = self
458                    .do_load_cached_session(remote_fingerprint, notification_tx)
459                    .await;
460                let _ = reply.send(result);
461            }
462            RemoteClientCommand::RequestCredential { query, reply } => {
463                let result = self
464                    .do_request_credential(query, incoming_rx, notification_tx)
465                    .await;
466                let _ = reply.send(result);
467            }
468            RemoteClientCommand::ListSessions { reply } => {
469                let sessions = self.session_store.list_sessions().await;
470                let _ = reply.send(sessions);
471            }
472            RemoteClientCommand::HasSession { fingerprint, reply } => {
473                let has = self.session_store.has_session(&fingerprint).await;
474                let _ = reply.send(has);
475            }
476        }
477    }
478
479    // ── Pairing: Rendezvous handshake ────────────────────────────────
480
481    async fn do_pair_with_handshake(
482        &mut self,
483        rendezvous_code: String,
484        verify_fingerprint: bool,
485        incoming_rx: &mut mpsc::UnboundedReceiver<IncomingMessage>,
486        notification_tx: &mpsc::Sender<RemoteClientNotification>,
487        request_tx: &mpsc::Sender<RemoteClientRequest>,
488    ) -> Result<IdentityFingerprint, ClientError> {
489        // Resolve rendezvous code to fingerprint
490        notify!(
491            notification_tx,
492            RemoteClientNotification::RendezvousResolving {
493                code: rendezvous_code.clone(),
494            }
495        );
496
497        let remote_fingerprint =
498            Self::resolve_rendezvous(self.proxy_client.as_ref(), incoming_rx, &rendezvous_code)
499                .await?;
500
501        notify!(
502            notification_tx,
503            RemoteClientNotification::RendezvousResolved {
504                fingerprint: remote_fingerprint,
505            }
506        );
507
508        // Perform Noise handshake (no PSK)
509        notify!(notification_tx, RemoteClientNotification::HandshakeStart);
510
511        let (transport, fingerprint_str) = Self::perform_handshake(
512            self.proxy_client.as_ref(),
513            incoming_rx,
514            remote_fingerprint,
515            None,
516        )
517        .await?;
518
519        notify!(notification_tx, RemoteClientNotification::HandshakeComplete);
520
521        // Always emit fingerprint (informational or for verification)
522        notify!(
523            notification_tx,
524            RemoteClientNotification::HandshakeFingerprint {
525                fingerprint: fingerprint_str.clone(),
526            }
527        );
528
529        if verify_fingerprint {
530            // Send verification request via request channel
531            let (fp_tx, fp_rx) = oneshot::channel();
532            if request_tx.capacity() == 0 {
533                warn!("Request channel full, waiting for consumer to drain");
534            }
535            request_tx
536                .send(RemoteClientRequest::VerifyFingerprint {
537                    fingerprint: fingerprint_str,
538                    reply: fp_tx,
539                })
540                .await
541                .map_err(|_| ClientError::ChannelClosed)?;
542
543            // Wait for user verification (60s timeout)
544            match timeout(Duration::from_secs(60), fp_rx).await {
545                Ok(Ok(RemoteClientFingerprintReply { approved: true })) => {
546                    notify!(
547                        notification_tx,
548                        RemoteClientNotification::FingerprintVerified
549                    );
550                }
551                Ok(Ok(RemoteClientFingerprintReply { approved: false })) => {
552                    self.proxy_client.disconnect().await.ok();
553                    notify!(
554                        notification_tx,
555                        RemoteClientNotification::FingerprintRejected {
556                            reason: "User rejected fingerprint verification".to_string(),
557                        }
558                    );
559                    return Err(ClientError::FingerprintRejected);
560                }
561                Ok(Err(_)) => {
562                    return Err(ClientError::ChannelClosed);
563                }
564                Err(_) => {
565                    self.proxy_client.disconnect().await.ok();
566                    return Err(ClientError::Timeout(
567                        "Fingerprint verification timeout".to_string(),
568                    ));
569                }
570            }
571        }
572
573        // Finalize connection
574        self.finalize_pairing(transport, remote_fingerprint, notification_tx)
575            .await?;
576
577        Ok(remote_fingerprint)
578    }
579
580    // ── Pairing: PSK ─────────────────────────────────────────────────
581
582    async fn do_pair_with_psk(
583        &mut self,
584        psk: Psk,
585        remote_fingerprint: IdentityFingerprint,
586        incoming_rx: &mut mpsc::UnboundedReceiver<IncomingMessage>,
587        notification_tx: &mpsc::Sender<RemoteClientNotification>,
588    ) -> Result<(), ClientError> {
589        notify!(
590            notification_tx,
591            RemoteClientNotification::PskMode {
592                fingerprint: remote_fingerprint,
593            }
594        );
595
596        // Perform Noise handshake with PSK
597        notify!(notification_tx, RemoteClientNotification::HandshakeStart);
598
599        let (transport, _fingerprint_str) = Self::perform_handshake(
600            self.proxy_client.as_ref(),
601            incoming_rx,
602            remote_fingerprint,
603            Some(psk),
604        )
605        .await?;
606
607        notify!(notification_tx, RemoteClientNotification::HandshakeComplete);
608
609        // Skip fingerprint verification (trust via PSK)
610        notify!(
611            notification_tx,
612            RemoteClientNotification::FingerprintVerified
613        );
614
615        // Finalize connection
616        self.finalize_pairing(transport, remote_fingerprint, notification_tx)
617            .await?;
618
619        Ok(())
620    }
621
622    // ── Cached session reconnection ──────────────────────────────────
623
624    async fn do_load_cached_session(
625        &mut self,
626        remote_fingerprint: IdentityFingerprint,
627        notification_tx: &mpsc::Sender<RemoteClientNotification>,
628    ) -> Result<(), ClientError> {
629        if !self.session_store.has_session(&remote_fingerprint).await {
630            return Err(ClientError::SessionNotFound);
631        }
632
633        notify!(
634            notification_tx,
635            RemoteClientNotification::ReconnectingToSession {
636                fingerprint: remote_fingerprint,
637            }
638        );
639
640        let transport = self
641            .session_store
642            .load_transport_state(&remote_fingerprint)
643            .await?
644            .ok_or(ClientError::SessionNotFound)?;
645
646        notify!(notification_tx, RemoteClientNotification::HandshakeComplete);
647
648        // Skip fingerprint verification (already trusted)
649        notify!(
650            notification_tx,
651            RemoteClientNotification::FingerprintVerified
652        );
653
654        // Update last_connected_at
655        self.session_store
656            .update_last_connected(&remote_fingerprint)
657            .await?;
658
659        // Save transport state and store locally
660        self.session_store
661            .save_transport_state(&remote_fingerprint, transport.clone())
662            .await?;
663
664        self.transport = Some(transport);
665        self.remote_fingerprint = Some(remote_fingerprint);
666
667        notify!(
668            notification_tx,
669            RemoteClientNotification::Ready {
670                can_request_credentials: true,
671            }
672        );
673
674        debug!("Reconnected to cached session");
675        Ok(())
676    }
677
678    // ── Shared pairing finalization ──────────────────────────────────
679
680    async fn finalize_pairing(
681        &mut self,
682        transport: MultiDeviceTransport,
683        remote_fingerprint: IdentityFingerprint,
684        notification_tx: &mpsc::Sender<RemoteClientNotification>,
685    ) -> Result<(), ClientError> {
686        // Cache session
687        self.session_store.cache_session(remote_fingerprint).await?;
688
689        // Save transport state for session resumption
690        self.session_store
691            .save_transport_state(&remote_fingerprint, transport.clone())
692            .await?;
693
694        // Store transport and remote fingerprint
695        self.transport = Some(transport);
696        self.remote_fingerprint = Some(remote_fingerprint);
697
698        // Emit Ready event
699        notify!(
700            notification_tx,
701            RemoteClientNotification::Ready {
702                can_request_credentials: true,
703            }
704        );
705
706        debug!("Connection established successfully");
707        Ok(())
708    }
709
710    // ── Credential request ───────────────────────────────────────────
711
712    async fn do_request_credential(
713        &mut self,
714        query: CredentialQuery,
715        incoming_rx: &mut mpsc::UnboundedReceiver<IncomingMessage>,
716        notification_tx: &mpsc::Sender<RemoteClientNotification>,
717    ) -> Result<CredentialData, ClientError> {
718        let remote_fingerprint = self.remote_fingerprint.ok_or(ClientError::NotInitialized)?;
719
720        // Sliced string is a UUID and isn't going to contain wide chars
721        #[allow(clippy::string_slice)]
722        let request_id = format!("req-{}-{}", now_millis(), &uuid_v4()[..8]);
723
724        debug!("Requesting credential for query: {:?}", query);
725
726        // Create and encrypt request
727        let request = CredentialRequestPayload {
728            request_type: "credential_request".to_string(),
729            query: query.clone(),
730            timestamp: now_millis(),
731            request_id: request_id.clone(),
732        };
733
734        let request_json = serde_json::to_string(&request)?;
735
736        let encrypted_data = {
737            let transport = self
738                .transport
739                .as_mut()
740                .ok_or(ClientError::SecureChannelNotEstablished)?;
741            let encrypted_packet = transport
742                .encrypt(request_json.as_bytes())
743                .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
744            STANDARD.encode(encrypted_packet.encode())
745        };
746
747        let msg = ProtocolMessage::CredentialRequest {
748            encrypted: encrypted_data,
749        };
750
751        // Send via proxy
752        let msg_json = serde_json::to_string(&msg)?;
753        self.proxy_client
754            .send_to(remote_fingerprint, msg_json.into_bytes())
755            .await?;
756
757        // Emit event
758        notify!(
759            notification_tx,
760            RemoteClientNotification::CredentialRequestSent {
761                query: query.clone(),
762            }
763        );
764
765        // Wait for matching response inline
766        match timeout(
767            DEFAULT_TIMEOUT,
768            self.receive_credential_response(&request_id, incoming_rx, notification_tx),
769        )
770        .await
771        {
772            Ok(result) => result,
773            Err(_) => Err(ClientError::Timeout(format!(
774                "Timeout waiting for credential response for query: {query:?}"
775            ))),
776        }
777    }
778
779    /// Wait for a credential response matching the given request_id.
780    ///
781    /// Stale responses from previous requests (e.g. duplicate multi-device
782    /// responses) are decrypted, logged, and silently discarded.
783    async fn receive_credential_response(
784        &mut self,
785        request_id: &str,
786        incoming_rx: &mut mpsc::UnboundedReceiver<IncomingMessage>,
787        notification_tx: &mpsc::Sender<RemoteClientNotification>,
788    ) -> Result<CredentialData, ClientError> {
789        loop {
790            match incoming_rx.recv().await {
791                Some(IncomingMessage::Send { payload, .. }) => {
792                    if let Ok(text) = String::from_utf8(payload)
793                        && let Ok(ProtocolMessage::CredentialResponse { encrypted }) =
794                            serde_json::from_str::<ProtocolMessage>(&text)
795                    {
796                        match self
797                            .decrypt_credential_response(&encrypted, request_id, notification_tx)
798                            .await
799                        {
800                            Ok(credential) => return Ok(credential),
801                            Err(ClientError::CredentialRequestFailed(ref msg))
802                                if msg.contains("request_id mismatch") =>
803                            {
804                                // Stale response from a previous request — skip it
805                                debug!("Skipping stale credential response: {msg}");
806                                continue;
807                            }
808                            Err(e) => return Err(e),
809                        }
810                    }
811                }
812                Some(_) => {
813                    // Non-Send messages (RendezvousInfo, IdentityInfo) — ignore
814                }
815                None => {
816                    return Err(ClientError::ChannelClosed);
817                }
818            }
819        }
820    }
821
822    /// Decrypt and validate a credential response.
823    async fn decrypt_credential_response(
824        &mut self,
825        encrypted: &str,
826        request_id: &str,
827        notification_tx: &mpsc::Sender<RemoteClientNotification>,
828    ) -> Result<CredentialData, ClientError> {
829        let encrypted_bytes = STANDARD
830            .decode(encrypted)
831            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
832
833        let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
834            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
835
836        let transport = self
837            .transport
838            .as_mut()
839            .ok_or(ClientError::SecureChannelNotEstablished)?;
840
841        let decrypted = transport
842            .decrypt(&packet)
843            .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
844
845        let response: CredentialResponsePayload = serde_json::from_slice(&decrypted)?;
846
847        // Verify request_id matches
848        if response.request_id.as_deref() != Some(request_id) {
849            warn!(
850                "Ignoring response with mismatched request_id: {:?}",
851                response.request_id
852            );
853            return Err(ClientError::CredentialRequestFailed(
854                "Response request_id mismatch".to_string(),
855            ));
856        }
857
858        if let Some(error) = response.error {
859            return Err(ClientError::CredentialRequestFailed(error));
860        }
861
862        if let Some(credential) = response.credential {
863            notify!(
864                notification_tx,
865                RemoteClientNotification::CredentialReceived {
866                    credential: credential.clone(),
867                }
868            );
869            Ok(credential)
870        } else {
871            Err(ClientError::CredentialRequestFailed(
872                "Response contains neither credential nor error".to_string(),
873            ))
874        }
875    }
876
877    // ── Handshake helpers (associated functions) ─────────────────────
878
879    /// Resolve rendezvous code to identity fingerprint.
880    async fn resolve_rendezvous(
881        proxy_client: &dyn ProxyClient,
882        incoming_rx: &mut mpsc::UnboundedReceiver<IncomingMessage>,
883        rendezvous_code: &str,
884    ) -> Result<IdentityFingerprint, ClientError> {
885        // Send GetIdentity request
886        proxy_client
887            .request_identity(RendezvousCode::from_string(rendezvous_code.to_string()))
888            .await
889            .map_err(|e| ClientError::RendezvousResolutionFailed(e.to_string()))?;
890
891        // Wait for IdentityInfo response with timeout
892        let timeout_duration = Duration::from_secs(10);
893        match timeout(timeout_duration, async {
894            while let Some(msg) = incoming_rx.recv().await {
895                if let IncomingMessage::IdentityInfo { fingerprint, .. } = msg {
896                    return Some(fingerprint);
897                }
898            }
899            None
900        })
901        .await
902        {
903            Ok(Some(fingerprint)) => Ok(fingerprint),
904            Ok(None) => Err(ClientError::RendezvousResolutionFailed(
905                "Connection closed while waiting for identity response".to_string(),
906            )),
907            Err(_) => Err(ClientError::RendezvousResolutionFailed(
908                "Timeout waiting for identity response. The rendezvous code may be invalid, expired, or the target client may be disconnected.".to_string(),
909            )),
910        }
911    }
912
913    /// Perform Noise handshake as initiator.
914    async fn perform_handshake(
915        proxy_client: &dyn ProxyClient,
916        incoming_rx: &mut mpsc::UnboundedReceiver<IncomingMessage>,
917        remote_fingerprint: IdentityFingerprint,
918        psk: Option<Psk>,
919    ) -> Result<(MultiDeviceTransport, String), ClientError> {
920        // Compute PSK ID before moving the PSK into the handshake
921        let psk_id = psk.as_ref().map(|p| p.id());
922
923        // Create initiator handshake (with or without PSK)
924        let mut handshake = if let Some(psk) = psk {
925            InitiatorHandshake::with_psk(psk)
926        } else {
927            InitiatorHandshake::new()
928        };
929
930        // Generate handshake init
931        let init_packet = handshake.send_start()?;
932
933        // Send HandshakeInit message
934        let msg = ProtocolMessage::HandshakeInit {
935            data: STANDARD.encode(init_packet.encode()?),
936            ciphersuite: format!("{:?}", handshake.ciphersuite()),
937            psk_id,
938        };
939
940        let msg_json = serde_json::to_string(&msg)?;
941        proxy_client
942            .send_to(remote_fingerprint, msg_json.into_bytes())
943            .await?;
944
945        debug!("Sent handshake init");
946
947        // Wait for HandshakeResponse
948        let response_timeout = Duration::from_secs(10);
949        let response: String = timeout(response_timeout, async {
950            loop {
951                if let Some(incoming) = incoming_rx.recv().await {
952                    match incoming {
953                        IncomingMessage::Send { payload, .. } => {
954                            // Try to parse as ProtocolMessage
955                            if let Ok(text) = String::from_utf8(payload)
956                                && let Ok(ProtocolMessage::HandshakeResponse { data, .. }) =
957                                    serde_json::from_str::<ProtocolMessage>(&text)
958                            {
959                                return Ok::<String, ClientError>(data);
960                            }
961                        }
962                        _ => continue,
963                    }
964                }
965            }
966        })
967        .await
968        .map_err(|_| ClientError::Timeout("Waiting for handshake response".to_string()))??;
969
970        // Decode and process response
971        let response_bytes = STANDARD
972            .decode(&response)
973            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
974
975        let response_packet = ap_noise::HandshakePacket::decode(&response_bytes)
976            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
977
978        // Complete handshake
979        handshake.receive_finish(&response_packet)?;
980        let (transport, fingerprint) = handshake.finalize()?;
981
982        debug!("Handshake complete");
983        Ok((transport, fingerprint.to_string()))
984    }
985}
986
987fn uuid_v4() -> String {
988    // Simple UUID v4 generation without external dependency
989    let mut bytes = [0u8; 16];
990    let mut rng = rand::thread_rng();
991    rng.fill_bytes(&mut bytes);
992
993    // Set version (4) and variant bits
994    bytes[6] = (bytes[6] & 0x0f) | 0x40;
995    bytes[8] = (bytes[8] & 0x3f) | 0x80;
996
997    format!(
998        "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
999        bytes[0],
1000        bytes[1],
1001        bytes[2],
1002        bytes[3],
1003        bytes[4],
1004        bytes[5],
1005        bytes[6],
1006        bytes[7],
1007        bytes[8],
1008        bytes[9],
1009        bytes[10],
1010        bytes[11],
1011        bytes[12],
1012        bytes[13],
1013        bytes[14],
1014        bytes[15]
1015    )
1016}