Skip to main content

ap_client/clients/
user_client.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
/// Base delay for reconnection backoff; doubled for each further failed attempt.
const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
/// Maximum delay between reconnection attempts (15 minutes), before jitter is added.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(15 * 60);
/// Maximum age (10 minutes) for pending pairings before they are pruned.
const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(10 * 60);
30
/// The kind of pairing: rendezvous (null PSK) or PSK (real key).
pub(crate) enum PairingKind {
    /// Rendezvous pairing — uses a null PSK, requires fingerprint verification.
    ///
    /// The `reply` sender is `Some(...)` while waiting for the proxy's `RendezvousInfo`
    /// response, and becomes `None` once the rendezvous code has been delivered.
    /// Only entries whose `reply` is `None` are matched against incoming handshakes
    /// that carry no `psk_id`.
    Rendezvous {
        reply: Option<oneshot::Sender<Result<RendezvousCode, ClientError>>>,
    },
    /// PSK pairing — uses a real pre-shared key, no fingerprint verification needed.
    ///
    /// `psk_id` is compared with the `psk_id` of an incoming handshake to select this
    /// pairing; `psk` is then used for that handshake.
    Psk { psk: Psk, psk_id: PskId },
}
43
/// A pending pairing waiting for an incoming handshake.
///
/// Entries are removed when a matching handshake from a new device arrives, or when
/// they are older than `PENDING_PAIRING_MAX_AGE` at the time stale pairings are pruned.
pub(crate) struct PendingPairing {
    /// Friendly name to assign to the session once paired.
    connection_name: Option<String>,
    /// When this pairing was created (for pruning stale entries).
    created_at: Instant,
    /// The kind of pairing.
    kind: PairingKind,
}
53
54use super::notify;
55use crate::{
56    error::ClientError,
57    traits::{
58        AuditConnectionType, AuditEvent, AuditLog, CredentialFieldSet, IdentityProvider,
59        NoOpAuditLog, SessionStore,
60    },
61    types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
62};
63
64// =============================================================================
65// Public types: Notifications (fire-and-forget) and Requests (with reply)
66// =============================================================================
67
/// Fire-and-forget status updates emitted by the user client.
///
/// Received through the `notifications` field of [`UserClientHandle`]; no reply is
/// expected for any variant.
#[derive(Debug, Clone)]
pub enum UserClientNotification {
    /// Started listening for connections (emitted once when the event loop starts)
    Listening {},
    /// Noise handshake started (a `HandshakeInit` message was received)
    HandshakeStart {},
    /// Noise handshake progress
    HandshakeProgress {
        /// Progress message
        message: String,
    },
    /// Noise handshake complete
    HandshakeComplete {},
    /// Handshake fingerprint (informational, for PSK connections)
    ///
    /// Emitted after a new PSK connection has been accepted; no verification reply is needed.
    HandshakeFingerprint {
        /// The 6-character hex fingerprint for visual verification
        fingerprint: String,
        /// The remote device's stable identity fingerprint
        identity: IdentityFingerprint,
    },
    /// Fingerprint was verified and connection accepted
    FingerprintVerified {},
    /// Fingerprint was rejected and connection discarded
    FingerprintRejected {
        /// Reason for rejection
        reason: String,
    },
    /// Credential was approved and sent
    CredentialApproved {
        /// Domain from the matched credential
        domain: Option<String>,
        /// Vault item ID
        credential_id: Option<String>,
    },
    /// Credential was denied
    CredentialDenied {
        /// Domain from the matched credential
        domain: Option<String>,
        /// Vault item ID
        credential_id: Option<String>,
    },
    /// A known/cached device reconnected — transport keys refreshed, no re-verification needed
    SessionRefreshed {
        /// The remote device's identity fingerprint
        fingerprint: IdentityFingerprint,
    },
    /// Client disconnected from proxy (the incoming message channel was closed)
    ClientDisconnected {},
    /// Attempting to reconnect to proxy
    ///
    /// Emitted after a reconnection attempt has failed, before the backoff delay.
    Reconnecting {
        /// Current reconnection attempt number (1-based; the attempt that just failed)
        attempt: u32,
    },
    /// Successfully reconnected to proxy
    Reconnected {},
    /// An error occurred
    Error {
        /// Error message
        message: String,
        /// Context where error occurred (for example `"handle_incoming"` or `"reconnection"`)
        context: Option<String>,
    },
}
132
/// Reply for fingerprint verification requests.
///
/// Sent by the caller on the `reply` channel of [`UserClientRequest::VerifyFingerprint`].
#[derive(Debug)]
pub struct FingerprintVerificationReply {
    /// Whether user approved the fingerprint
    pub approved: bool,
    /// Optional friendly name to assign to the session
    pub name: Option<String>,
}
141
/// Reply for credential requests.
///
/// Sent by the caller on the `reply` channel of [`UserClientRequest::CredentialRequest`].
#[derive(Debug)]
pub struct CredentialRequestReply {
    /// Whether approved
    pub approved: bool,
    /// The credential to send (if approved)
    pub credential: Option<CredentialData>,
    /// Vault item ID (for audit logging)
    pub credential_id: Option<String>,
}
152
/// Requests that require a caller response, carrying a oneshot reply channel.
///
/// Received through the `requests` field of [`UserClientHandle`]. The event loop
/// awaits each `reply` channel and continues processing once it resolves.
#[derive(Debug)]
pub enum UserClientRequest {
    /// Handshake fingerprint requires verification (rendezvous connections only).
    VerifyFingerprint {
        /// The 6-character hex fingerprint for visual verification
        fingerprint: String,
        /// The remote device's stable identity fingerprint
        identity: IdentityFingerprint,
        /// Channel to send the verification reply
        reply: oneshot::Sender<FingerprintVerificationReply>,
    },
    /// Credential request received — caller must approve/deny and provide the credential.
    CredentialRequest {
        /// The credential query
        query: crate::types::CredentialQuery,
        /// The requesting device's identity fingerprint
        identity: IdentityFingerprint,
        /// Channel to send the credential reply
        reply: oneshot::Sender<CredentialRequestReply>,
    },
}
175
176// =============================================================================
177// Internal types for pending reply tracking
178// =============================================================================
179
/// Resolved reply from a pending oneshot, carrying the context needed to process it.
///
/// In both variants `reply` is the raw result of awaiting the oneshot receiver, so it
/// is `Err(RecvError)` when the sender was dropped without sending a value.
enum PendingReply {
    /// Outcome of a `UserClientRequest::VerifyFingerprint` request.
    FingerprintVerification {
        /// Identity of the device whose handshake is awaiting verification.
        source: IdentityFingerprint,
        /// Transport produced by the completed handshake.
        transport: MultiDeviceTransport,
        /// Name taken from the matched pending pairing, if any.
        connection_name: Option<String>,
        /// The caller's verification reply.
        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
    },
    /// Outcome of a `UserClientRequest::CredentialRequest` request.
    CredentialResponse {
        /// Identity of the device that asked for the credential.
        source: IdentityFingerprint,
        /// Request id taken from the decrypted credential request payload.
        request_id: String,
        /// The credential query from the decrypted request payload.
        query: crate::types::CredentialQuery,
        /// The caller's credential reply.
        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
    },
}
195
/// A boxed future that resolves to a `PendingReply`.
///
/// These futures are collected in a `FuturesUnordered` inside the event loop.
type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
198
199// =============================================================================
200// Command channel for UserClient handle → event loop communication
201// =============================================================================
202
/// Commands sent from a `UserClient` handle to the running event loop.
///
/// Each variant carries a oneshot `reply` sender on which the result is returned
/// to the calling `UserClient` method.
enum UserClientCommand {
    /// Generate a PSK token and register a pending pairing.
    GetPskToken {
        /// Optional name supplied by the caller of `get_psk_token`.
        name: Option<String>,
        /// Channel on which the formatted token (or an error) is returned.
        reply: oneshot::Sender<Result<String, ClientError>>,
    },
    /// Request a rendezvous code from the proxy and register a pending pairing.
    GetRendezvousToken {
        /// Optional name supplied by the caller of `get_rendezvous_token`.
        name: Option<String>,
        /// Channel on which the rendezvous code (or an error) is returned.
        reply: oneshot::Sender<Result<RendezvousCode, ClientError>>,
    },
}
216
/// Handle returned by [`UserClient::connect()`] containing the client and its
/// notification/request channels.
pub struct UserClientHandle {
    /// Cloneable client used to send commands to the event loop.
    pub client: UserClient,
    /// Receiving end of the fire-and-forget notification channel.
    pub notifications: mpsc::Receiver<UserClientNotification>,
    /// Receiving end of the request channel; each request carries its own reply channel.
    pub requests: mpsc::Receiver<UserClientRequest>,
}

