Skip to main content

ap_client/clients/
user_client.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
/// Starting delay that the exponential reconnection backoff doubles from.
const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
/// Cap applied to the backoff delay between reconnection attempts (15 minutes).
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(15 * 60);
/// Pending pairings at least this old (10 minutes) are dropped when stale entries are pruned.
const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(10 * 60);
30
31/// The kind of pairing: rendezvous (null PSK) or PSK (real key).
32pub(crate) enum PairingKind {
33    /// Rendezvous pairing — uses a null PSK, requires fingerprint verification.
34    ///
35    /// The `reply` sender is `Some(...)` while waiting for the proxy's `RendezvousInfo`
36    /// response, and becomes `None` once the rendezvous code has been delivered.
37    Rendezvous {
38        reply: Option<oneshot::Sender<Result<RendezvousCode, RemoteClientError>>>,
39    },
40    /// PSK pairing — uses a real pre-shared key, no fingerprint verification needed.
41    Psk { psk: Psk, psk_id: PskId },
42}
43
44/// A pending pairing waiting for an incoming handshake.
45pub(crate) struct PendingPairing {
46    /// Friendly name to assign to the session once paired.
47    connection_name: Option<String>,
48    /// When this pairing was created (for pruning stale entries).
49    created_at: Instant,
50    /// The kind of pairing.
51    kind: PairingKind,
52}
53
54use crate::{
55    error::RemoteClientError,
56    traits::{
57        AuditConnectionType, AuditEvent, AuditLog, CredentialFieldSet, IdentityProvider,
58        NoOpAuditLog, SessionStore,
59    },
60    types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
61};
62
63// =============================================================================
64// Public types: Notifications (fire-and-forget) and Requests (with reply)
65// =============================================================================
66
67/// Fire-and-forget status updates emitted by the user client.
68#[derive(Debug, Clone)]
69pub enum UserClientNotification {
70    /// Started listening for connections
71    Listening {},
72    /// Noise handshake started
73    HandshakeStart {},
74    /// Noise handshake progress
75    HandshakeProgress {
76        /// Progress message
77        message: String,
78    },
79    /// Noise handshake complete
80    HandshakeComplete {},
81    /// Handshake fingerprint (informational, for PSK connections)
82    HandshakeFingerprint {
83        /// The 6-character hex fingerprint for visual verification
84        fingerprint: String,
85        /// The remote device's stable identity fingerprint
86        identity: IdentityFingerprint,
87    },
88    /// Fingerprint was verified and connection accepted
89    FingerprintVerified {},
90    /// Fingerprint was rejected and connection discarded
91    FingerprintRejected {
92        /// Reason for rejection
93        reason: String,
94    },
95    /// Credential was approved and sent
96    CredentialApproved {
97        /// Domain from the matched credential
98        domain: Option<String>,
99        /// Vault item ID
100        credential_id: Option<String>,
101    },
102    /// Credential was denied
103    CredentialDenied {
104        /// Domain from the matched credential
105        domain: Option<String>,
106        /// Vault item ID
107        credential_id: Option<String>,
108    },
109    /// A known/cached device reconnected — transport keys refreshed, no re-verification needed
110    SessionRefreshed {
111        /// The remote device's identity fingerprint
112        fingerprint: IdentityFingerprint,
113    },
114    /// Client disconnected from proxy
115    ClientDisconnected {},
116    /// Attempting to reconnect to proxy
117    Reconnecting {
118        /// Current reconnection attempt number
119        attempt: u32,
120    },
121    /// Successfully reconnected to proxy
122    Reconnected {},
123    /// An error occurred
124    Error {
125        /// Error message
126        message: String,
127        /// Context where error occurred
128        context: Option<String>,
129    },
130}
131
/// The caller's answer to a fingerprint verification request.
#[derive(Debug)]
pub struct FingerprintVerificationReply {
    /// `true` when the user approved the fingerprint.
    pub approved: bool,
    /// Optional friendly name to assign to the session.
    pub name: Option<String>,
}
140
141/// Reply for credential requests.
142#[derive(Debug)]
143pub struct CredentialRequestReply {
144    /// Whether approved
145    pub approved: bool,
146    /// The credential to send (if approved)
147    pub credential: Option<CredentialData>,
148    /// Vault item ID (for audit logging)
149    pub credential_id: Option<String>,
150}
151
152/// Requests that require a caller response, carrying a oneshot reply channel.
153#[derive(Debug)]
154pub enum UserClientRequest {
155    /// Handshake fingerprint requires verification (rendezvous connections only).
156    VerifyFingerprint {
157        /// The 6-character hex fingerprint for visual verification
158        fingerprint: String,
159        /// The remote device's stable identity fingerprint
160        identity: IdentityFingerprint,
161        /// Channel to send the verification reply
162        reply: oneshot::Sender<FingerprintVerificationReply>,
163    },
164    /// Credential request received — caller must approve/deny and provide the credential.
165    CredentialRequest {
166        /// The credential query
167        query: crate::types::CredentialQuery,
168        /// The requesting device's identity fingerprint
169        identity: IdentityFingerprint,
170        /// Channel to send the credential reply
171        reply: oneshot::Sender<CredentialRequestReply>,
172    },
173}
174
175// =============================================================================
176// Internal types for pending reply tracking
177// =============================================================================
178
179/// Resolved reply from a pending oneshot, carrying the context needed to process it.
180enum PendingReply {
181    FingerprintVerification {
182        source: IdentityFingerprint,
183        transport: MultiDeviceTransport,
184        connection_name: Option<String>,
185        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
186    },
187    CredentialResponse {
188        source: IdentityFingerprint,
189        request_id: String,
190        query: crate::types::CredentialQuery,
191        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
192    },
193}
194
195/// A boxed future that resolves to a `PendingReply`.
196type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
197
198// =============================================================================
199// Command channel for UserClient handle → event loop communication
200// =============================================================================
201
202/// Commands sent from a `UserClient` handle to the running event loop.
203enum UserClientCommand {
204    /// Generate a PSK token and register a pending pairing.
205    GetPskToken {
206        name: Option<String>,
207        reply: oneshot::Sender<Result<String, RemoteClientError>>,
208    },
209    /// Request a rendezvous code from the proxy and register a pending pairing.
210    GetRendezvousToken {
211        name: Option<String>,
212        reply: oneshot::Sender<Result<RendezvousCode, RemoteClientError>>,
213    },
214}
215
216/// A cloneable handle for controlling the user client.
217///
218/// Obtained from [`UserClient::connect()`], which authenticates with the proxy,
219/// spawns the event loop internally, and returns this handle. All methods
220/// communicate with the event loop through an internal command channel.
221///
222/// `Clone` and `Send` — share freely across tasks and threads.
223#[derive(Clone)]
224pub struct UserClient {
225    command_tx: mpsc::Sender<UserClientCommand>,
226}
227
228impl UserClient {
229    /// Connect to the proxy server, spawn the event loop, and return a handle.
230    ///
231    /// This is the single entry point. After `connect()` returns, the client is
232    /// already listening for incoming connections. Use `get_psk_token()` or
233    /// `get_rendezvous_token()` to set up pairings, and receive events through
234    /// the provided notification/request channels.
235    ///
236    /// Pass `None` for `audit_log` to use a no-op logger.
237    pub async fn connect(
238        identity_provider: Box<dyn IdentityProvider>,
239        session_store: Box<dyn SessionStore>,
240        mut proxy_client: Box<dyn ProxyClient>,
241        notification_tx: mpsc::Sender<UserClientNotification>,
242        request_tx: mpsc::Sender<UserClientRequest>,
243        audit_log: Option<Box<dyn AuditLog>>,
244    ) -> Result<Self, RemoteClientError> {
245        // Authenticate with the proxy (the async part — before spawn)
246        let incoming_rx = proxy_client.connect().await?;
247
248        // Create command channel
249        let (command_tx, command_rx) = mpsc::channel(32);
250
251        // Cache fingerprint before spawning (avoids repeated async calls)
252        let own_fingerprint = identity_provider.fingerprint().await;
253
254        // Build the inner state
255        let inner = UserClientInner {
256            session_store,
257            proxy_client,
258            own_fingerprint,
259            transports: HashMap::new(),
260            pending_pairings: Vec::new(),
261            audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
262        };
263
264        // Spawn the event loop — use spawn_local on WASM (no Tokio runtime)
265        #[cfg(target_arch = "wasm32")]
266        wasm_bindgen_futures::spawn_local(inner.run_event_loop(
267            incoming_rx,
268            command_rx,
269            notification_tx,
270            request_tx,
271        ));
272        #[cfg(not(target_arch = "wasm32"))]
273        tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
274
275        Ok(Self { command_tx })
276    }
277
278    /// Generate a PSK token for a new pairing.
279    ///
280    /// Returns the formatted token string (`<psk_hex>_<fingerprint_hex>`).
281    /// Multiple PSK pairings are supported concurrently (each matched by `psk_id`).
282    pub async fn get_psk_token(&self, name: Option<String>) -> Result<String, RemoteClientError> {
283        let (tx, rx) = oneshot::channel();
284        self.command_tx
285            .send(UserClientCommand::GetPskToken { name, reply: tx })
286            .await
287            .map_err(|_| RemoteClientError::ChannelClosed)?;
288        rx.await.map_err(|_| RemoteClientError::ChannelClosed)?
289    }
290
291    /// Request a rendezvous code from the proxy for a new pairing.
292    ///
293    /// Only one rendezvous pairing at a time — there's no way to distinguish
294    /// incoming rendezvous handshakes.
295    pub async fn get_rendezvous_token(
296        &self,
297        name: Option<String>,
298    ) -> Result<RendezvousCode, RemoteClientError> {
299        let (tx, rx) = oneshot::channel();
300        self.command_tx
301            .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
302            .await
303            .map_err(|_| RemoteClientError::ChannelClosed)?;
304        rx.await.map_err(|_| RemoteClientError::ChannelClosed)?
305    }
306}
307
308// =============================================================================
309// Internal state — lives inside the spawned event loop task
310// =============================================================================
311
312/// All mutable state for the user client, owned by the spawned event loop task.
313struct UserClientInner {
314    session_store: Box<dyn SessionStore>,
315    proxy_client: Box<dyn ProxyClient>,
316    /// Our own identity fingerprint (cached at construction time).
317    own_fingerprint: IdentityFingerprint,
318    /// Map of fingerprint -> transport
319    transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
320    /// Pending pairings awaiting incoming handshakes.
321    pending_pairings: Vec<PendingPairing>,
322    /// Audit logger for security-relevant events
323    audit_log: Box<dyn AuditLog>,
324}
325
326impl UserClientInner {
327    /// Run the main event loop (consumes self).
328    async fn run_event_loop(
329        mut self,
330        mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
331        mut command_rx: mpsc::Receiver<UserClientCommand>,
332        notification_tx: mpsc::Sender<UserClientNotification>,
333        request_tx: mpsc::Sender<UserClientRequest>,
334    ) {
335        // Emit Listening notification
336        notification_tx
337            .send(UserClientNotification::Listening {})
338            .await
339            .ok();
340
341        let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
342
343        loop {
344            tokio::select! {
345                msg = incoming_rx.recv() => {
346                    match msg {
347                        Some(msg) => {
348                            match self.handle_incoming(msg, &notification_tx, &request_tx).await {
349                                Ok(Some(fut)) => pending_replies.push(fut),
350                                Ok(None) => {}
351                                Err(e) => {
352                                    warn!("Error handling incoming message: {}", e);
353                                    notification_tx.send(UserClientNotification::Error {
354                                        message: e.to_string(),
355                                        context: Some("handle_incoming".to_string()),
356                                    }).await.ok();
357                                }
358                            }
359                        }
360                        None => {
361                            // Incoming channel closed — proxy connection lost
362                            notification_tx.send(UserClientNotification::ClientDisconnected {}).await.ok();
363                            match self.attempt_reconnection(&notification_tx).await {
364                                Ok(new_rx) => {
365                                    incoming_rx = new_rx;
366                                    notification_tx.send(UserClientNotification::Reconnected {}).await.ok();
367                                }
368                                Err(e) => {
369                                    warn!("Reconnection failed permanently: {}", e);
370                                    notification_tx.send(UserClientNotification::Error {
371                                        message: e.to_string(),
372                                        context: Some("reconnection".to_string()),
373                                    }).await.ok();
374                                    return;
375                                }
376                            }
377                        }
378                    }
379                }
380                Some(reply) = pending_replies.next() => {
381                    if let Err(e) = self.process_pending_reply(reply, &notification_tx).await {
382                        warn!("Error processing pending reply: {}", e);
383                        notification_tx.send(UserClientNotification::Error {
384                            message: e.to_string(),
385                            context: Some("process_pending_reply".to_string()),
386                        }).await.ok();
387                    }
388                }
389                cmd = command_rx.recv() => {
390                    match cmd {
391                        Some(cmd) => self.handle_command(cmd, &notification_tx).await,
392                        None => {
393                            // All handles dropped — shut down
394                            debug!("All UserClient handles dropped, shutting down event loop");
395                            return;
396                        }
397                    }
398                }
399            }
400        }
401    }
402
403    /// Attempt to reconnect to the proxy server with exponential backoff.
404    async fn attempt_reconnection(
405        &mut self,
406        notification_tx: &mpsc::Sender<UserClientNotification>,
407    ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, RemoteClientError> {
408        use rand::{Rng, SeedableRng};
409
410        let mut rng = rand::rngs::StdRng::from_entropy();
411        let mut attempt: u32 = 0;
412
413        loop {
414            attempt = attempt.saturating_add(1);
415
416            // Disconnect (ignore errors — connection may already be dead)
417            let _ = self.proxy_client.disconnect().await;
418
419            match self.proxy_client.connect().await {
420                Ok(new_rx) => {
421                    debug!("Reconnected to proxy on attempt {}", attempt);
422                    return Ok(new_rx);
423                }
424                Err(e) => {
425                    debug!("Reconnection attempt {} failed: {}", attempt, e);
426                    notification_tx
427                        .send(UserClientNotification::Reconnecting { attempt })
428                        .await
429                        .ok();
430
431                    // Exponential backoff with jitter
432                    let exp_delay = RECONNECT_BASE_DELAY
433                        .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
434                    let delay = exp_delay.min(RECONNECT_MAX_DELAY);
435                    let jitter_max = (delay.as_millis() as u64) / 4;
436                    let jitter = if jitter_max > 0 {
437                        rng.gen_range(0..=jitter_max)
438                    } else {
439                        0
440                    };
441                    let total_delay = delay + Duration::from_millis(jitter);
442
443                    crate::compat::sleep(total_delay).await;
444                }
445            }
446        }
447    }
448
449    /// Handle incoming messages from proxy.
450    async fn handle_incoming(
451        &mut self,
452        msg: IncomingMessage,
453        notification_tx: &mpsc::Sender<UserClientNotification>,
454        request_tx: &mpsc::Sender<UserClientRequest>,
455    ) -> Result<Option<PendingReplyFuture>, RemoteClientError> {
456        match msg {
457            IncomingMessage::Send {
458                source, payload, ..
459            } => {
460                // Parse payload as ProtocolMessage
461                let text = String::from_utf8(payload)
462                    .map_err(|e| RemoteClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
463
464                let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
465
466                match protocol_msg {
467                    ProtocolMessage::HandshakeInit {
468                        data,
469                        ciphersuite,
470                        psk_id,
471                    } => {
472                        self.handle_handshake_init(
473                            source,
474                            data,
475                            ciphersuite,
476                            psk_id,
477                            notification_tx,
478                            request_tx,
479                        )
480                        .await
481                    }
482                    ProtocolMessage::CredentialRequest { encrypted } => {
483                        self.handle_credential_request(
484                            source,
485                            encrypted,
486                            notification_tx,
487                            request_tx,
488                        )
489                        .await
490                    }
491                    _ => {
492                        debug!("Received unexpected message type from {:?}", source);
493                        Ok(None)
494                    }
495                }
496            }
497            IncomingMessage::RendezvousInfo(code) => {
498                // Find the pending rendezvous pairing that is still awaiting a reply
499                let idx = self
500                    .pending_pairings
501                    .iter()
502                    .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }));
503
504                if let Some(idx) = idx {
505                    // Take the reply sender out, leaving the pairing in place with reply: None
506                    let pairing = &mut self.pending_pairings[idx];
507                    if let PairingKind::Rendezvous { reply } = &mut pairing.kind {
508                        if let Some(sender) = reply.take() {
509                            debug!("Completed rendezvous pairing via handle, code: {}", code);
510                            let _ = sender.send(Ok(code));
511                        }
512                    }
513                } else {
514                    debug!("Received RendezvousInfo but no pending rendezvous pairing found");
515                }
516                Ok(None)
517            }
518            IncomingMessage::IdentityInfo { .. } => {
519                // Only RemoteClient needs this
520                debug!("Received unexpected IdentityInfo message");
521                Ok(None)
522            }
523        }
524    }
525
526    /// Handle handshake init message.
527    async fn handle_handshake_init(
528        &mut self,
529        source: IdentityFingerprint,
530        data: String,
531        ciphersuite: String,
532        psk_id: Option<PskId>,
533        notification_tx: &mpsc::Sender<UserClientNotification>,
534        request_tx: &mpsc::Sender<UserClientRequest>,
535    ) -> Result<Option<PendingReplyFuture>, RemoteClientError> {
536        debug!("Received handshake init from source: {:?}", source);
537        notification_tx
538            .send(UserClientNotification::HandshakeStart {})
539            .await
540            .ok();
541
542        // Check if this is an existing/cached session (bypass pairing lookup)
543        let is_new_connection = !self.session_store.has_session(&source).await;
544
545        // Determine which PSK to use and find the matching pairing.
546        let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
547            // Existing/cached session — no pairing lookup needed
548            (None, None, false)
549        } else {
550            // New connection — look up and consume a pending pairing
551            Self::prune_stale_pairings(&mut self.pending_pairings);
552
553            match &psk_id {
554                Some(id) => {
555                    // PSK mode — find matching pairing by psk_id
556                    let idx = self.pending_pairings.iter().position(
557                        |p| matches!(&p.kind, PairingKind::Psk { psk_id: pid, .. } if pid == id),
558                    );
559                    if let Some(idx) = idx {
560                        let pairing = self.pending_pairings.remove(idx);
561                        let psk = match pairing.kind {
562                            PairingKind::Psk { psk, .. } => psk,
563                            PairingKind::Rendezvous { .. } => unreachable!(),
564                        };
565                        (Some(psk), pairing.connection_name, true)
566                    } else {
567                        warn!("No matching PSK pairing for psk_id: {}", id);
568                        return Err(RemoteClientError::InvalidState {
569                            expected: "matching PSK pairing".to_string(),
570                            current: format!("no pairing for psk_id {id}"),
571                        });
572                    }
573                }
574                None => {
575                    // Rendezvous mode — find a confirmed rendezvous pairing
576                    // (one whose reply has already been sent, i.e. reply is None)
577                    let idx = self
578                        .pending_pairings
579                        .iter()
580                        .position(|p| matches!(p.kind, PairingKind::Rendezvous { reply: None }));
581                    let connection_name =
582                        idx.and_then(|i| self.pending_pairings.remove(i).connection_name);
583                    (None, connection_name, false)
584                }
585            }
586        };
587
588        let (transport, fingerprint_str) = self
589            .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
590            .await?;
591
592        notification_tx
593            .send(UserClientNotification::HandshakeComplete {})
594            .await
595            .ok();
596
597        if is_new_connection && !is_psk_connection {
598            // New rendezvous connection: require fingerprint verification.
599            let (tx, rx) = oneshot::channel();
600
601            request_tx
602                .send(UserClientRequest::VerifyFingerprint {
603                    fingerprint: fingerprint_str,
604                    identity: source,
605                    reply: tx,
606                })
607                .await
608                .ok();
609
610            let fut: PendingReplyFuture = Box::pin(async move {
611                let result = rx.await;
612                PendingReply::FingerprintVerification {
613                    source,
614                    transport,
615                    connection_name: matched_pairing_name,
616                    reply: result,
617                }
618            });
619
620            Ok(Some(fut))
621        } else if !is_new_connection {
622            // Existing/cached session: already verified on first connection.
623            self.transports.insert(source, transport.clone());
624            self.session_store.cache_session(source).await?;
625            self.session_store
626                .save_transport_state(&source, transport)
627                .await?;
628
629            self.audit_log
630                .write(AuditEvent::SessionRefreshed {
631                    remote_identity: &source,
632                })
633                .await;
634
635            notification_tx
636                .send(UserClientNotification::SessionRefreshed {
637                    fingerprint: source,
638                })
639                .await
640                .ok();
641
642            Ok(None)
643        } else {
644            // PSK connection: trust established via pre-shared key, no verification needed
645            self.accept_new_connection(
646                source,
647                transport,
648                matched_pairing_name.as_deref(),
649                AuditConnectionType::Psk,
650            )
651            .await?;
652
653            // Emit fingerprint as informational notification (no reply needed)
654            notification_tx
655                .send(UserClientNotification::HandshakeFingerprint {
656                    fingerprint: fingerprint_str,
657                    identity: source,
658                })
659                .await
660                .ok();
661
662            Ok(None)
663        }
664    }
665
666    /// Remove pending pairings older than `PENDING_PAIRING_MAX_AGE`.
667    fn prune_stale_pairings(pairings: &mut Vec<PendingPairing>) {
668        pairings.retain(|p| p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
669    }
670
671    /// Accept a new connection: cache session, store transport, set name, and audit
672    async fn accept_new_connection(
673        &mut self,
674        fingerprint: IdentityFingerprint,
675        transport: MultiDeviceTransport,
676        session_name: Option<&str>,
677        connection_type: AuditConnectionType,
678    ) -> Result<(), RemoteClientError> {
679        self.transports.insert(fingerprint, transport.clone());
680        self.session_store.cache_session(fingerprint).await?;
681        if let Some(name) = session_name {
682            self.session_store
683                .set_session_name(&fingerprint, name.to_owned())
684                .await?;
685        }
686        self.session_store
687            .save_transport_state(&fingerprint, transport)
688            .await?;
689
690        self.audit_log
691            .write(AuditEvent::ConnectionEstablished {
692                remote_identity: &fingerprint,
693                remote_name: session_name,
694                connection_type,
695            })
696            .await;
697
698        Ok(())
699    }
700
701    /// Handle credential request.
702    async fn handle_credential_request(
703        &mut self,
704        source: IdentityFingerprint,
705        encrypted: String,
706        notification_tx: &mpsc::Sender<UserClientNotification>,
707        request_tx: &mpsc::Sender<UserClientRequest>,
708    ) -> Result<Option<PendingReplyFuture>, RemoteClientError> {
709        if !self.transports.contains_key(&source) {
710            debug!("Loading transport state for source: {:?}", source);
711            let session = self
712                .session_store
713                .load_transport_state(&source)
714                .await?
715                .ok_or_else(|| {
716                    RemoteClientError::SessionCache(format!(
717                        "Missing transport state for cached session {source:?}"
718                    ))
719                })?;
720            self.transports.insert(source, session);
721        }
722
723        // Get transport for this source
724        let transport = self
725            .transports
726            .get_mut(&source)
727            .ok_or(RemoteClientError::SecureChannelNotEstablished)?;
728
729        // Decrypt request
730        let encrypted_bytes = STANDARD
731            .decode(&encrypted)
732            .map_err(|e| RemoteClientError::Serialization(format!("Invalid base64: {e}")))?;
733
734        let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
735            .map_err(|e| RemoteClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
736
737        let decrypted = transport
738            .decrypt(&packet)
739            .map_err(|e| RemoteClientError::NoiseProtocol(e.to_string()))?;
740
741        let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
742
743        self.audit_log
744            .write(AuditEvent::CredentialRequested {
745                query: &request.query,
746                remote_identity: &source,
747                request_id: &request.request_id,
748            })
749            .await;
750
751        // Create oneshot channel for the reply
752        let (tx, rx) = oneshot::channel();
753
754        // Send request to caller
755        if request_tx
756            .send(UserClientRequest::CredentialRequest {
757                query: request.query.clone(),
758                identity: source,
759                reply: tx,
760            })
761            .await
762            .is_err()
763        {
764            // Request channel closed — caller is gone
765            warn!("Request channel closed, cannot send credential request");
766            notification_tx
767                .send(UserClientNotification::Error {
768                    message: "Request channel closed".to_string(),
769                    context: Some("handle_credential_request".to_string()),
770                })
771                .await
772                .ok();
773            return Ok(None);
774        }
775
776        // Return future that awaits the reply
777        let request_id = request.request_id;
778        let query = request.query;
779        let fut: PendingReplyFuture = Box::pin(async move {
780            let result = rx.await;
781            PendingReply::CredentialResponse {
782                source,
783                request_id,
784                query,
785                reply: result,
786            }
787        });
788
789        Ok(Some(fut))
790    }
791
792    /// Process a resolved pending reply from the `FuturesUnordered`.
793    async fn process_pending_reply(
794        &mut self,
795        reply: PendingReply,
796        notification_tx: &mpsc::Sender<UserClientNotification>,
797    ) -> Result<(), RemoteClientError> {
798        match reply {
799            PendingReply::FingerprintVerification {
800                source,
801                transport,
802                connection_name,
803                reply,
804            } => {
805                self.process_fingerprint_reply(
806                    source,
807                    transport,
808                    connection_name,
809                    reply,
810                    notification_tx,
811                )
812                .await
813            }
814            PendingReply::CredentialResponse {
815                source,
816                request_id,
817                query,
818                reply,
819            } => {
820                self.process_credential_reply(source, request_id, query, reply, notification_tx)
821                    .await
822            }
823        }
824    }
825
826    /// Handle a command from a `UserClient` handle.
827    async fn handle_command(
828        &mut self,
829        cmd: UserClientCommand,
830        notification_tx: &mpsc::Sender<UserClientNotification>,
831    ) {
832        match cmd {
833            UserClientCommand::GetPskToken { name, reply } => {
834                let result = self.generate_psk_token(name).await;
835                let _ = reply.send(result);
836            }
837            UserClientCommand::GetRendezvousToken { name, reply } => {
838                if let Err(e) = self.proxy_client.request_rendezvous().await {
839                    let _ = reply.send(Err(e));
840                    return;
841                }
842
843                // Prune stale pairings
844                Self::prune_stale_pairings(&mut self.pending_pairings);
845
846                // If there's already a pending rendezvous pairing awaiting its code,
847                // error the old one rather than silently overwriting it
848                if let Some(old_idx) = self
849                    .pending_pairings
850                    .iter()
851                    .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }))
852                {
853                    let old = self.pending_pairings.remove(old_idx);
854                    if let PairingKind::Rendezvous {
855                        reply: Some(old_reply),
856                    } = old.kind
857                    {
858                        warn!("Replacing existing pending rendezvous pairing");
859                        let _ = old_reply.send(Err(RemoteClientError::InvalidState {
860                            expected: "single pending rendezvous".to_string(),
861                            current: "replaced by new rendezvous request".to_string(),
862                        }));
863                    }
864                }
865
866                // Push the new pairing immediately — reply will be completed
867                // when RendezvousInfo arrives from the proxy
868                self.pending_pairings.push(PendingPairing {
869                    connection_name: name,
870                    created_at: Instant::now(),
871                    kind: PairingKind::Rendezvous { reply: Some(reply) },
872                });
873
874                // Emit notification so the caller knows a code is being requested
875                notification_tx
876                    .send(UserClientNotification::HandshakeProgress {
877                        message: "Requesting rendezvous code from proxy...".to_string(),
878                    })
879                    .await
880                    .ok();
881            }
882        }
883    }
884
885    /// Generate a PSK token internally.
886    async fn generate_psk_token(
887        &mut self,
888        name: Option<String>,
889    ) -> Result<String, RemoteClientError> {
890        let psk = Psk::generate();
891        let psk_id = psk.id();
892        let token = format!("{}_{}", psk.to_hex(), hex::encode(self.own_fingerprint.0));
893
894        let pairing = PendingPairing {
895            connection_name: name,
896            created_at: Instant::now(),
897            kind: PairingKind::Psk { psk, psk_id },
898        };
899
900        Self::prune_stale_pairings(&mut self.pending_pairings);
901        self.pending_pairings.push(pairing);
902        debug!("Created PSK pairing, token generated");
903
904        Ok(token)
905    }
906
907    /// Process a fingerprint verification reply.
908    async fn process_fingerprint_reply(
909        &mut self,
910        source: IdentityFingerprint,
911        transport: MultiDeviceTransport,
912        connection_name: Option<String>,
913        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
914        notification_tx: &mpsc::Sender<UserClientNotification>,
915    ) -> Result<(), RemoteClientError> {
916        match reply {
917            Ok(FingerprintVerificationReply {
918                approved: true,
919                name,
920            }) => {
921                // Use the name from the reply, falling back to the pairing name
922                let session_name = name.or(connection_name);
923                self.accept_new_connection(
924                    source,
925                    transport,
926                    session_name.as_deref(),
927                    AuditConnectionType::Rendezvous,
928                )
929                .await?;
930
931                notification_tx
932                    .send(UserClientNotification::FingerprintVerified {})
933                    .await
934                    .ok();
935            }
936            Ok(FingerprintVerificationReply {
937                approved: false, ..
938            }) => {
939                self.audit_log
940                    .write(AuditEvent::ConnectionRejected {
941                        remote_identity: &source,
942                    })
943                    .await;
944
945                notification_tx
946                    .send(UserClientNotification::FingerprintRejected {
947                        reason: "User rejected fingerprint verification".to_string(),
948                    })
949                    .await
950                    .ok();
951            }
952            Err(_) => {
953                // Oneshot sender was dropped without replying — treat as rejection
954                warn!("Fingerprint verification reply channel dropped, treating as rejection");
955                self.audit_log
956                    .write(AuditEvent::ConnectionRejected {
957                        remote_identity: &source,
958                    })
959                    .await;
960
961                notification_tx
962                    .send(UserClientNotification::FingerprintRejected {
963                        reason: "Verification cancelled (reply dropped)".to_string(),
964                    })
965                    .await
966                    .ok();
967            }
968        }
969
970        Ok(())
971    }
972
973    /// Process a credential request reply.
974    #[allow(clippy::too_many_arguments)]
975    async fn process_credential_reply(
976        &mut self,
977        source: IdentityFingerprint,
978        request_id: String,
979        query: crate::types::CredentialQuery,
980        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
981        notification_tx: &mpsc::Sender<UserClientNotification>,
982    ) -> Result<(), RemoteClientError> {
983        let reply = match reply {
984            Ok(r) => r,
985            Err(_) => {
986                // Oneshot sender was dropped — treat as denial
987                warn!("Credential reply channel dropped, treating as denial");
988                CredentialRequestReply {
989                    approved: false,
990                    credential: None,
991                    credential_id: None,
992                }
993            }
994        };
995
996        let transport = self
997            .transports
998            .get_mut(&source)
999            .ok_or(RemoteClientError::SecureChannelNotEstablished)?;
1000
1001        // Extract domain and audit fields before credential is moved into the response payload
1002        let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1003        let fields = reply
1004            .credential
1005            .as_ref()
1006            .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1007                has_username: c.username.is_some(),
1008                has_password: c.password.is_some(),
1009                has_totp: c.totp.is_some(),
1010                has_uri: c.uri.is_some(),
1011                has_notes: c.notes.is_some(),
1012            });
1013
1014        // Create response payload
1015        let response_payload = CredentialResponsePayload {
1016            credential: if reply.approved {
1017                reply.credential
1018            } else {
1019                None
1020            },
1021            error: if !reply.approved {
1022                Some("Request denied".to_string())
1023            } else {
1024                None
1025            },
1026            request_id: Some(request_id.clone()),
1027        };
1028
1029        // Encrypt and send
1030        let response_json = serde_json::to_string(&response_payload)?;
1031        let encrypted = transport
1032            .encrypt(response_json.as_bytes())
1033            .map_err(|e| RemoteClientError::NoiseProtocol(e.to_string()))?;
1034
1035        let msg = ProtocolMessage::CredentialResponse {
1036            encrypted: STANDARD.encode(encrypted.encode()),
1037        };
1038
1039        let msg_json = serde_json::to_string(&msg)?;
1040
1041        self.proxy_client
1042            .send_to(source, msg_json.into_bytes())
1043            .await?;
1044
1045        // Send notification and audit
1046        if reply.approved {
1047            self.audit_log
1048                .write(AuditEvent::CredentialApproved {
1049                    query: &query,
1050                    domain: domain.as_deref(),
1051                    remote_identity: &source,
1052                    request_id: &request_id,
1053                    credential_id: reply.credential_id.as_deref(),
1054                    fields,
1055                })
1056                .await;
1057
1058            notification_tx
1059                .send(UserClientNotification::CredentialApproved {
1060                    domain,
1061                    credential_id: reply.credential_id,
1062                })
1063                .await
1064                .ok();
1065        } else {
1066            self.audit_log
1067                .write(AuditEvent::CredentialDenied {
1068                    query: &query,
1069                    domain: domain.as_deref(),
1070                    remote_identity: &source,
1071                    request_id: &request_id,
1072                    credential_id: reply.credential_id.as_deref(),
1073                })
1074                .await;
1075
1076            notification_tx
1077                .send(UserClientNotification::CredentialDenied {
1078                    domain,
1079                    credential_id: reply.credential_id,
1080                })
1081                .await
1082                .ok();
1083        }
1084
1085        Ok(())
1086    }
1087
1088    /// Complete Noise handshake as responder
1089    async fn complete_handshake(
1090        &self,
1091        remote_fingerprint: IdentityFingerprint,
1092        handshake_data: &str,
1093        ciphersuite_str: &str,
1094        psk: Option<&Psk>,
1095    ) -> Result<(MultiDeviceTransport, String), RemoteClientError> {
1096        // Parse ciphersuite
1097        let ciphersuite = match ciphersuite_str {
1098            s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1099            _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1100        };
1101
1102        // Decode handshake data
1103        let init_bytes = STANDARD
1104            .decode(handshake_data)
1105            .map_err(|e| RemoteClientError::Serialization(format!("Invalid base64: {e}")))?;
1106
1107        let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1108            .map_err(|e| RemoteClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1109
1110        // Create responder handshake — with PSK if provided, otherwise null PSK (rendezvous)
1111        let mut handshake = if let Some(psk) = psk {
1112            ResponderHandshake::with_psk(psk.clone())
1113        } else {
1114            ResponderHandshake::new()
1115        };
1116
1117        // Process init and generate response
1118        handshake.receive_start(&init_packet)?;
1119        let response_packet = handshake.send_finish()?;
1120        let (transport, fingerprint) = handshake.finalize()?;
1121
1122        // Send response
1123        let msg = ProtocolMessage::HandshakeResponse {
1124            data: STANDARD.encode(response_packet.encode()?),
1125            ciphersuite: format!("{ciphersuite:?}"),
1126        };
1127
1128        let msg_json = serde_json::to_string(&msg)?;
1129
1130        self.proxy_client
1131            .send_to(remote_fingerprint, msg_json.into_bytes())
1132            .await?;
1133
1134        debug!("Sent handshake response to {:?}", remote_fingerprint);
1135
1136        Ok((transport, fingerprint.to_string()))
1137    }
1138}