/// A cloneable handle for controlling the user client.
///
/// Obtained from [`UserClient::connect()`], which authenticates with the proxy,
/// spawns the event loop internally, and returns this handle inside a
/// [`UserClientHandle`]. All methods communicate with the event loop through an
/// internal command channel.
///
/// `Clone` and `Send` — share freely across tasks and threads. The event loop
/// shuts down once every clone has been dropped.
#[derive(Clone)]
pub struct UserClient {
    /// Sending half of the command channel consumed by the event loop.
    command_tx: mpsc::Sender<UserClientCommand>,
}
236
237impl UserClient {
238    /// Connect to the proxy server, spawn the event loop, and return a handle.
239    ///
240    /// This is the single entry point. After `connect()` returns, the client is
241    /// already listening for incoming connections. Use `get_psk_token()` or
242    /// `get_rendezvous_token()` to set up pairings, and receive events through
243    /// the returned handle's notification/request channels.
244    ///
245    /// Pass `None` for `audit_log` to use a no-op logger.
246    pub async fn connect(
247        identity_provider: Box<dyn IdentityProvider>,
248        session_store: Box<dyn SessionStore>,
249        mut proxy_client: Box<dyn ProxyClient>,
250        audit_log: Option<Box<dyn AuditLog>>,
251    ) -> Result<UserClientHandle, ClientError> {
252        // Authenticate with the proxy (the async part — before spawn)
253        let incoming_rx = proxy_client.connect().await?;
254
255        // Create channels
256        let (notification_tx, notification_rx) = mpsc::channel(32);
257        let (request_tx, request_rx) = mpsc::channel(32);
258
259        // Create command channel
260        let (command_tx, command_rx) = mpsc::channel(32);
261
262        // Cache fingerprint before spawning (avoids repeated async calls)
263        let own_fingerprint = identity_provider.fingerprint().await;
264
265        // Build the inner state
266        let inner = UserClientInner {
267            session_store,
268            proxy_client,
269            own_fingerprint,
270            transports: HashMap::new(),
271            pending_pairings: Vec::new(),
272            audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
273        };
274
275        // Spawn the event loop — use spawn_local on WASM (no Tokio runtime)
276        #[cfg(target_arch = "wasm32")]
277        wasm_bindgen_futures::spawn_local(inner.run_event_loop(
278            incoming_rx,
279            command_rx,
280            notification_tx,
281            request_tx,
282        ));
283        #[cfg(not(target_arch = "wasm32"))]
284        tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
285
286        Ok(UserClientHandle {
287            client: Self { command_tx },
288            notifications: notification_rx,
289            requests: request_rx,
290        })
291    }
292
293    /// Generate a PSK token for a new pairing.
294    ///
295    /// Returns the formatted token string (`<psk_hex>_<fingerprint_hex>`).
296    /// Multiple PSK pairings are supported concurrently (each matched by `psk_id`).
297    pub async fn get_psk_token(&self, name: Option<String>) -> Result<String, ClientError> {
298        let (tx, rx) = oneshot::channel();
299        self.command_tx
300            .send(UserClientCommand::GetPskToken { name, reply: tx })
301            .await
302            .map_err(|_| ClientError::ChannelClosed)?;
303        rx.await.map_err(|_| ClientError::ChannelClosed)?
304    }
305
306    /// Request a rendezvous code from the proxy for a new pairing.
307    ///
308    /// Only one rendezvous pairing at a time — there's no way to distinguish
309    /// incoming rendezvous handshakes.
310    pub async fn get_rendezvous_token(
311        &self,
312        name: Option<String>,
313    ) -> Result<RendezvousCode, ClientError> {
314        let (tx, rx) = oneshot::channel();
315        self.command_tx
316            .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
317            .await
318            .map_err(|_| ClientError::ChannelClosed)?;
319        rx.await.map_err(|_| ClientError::ChannelClosed)?
320    }
321}
322
323// =============================================================================
324// Internal state — lives inside the spawned event loop task
325// =============================================================================
326
/// All mutable state for the user client, owned by the spawned event loop task.
struct UserClientInner {
    /// Persistent store for cached sessions, session names, and transport state.
    session_store: Box<dyn SessionStore>,
    /// Connection to the proxy; also used to disconnect/reconnect after the link drops.
    proxy_client: Box<dyn ProxyClient>,
    /// Our own identity fingerprint (cached at construction time).
    own_fingerprint: IdentityFingerprint,
    /// Map of fingerprint -> transport
    ///
    /// Filled when a handshake is accepted, and lazily from the session store when a
    /// credential request arrives for a fingerprint that is not yet in the map.
    transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
    /// Pending pairings awaiting incoming handshakes.
    pending_pairings: Vec<PendingPairing>,
    /// Audit logger for security-relevant events
    audit_log: Box<dyn AuditLog>,
}
340
341impl UserClientInner {
    /// Run the main event loop (consumes self).
    ///
    /// Emits `Listening`, then multiplexes three event sources until shutdown:
    /// messages from the proxy (`incoming_rx`), resolved caller replies
    /// (`pending_replies`), and commands from `UserClient` handles (`command_rx`).
    ///
    /// The loop returns when the command channel is closed (all handles dropped) or
    /// when `attempt_reconnection` returns an error.
    async fn run_event_loop(
        mut self,
        mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
        mut command_rx: mpsc::Receiver<UserClientCommand>,
        notification_tx: mpsc::Sender<UserClientNotification>,
        request_tx: mpsc::Sender<UserClientRequest>,
    ) {
        // Emit Listening notification
        notify!(notification_tx, UserClientNotification::Listening {});

        // Futures waiting on caller replies (fingerprint verification / credential requests).
        let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();

        loop {
            tokio::select! {
                msg = incoming_rx.recv() => {
                    match msg {
                        Some(msg) => {
                            // A handler may return a future that resolves once the caller
                            // answers the request it just forwarded; queue it for later.
                            match self.handle_incoming(msg, &notification_tx, &request_tx).await {
                                Ok(Some(fut)) => pending_replies.push(fut),
                                Ok(None) => {}
                                Err(e) => {
                                    // Errors on a single message are reported but do not stop the loop.
                                    warn!("Error handling incoming message: {}", e);
                                    notify!(notification_tx, UserClientNotification::Error {
                                        message: e.to_string(),
                                        context: Some("handle_incoming".to_string()),
                                    });
                                }
                            }
                        }
                        None => {
                            // Incoming channel closed — proxy connection lost
                            notify!(notification_tx, UserClientNotification::ClientDisconnected {});
                            // NOTE(review): reconnection is awaited inline, so commands and
                            // pending replies are not serviced until it finishes. As written
                            // in this file, `attempt_reconnection` retries until it succeeds,
                            // so the `Err` arm below is not expected to run.
                            match self.attempt_reconnection(&notification_tx).await {
                                Ok(new_rx) => {
                                    incoming_rx = new_rx;
                                    notify!(notification_tx, UserClientNotification::Reconnected {});
                                }
                                Err(e) => {
                                    warn!("Reconnection failed permanently: {}", e);
                                    notify!(notification_tx, UserClientNotification::Error {
                                        message: e.to_string(),
                                        context: Some("reconnection".to_string()),
                                    });
                                    return;
                                }
                            }
                        }
                    }
                }
                // The `Some(..)` pattern disables this branch while no replies are pending.
                Some(reply) = pending_replies.next() => {
                    if let Err(e) = self.process_pending_reply(reply, &notification_tx).await {
                        warn!("Error processing pending reply: {}", e);
                        notify!(notification_tx, UserClientNotification::Error {
                            message: e.to_string(),
                            context: Some("process_pending_reply".to_string()),
                        });
                    }
                }
                cmd = command_rx.recv() => {
                    match cmd {
                        Some(cmd) => self.handle_command(cmd, &notification_tx).await,
                        None => {
                            // All handles dropped — shut down
                            debug!("All UserClient handles dropped, shutting down event loop");
                            return;
                        }
                    }
                }
            }
        }
    }
414
415    /// Attempt to reconnect to the proxy server with exponential backoff.
416    async fn attempt_reconnection(
417        &mut self,
418        notification_tx: &mpsc::Sender<UserClientNotification>,
419    ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ClientError> {
420        use rand::{Rng, SeedableRng};
421
422        let mut rng = rand::rngs::StdRng::from_entropy();
423        let mut attempt: u32 = 0;
424
425        loop {
426            attempt = attempt.saturating_add(1);
427
428            // Disconnect (ignore errors — connection may already be dead)
429            let _ = self.proxy_client.disconnect().await;
430
431            match self.proxy_client.connect().await {
432                Ok(new_rx) => {
433                    debug!("Reconnected to proxy on attempt {}", attempt);
434                    return Ok(new_rx);
435                }
436                Err(e) => {
437                    debug!("Reconnection attempt {} failed: {}", attempt, e);
438                    notify!(
439                        notification_tx,
440                        UserClientNotification::Reconnecting { attempt }
441                    );
442
443                    // Exponential backoff with jitter
444                    let exp_delay = RECONNECT_BASE_DELAY
445                        .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
446                    let delay = exp_delay.min(RECONNECT_MAX_DELAY);
447                    let jitter_max = (delay.as_millis() as u64) / 4;
448                    let jitter = if jitter_max > 0 {
449                        rng.gen_range(0..=jitter_max)
450                    } else {
451                        0
452                    };
453                    let total_delay = delay + Duration::from_millis(jitter);
454
455                    crate::compat::sleep(total_delay).await;
456                }
457            }
458        }
459    }
460
461    /// Handle incoming messages from proxy.
462    async fn handle_incoming(
463        &mut self,
464        msg: IncomingMessage,
465        notification_tx: &mpsc::Sender<UserClientNotification>,
466        request_tx: &mpsc::Sender<UserClientRequest>,
467    ) -> Result<Option<PendingReplyFuture>, ClientError> {
468        match msg {
469            IncomingMessage::Send {
470                source, payload, ..
471            } => {
472                // Parse payload as ProtocolMessage
473                let text = String::from_utf8(payload)
474                    .map_err(|e| ClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
475
476                let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
477
478                match protocol_msg {
479                    ProtocolMessage::HandshakeInit {
480                        data,
481                        ciphersuite,
482                        psk_id,
483                    } => {
484                        self.handle_handshake_init(
485                            source,
486                            data,
487                            ciphersuite,
488                            psk_id,
489                            notification_tx,
490                            request_tx,
491                        )
492                        .await
493                    }
494                    ProtocolMessage::CredentialRequest { encrypted } => {
495                        self.handle_credential_request(
496                            source,
497                            encrypted,
498                            notification_tx,
499                            request_tx,
500                        )
501                        .await
502                    }
503                    _ => {
504                        debug!("Received unexpected message type from {:?}", source);
505                        Ok(None)
506                    }
507                }
508            }
509            IncomingMessage::RendezvousInfo(code) => {
510                // Find the pending rendezvous pairing that is still awaiting a reply
511                let idx = self
512                    .pending_pairings
513                    .iter()
514                    .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }));
515
516                if let Some(idx) = idx {
517                    // Take the reply sender out, leaving the pairing in place with reply: None
518                    let pairing = &mut self.pending_pairings[idx];
519                    if let PairingKind::Rendezvous { reply } = &mut pairing.kind {
520                        if let Some(sender) = reply.take() {
521                            debug!("Completed rendezvous pairing via handle, code: {}", code);
522                            let _ = sender.send(Ok(code));
523                        }
524                    }
525                } else {
526                    debug!("Received RendezvousInfo but no pending rendezvous pairing found");
527                }
528                Ok(None)
529            }
530            IncomingMessage::IdentityInfo { .. } => {
531                // Only RemoteClient needs this
532                debug!("Received unexpected IdentityInfo message");
533                Ok(None)
534            }
535        }
536    }
537
538    /// Handle handshake init message.
539    async fn handle_handshake_init(
540        &mut self,
541        source: IdentityFingerprint,
542        data: String,
543        ciphersuite: String,
544        psk_id: Option<PskId>,
545        notification_tx: &mpsc::Sender<UserClientNotification>,
546        request_tx: &mpsc::Sender<UserClientRequest>,
547    ) -> Result<Option<PendingReplyFuture>, ClientError> {
548        debug!("Received handshake init from source: {:?}", source);
549        notify!(notification_tx, UserClientNotification::HandshakeStart {});
550
551        // Check if this is an existing/cached session (bypass pairing lookup)
552        let is_new_connection = !self.session_store.has_session(&source).await;
553
554        // Determine which PSK to use and find the matching pairing.
555        let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
556            // Existing/cached session — no pairing lookup needed
557            (None, None, false)
558        } else {
559            // New connection — look up and consume a pending pairing
560            Self::prune_stale_pairings(&mut self.pending_pairings);
561
562            match &psk_id {
563                Some(id) => {
564                    // PSK mode — find matching pairing by psk_id
565                    let idx = self.pending_pairings.iter().position(
566                        |p| matches!(&p.kind, PairingKind::Psk { psk_id: pid, .. } if pid == id),
567                    );
568                    if let Some(idx) = idx {
569                        let pairing = self.pending_pairings.remove(idx);
570                        let psk = match pairing.kind {
571                            PairingKind::Psk { psk, .. } => psk,
572                            PairingKind::Rendezvous { .. } => unreachable!(),
573                        };
574                        (Some(psk), pairing.connection_name, true)
575                    } else {
576                        warn!("No matching PSK pairing for psk_id: {}", id);
577                        return Err(ClientError::InvalidState {
578                            expected: "matching PSK pairing".to_string(),
579                            current: format!("no pairing for psk_id {id}"),
580                        });
581                    }
582                }
583                None => {
584                    // Rendezvous mode — find a confirmed rendezvous pairing
585                    // (one whose reply has already been sent, i.e. reply is None)
586                    let idx = self
587                        .pending_pairings
588                        .iter()
589                        .position(|p| matches!(p.kind, PairingKind::Rendezvous { reply: None }));
590                    let connection_name =
591                        idx.and_then(|i| self.pending_pairings.remove(i).connection_name);
592                    (None, connection_name, false)
593                }
594            }
595        };
596
597        let (transport, fingerprint_str) = self
598            .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
599            .await?;
600
601        notify!(
602            notification_tx,
603            UserClientNotification::HandshakeComplete {}
604        );
605
606        if is_new_connection && !is_psk_connection {
607            // New rendezvous connection: require fingerprint verification.
608            let (tx, rx) = oneshot::channel();
609
610            if request_tx.capacity() == 0 {
611                warn!("Request channel full, waiting for consumer to drain");
612            }
613            request_tx
614                .send(UserClientRequest::VerifyFingerprint {
615                    fingerprint: fingerprint_str,
616                    identity: source,
617                    reply: tx,
618                })
619                .await
620                .ok();
621
622            let fut: PendingReplyFuture = Box::pin(async move {
623                let result = rx.await;
624                PendingReply::FingerprintVerification {
625                    source,
626                    transport,
627                    connection_name: matched_pairing_name,
628                    reply: result,
629                }
630            });
631
632            Ok(Some(fut))
633        } else if !is_new_connection {
634            // Existing/cached session: already verified on first connection.
635            self.transports.insert(source, transport.clone());
636            self.session_store.cache_session(source).await?;
637            self.session_store
638                .save_transport_state(&source, transport)
639                .await?;
640
641            self.audit_log
642                .write(AuditEvent::SessionRefreshed {
643                    remote_identity: &source,
644                })
645                .await;
646
647            notify!(
648                notification_tx,
649                UserClientNotification::SessionRefreshed {
650                    fingerprint: source,
651                }
652            );
653
654            Ok(None)
655        } else {
656            // PSK connection: trust established via pre-shared key, no verification needed
657            self.accept_new_connection(
658                source,
659                transport,
660                matched_pairing_name.as_deref(),
661                AuditConnectionType::Psk,
662            )
663            .await?;
664
665            // Emit fingerprint as informational notification (no reply needed)
666            notify!(
667                notification_tx,
668                UserClientNotification::HandshakeFingerprint {
669                    fingerprint: fingerprint_str,
670                    identity: source,
671                }
672            );
673
674            Ok(None)
675        }
676    }
677
678    /// Remove pending pairings older than `PENDING_PAIRING_MAX_AGE`.
679    fn prune_stale_pairings(pairings: &mut Vec<PendingPairing>) {
680        pairings.retain(|p| p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
681    }
682
683    /// Accept a new connection: cache session, store transport, set name, and audit
684    async fn accept_new_connection(
685        &mut self,
686        fingerprint: IdentityFingerprint,
687        transport: MultiDeviceTransport,
688        session_name: Option<&str>,
689        connection_type: AuditConnectionType,
690    ) -> Result<(), ClientError> {
691        self.transports.insert(fingerprint, transport.clone());
692        self.session_store.cache_session(fingerprint).await?;
693        if let Some(name) = session_name {
694            self.session_store
695                .set_session_name(&fingerprint, name.to_owned())
696                .await?;
697        }
698        self.session_store
699            .save_transport_state(&fingerprint, transport)
700            .await?;
701
702        self.audit_log
703            .write(AuditEvent::ConnectionEstablished {
704                remote_identity: &fingerprint,
705                remote_name: session_name,
706                connection_type,
707            })
708            .await;
709
710        Ok(())
711    }
712
713    /// Handle credential request.
714    async fn handle_credential_request(
715        &mut self,
716        source: IdentityFingerprint,
717        encrypted: String,
718        notification_tx: &mpsc::Sender<UserClientNotification>,
719        request_tx: &mpsc::Sender<UserClientRequest>,
720    ) -> Result<Option<PendingReplyFuture>, ClientError> {
721        if !self.transports.contains_key(&source) {
722            debug!("Loading transport state for source: {:?}", source);
723            let session = self
724                .session_store
725                .load_transport_state(&source)
726                .await?
727                .ok_or_else(|| {
728                    ClientError::SessionCache(format!(
729                        "Missing transport state for cached session {source:?}"
730                    ))
731                })?;
732            self.transports.insert(source, session);
733        }
734
735        // Get transport for this source
736        let transport = self
737            .transports
738            .get_mut(&source)
739            .ok_or(ClientError::SecureChannelNotEstablished)?;
740
741        // Decrypt request
742        let encrypted_bytes = STANDARD
743            .decode(&encrypted)
744            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
745
746        let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
747            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
748
749        let decrypted = transport
750            .decrypt(&packet)
751            .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
752
753        let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
754
755        self.audit_log
756            .write(AuditEvent::CredentialRequested {
757                query: &request.query,
758                remote_identity: &source,
759                request_id: &request.request_id,
760            })
761            .await;
762
763        // Create oneshot channel for the reply
764        let (tx, rx) = oneshot::channel();
765
766        // Send request to caller
767        if request_tx.capacity() == 0 {
768            warn!("Request channel full, waiting for consumer to drain");
769        }
770        if request_tx
771            .send(UserClientRequest::CredentialRequest {
772                query: request.query.clone(),
773                identity: source,
774                reply: tx,
775            })
776            .await
777            .is_err()
778        {
779            // Request channel closed — caller is gone
780            warn!("Request channel closed, cannot send credential request");
781            notify!(
782                notification_tx,
783                UserClientNotification::Error {
784                    message: "Request channel closed".to_string(),
785                    context: Some("handle_credential_request".to_string()),
786                }
787            );
788            return Ok(None);
789        }
790
791        // Return future that awaits the reply
792        let request_id = request.request_id;
793        let query = request.query;
794        let fut: PendingReplyFuture = Box::pin(async move {
795            let result = rx.await;
796            PendingReply::CredentialResponse {
797                source,
798                request_id,
799                query,
800                reply: result,
801            }
802        });
803
804        Ok(Some(fut))
805    }
806
807    /// Process a resolved pending reply from the `FuturesUnordered`.
808    async fn process_pending_reply(
809        &mut self,
810        reply: PendingReply,
811        notification_tx: &mpsc::Sender<UserClientNotification>,
812    ) -> Result<(), ClientError> {
813        match reply {
814            PendingReply::FingerprintVerification {
815                source,
816                transport,
817                connection_name,
818                reply,
819            } => {
820                self.process_fingerprint_reply(
821                    source,
822                    transport,
823                    connection_name,
824                    reply,
825                    notification_tx,
826                )
827                .await
828            }
829            PendingReply::CredentialResponse {
830                source,
831                request_id,
832                query,
833                reply,
834            } => {
835                self.process_credential_reply(source, request_id, query, reply, notification_tx)
836                    .await
837            }
838        }
839    }
840
841    /// Handle a command from a `UserClient` handle.
842    async fn handle_command(
843        &mut self,
844        cmd: UserClientCommand,
845        notification_tx: &mpsc::Sender<UserClientNotification>,
846    ) {
847        match cmd {
848            UserClientCommand::GetPskToken { name, reply } => {
849                let result = self.generate_psk_token(name).await;
850                let _ = reply.send(result);
851            }
852            UserClientCommand::GetRendezvousToken { name, reply } => {
853                if let Err(e) = self.proxy_client.request_rendezvous().await {
854                    let _ = reply.send(Err(e));
855                    return;
856                }
857
858                // Prune stale pairings
859                Self::prune_stale_pairings(&mut self.pending_pairings);
860
861                // If there's already a pending rendezvous pairing awaiting its code,
862                // error the old one rather than silently overwriting it
863                if let Some(old_idx) = self
864                    .pending_pairings
865                    .iter()
866                    .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }))
867                {
868                    let old = self.pending_pairings.remove(old_idx);
869                    if let PairingKind::Rendezvous {
870                        reply: Some(old_reply),
871                    } = old.kind
872                    {
873                        warn!("Replacing existing pending rendezvous pairing");
874                        let _ = old_reply.send(Err(ClientError::InvalidState {
875                            expected: "single pending rendezvous".to_string(),
876                            current: "replaced by new rendezvous request".to_string(),
877                        }));
878                    }
879                }
880
881                // Push the new pairing immediately — reply will be completed
882                // when RendezvousInfo arrives from the proxy
883                self.pending_pairings.push(PendingPairing {
884                    connection_name: name,
885                    created_at: Instant::now(),
886                    kind: PairingKind::Rendezvous { reply: Some(reply) },
887                });
888
889                // Emit notification so the caller knows a code is being requested
890                notify!(
891                    notification_tx,
892                    UserClientNotification::HandshakeProgress {
893                        message: "Requesting rendezvous code from proxy...".to_string(),
894                    }
895                );
896            }
897        }
898    }
899
900    /// Generate a PSK token internally.
901    async fn generate_psk_token(&mut self, name: Option<String>) -> Result<String, ClientError> {
902        let psk = Psk::generate();
903        let psk_id = psk.id();
904        let token = format!("{}_{}", psk.to_hex(), hex::encode(self.own_fingerprint.0));
905
906        let pairing = PendingPairing {
907            connection_name: name,
908            created_at: Instant::now(),
909            kind: PairingKind::Psk { psk, psk_id },
910        };
911
912        Self::prune_stale_pairings(&mut self.pending_pairings);
913        self.pending_pairings.push(pairing);
914        debug!("Created PSK pairing, token generated");
915
916        Ok(token)
917    }
918
919    /// Process a fingerprint verification reply.
920    async fn process_fingerprint_reply(
921        &mut self,
922        source: IdentityFingerprint,
923        transport: MultiDeviceTransport,
924        connection_name: Option<String>,
925        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
926        notification_tx: &mpsc::Sender<UserClientNotification>,
927    ) -> Result<(), ClientError> {
928        match reply {
929            Ok(FingerprintVerificationReply {
930                approved: true,
931                name,
932            }) => {
933                // Use the name from the reply, falling back to the pairing name
934                let session_name = name.or(connection_name);
935                self.accept_new_connection(
936                    source,
937                    transport,
938                    session_name.as_deref(),
939                    AuditConnectionType::Rendezvous,
940                )
941                .await?;
942
943                notify!(
944                    notification_tx,
945                    UserClientNotification::FingerprintVerified {}
946                );
947            }
948            Ok(FingerprintVerificationReply {
949                approved: false, ..
950            }) => {
951                self.audit_log
952                    .write(AuditEvent::ConnectionRejected {
953                        remote_identity: &source,
954                    })
955                    .await;
956
957                notify!(
958                    notification_tx,
959                    UserClientNotification::FingerprintRejected {
960                        reason: "User rejected fingerprint verification".to_string(),
961                    }
962                );
963            }
964            Err(_) => {
965                // Oneshot sender was dropped without replying — treat as rejection
966                warn!("Fingerprint verification reply channel dropped, treating as rejection");
967                self.audit_log
968                    .write(AuditEvent::ConnectionRejected {
969                        remote_identity: &source,
970                    })
971                    .await;
972
973                notify!(
974                    notification_tx,
975                    UserClientNotification::FingerprintRejected {
976                        reason: "Verification cancelled (reply dropped)".to_string(),
977                    }
978                );
979            }
980        }
981
982        Ok(())
983    }
984
985    /// Process a credential request reply.
986    #[allow(clippy::too_many_arguments)]
987    async fn process_credential_reply(
988        &mut self,
989        source: IdentityFingerprint,
990        request_id: String,
991        query: crate::types::CredentialQuery,
992        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
993        notification_tx: &mpsc::Sender<UserClientNotification>,
994    ) -> Result<(), ClientError> {
995        let reply = match reply {
996            Ok(r) => r,
997            Err(_) => {
998                // Oneshot sender was dropped — treat as denial
999                warn!("Credential reply channel dropped, treating as denial");
1000                CredentialRequestReply {
1001                    approved: false,
1002                    credential: None,
1003                    credential_id: None,
1004                }
1005            }
1006        };
1007
1008        let transport = self
1009            .transports
1010            .get_mut(&source)
1011            .ok_or(ClientError::SecureChannelNotEstablished)?;
1012
1013        // Extract domain and audit fields before credential is moved into the response payload
1014        let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1015        let fields = reply
1016            .credential
1017            .as_ref()
1018            .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1019                has_username: c.username.is_some(),
1020                has_password: c.password.is_some(),
1021                has_totp: c.totp.is_some(),
1022                has_uri: c.uri.is_some(),
1023                has_notes: c.notes.is_some(),
1024            });
1025
1026        // Create response payload
1027        let response_payload = CredentialResponsePayload {
1028            credential: if reply.approved {
1029                reply.credential
1030            } else {
1031                None
1032            },
1033            error: if !reply.approved {
1034                Some("Request denied".to_string())
1035            } else {
1036                None
1037            },
1038            request_id: Some(request_id.clone()),
1039        };
1040
1041        // Encrypt and send
1042        let response_json = serde_json::to_string(&response_payload)?;
1043        let encrypted = transport
1044            .encrypt(response_json.as_bytes())
1045            .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
1046
1047        let msg = ProtocolMessage::CredentialResponse {
1048            encrypted: STANDARD.encode(encrypted.encode()),
1049        };
1050
1051        let msg_json = serde_json::to_string(&msg)?;
1052
1053        self.proxy_client
1054            .send_to(source, msg_json.into_bytes())
1055            .await?;
1056
1057        // Send notification and audit
1058        if reply.approved {
1059            self.audit_log
1060                .write(AuditEvent::CredentialApproved {
1061                    query: &query,
1062                    domain: domain.as_deref(),
1063                    remote_identity: &source,
1064                    request_id: &request_id,
1065                    credential_id: reply.credential_id.as_deref(),
1066                    fields,
1067                })
1068                .await;
1069
1070            notify!(
1071                notification_tx,
1072                UserClientNotification::CredentialApproved {
1073                    domain,
1074                    credential_id: reply.credential_id,
1075                }
1076            );
1077        } else {
1078            self.audit_log
1079                .write(AuditEvent::CredentialDenied {
1080                    query: &query,
1081                    domain: domain.as_deref(),
1082                    remote_identity: &source,
1083                    request_id: &request_id,
1084                    credential_id: reply.credential_id.as_deref(),
1085                })
1086                .await;
1087
1088            notify!(
1089                notification_tx,
1090                UserClientNotification::CredentialDenied {
1091                    domain,
1092                    credential_id: reply.credential_id,
1093                }
1094            );
1095        }
1096
1097        Ok(())
1098    }
1099
1100    /// Complete Noise handshake as responder
1101    async fn complete_handshake(
1102        &self,
1103        remote_fingerprint: IdentityFingerprint,
1104        handshake_data: &str,
1105        ciphersuite_str: &str,
1106        psk: Option<&Psk>,
1107    ) -> Result<(MultiDeviceTransport, String), ClientError> {
1108        // Parse ciphersuite
1109        let ciphersuite = match ciphersuite_str {
1110            s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1111            _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1112        };
1113
1114        // Decode handshake data
1115        let init_bytes = STANDARD
1116            .decode(handshake_data)
1117            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
1118
1119        let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1120            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1121
1122        // Create responder handshake — with PSK if provided, otherwise null PSK (rendezvous)
1123        let mut handshake = if let Some(psk) = psk {
1124            ResponderHandshake::with_psk(psk.clone())
1125        } else {
1126            ResponderHandshake::new()
1127        };
1128
1129        // Process init and generate response
1130        handshake.receive_start(&init_packet)?;
1131        let response_packet = handshake.send_finish()?;
1132        let (transport, fingerprint) = handshake.finalize()?;
1133
1134        // Send response
1135        let msg = ProtocolMessage::HandshakeResponse {
1136            data: STANDARD.encode(response_packet.encode()?),
1137            ciphersuite: format!("{ciphersuite:?}"),
1138        };
1139
1140        let msg_json = serde_json::to_string(&msg)?;
1141
1142        self.proxy_client
1143            .send_to(remote_fingerprint, msg_json.into_bytes())
1144            .await?;
1145
1146        debug!("Sent handshake response to {:?}", remote_fingerprint);
1147
1148        Ok((transport, fingerprint.to_string()))
1149    }
1150}