Skip to main content

ap_client/clients/
user_client.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, IdentityKeyPair, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId, PskToken};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
/// Starting point of the reconnection backoff; each failed attempt doubles from here.
const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
/// Ceiling applied to the backoff between reconnection attempts (15 minutes),
/// before jitter is added on top.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(60 * 15);
/// Age (10 minutes) at which a non-reusable pending pairing becomes eligible for pruning.
const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(60 * 10);
/// Per-peer cap on messages held while that peer's fingerprint awaits verification.
const AWAITING_VERIFICATION_BUFFER_LIMIT: usize = 100;
32
33/// A pending PSK pairing waiting for an incoming handshake.
34struct PskPairing {
35    connection_name: Option<String>,
36    created_at: Instant,
37    psk: Psk,
38    /// If true, the pairing is not consumed on use and survives pruning.
39    reusable: bool,
40}
41
42/// A pending rendezvous pairing waiting for an incoming handshake.
43struct RendezvousPairing {
44    connection_name: Option<String>,
45    created_at: Instant,
46    /// Channel to deliver the rendezvous code — `Some` while awaiting, `None` after delivery.
47    code_tx: Option<oneshot::Sender<Result<RendezvousCode, ClientError>>>,
48}
49
50/// Manages pending pairings and verification message buffers.
51///
52/// Pairings track handshake setup (rendezvous codes, PSKs). Verification buffers
53/// hold messages from peers whose fingerprint is awaiting user approval — once
54/// approved the buffer is drained and replayed, on rejection it is discarded.
55struct PendingPairings {
56    /// PSK pairings keyed by their PskId for direct lookup.
57    psk_pairings: HashMap<PskId, PskPairing>,
58    /// At most one rendezvous pairing at a time.
59    rendezvous: Option<RendezvousPairing>,
60    /// Messages buffered per peer while awaiting fingerprint verification.
61    buffered_messages: HashMap<IdentityFingerprint, Vec<IncomingMessage>>,
62}
63
64impl PendingPairings {
65    fn new() -> Self {
66        Self {
67            psk_pairings: HashMap::new(),
68            rendezvous: None,
69            buffered_messages: HashMap::new(),
70        }
71    }
72
73    /// Remove pairings older than `PENDING_PAIRING_MAX_AGE`.
74    /// Reusable pairings are never pruned.
75    fn prune_stale(&mut self) {
76        self.psk_pairings
77            .retain(|_, p| p.reusable || p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
78        if self
79            .rendezvous
80            .as_ref()
81            .is_some_and(|r| r.created_at.elapsed() >= PENDING_PAIRING_MAX_AGE)
82        {
83            self.rendezvous = None;
84        }
85    }
86
87    /// Take the pending rendezvous pairing, if any.
88    fn take_rendezvous(&mut self) -> Option<RendezvousPairing> {
89        self.rendezvous.take()
90    }
91
92    /// Look up a PSK pairing by id and return its data.
93    ///
94    /// Reusable pairings are left in the map; ephemeral ones are consumed.
95    fn take_psk(&mut self, id: &PskId) -> Option<(Psk, Option<String>)> {
96        let pairing = self.psk_pairings.get(id)?;
97        let psk = pairing.psk.clone();
98        let name = pairing.connection_name.clone();
99        if !pairing.reusable {
100            self.psk_pairings.remove(id);
101        }
102        Some((psk, name))
103    }
104
105    /// Start buffering messages for a source that is awaiting fingerprint verification.
106    fn prepare_buffering(&mut self, source: IdentityFingerprint) {
107        self.buffered_messages.insert(source, Vec::new());
108    }
109
110    /// Try to buffer a message for a source awaiting fingerprint verification.
111    /// Returns `None` if handled (buffered or dropped due to limit),
112    /// or `Some(msg)` if the source is not awaiting verification.
113    fn try_buffer_message(&mut self, msg: IncomingMessage) -> Option<IncomingMessage> {
114        let source = match &msg {
115            IncomingMessage::Send { source, .. } => source,
116            _ => return Some(msg),
117        };
118        if let Some(buffer) = self.buffered_messages.get_mut(source) {
119            if buffer.len() < AWAITING_VERIFICATION_BUFFER_LIMIT {
120                debug!(
121                    "Buffering message from {:?} pending fingerprint verification",
122                    source
123                );
124                buffer.push(msg);
125            } else {
126                warn!("Buffer limit reached for {:?}, dropping message", source);
127            }
128            None
129        } else {
130            Some(msg)
131        }
132    }
133
134    /// Remove and return buffered messages for a source.
135    /// Used to replay on approval or discard on rejection.
136    fn take_buffered_messages(&mut self, source: &IdentityFingerprint) -> Vec<IncomingMessage> {
137        self.buffered_messages.remove(source).unwrap_or_default()
138    }
139}
140
141use super::notify;
142use crate::{
143    error::ClientError,
144    traits::{
145        AuditConnectionType, AuditEvent, AuditLog, ConnectionInfo, ConnectionStore,
146        CredentialFieldSet, IdentityProvider, NoOpAuditLog, PskEntry, PskStore,
147    },
148    types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
149};
150
151// =============================================================================
152// Public types: Notifications (fire-and-forget) and Requests (with reply)
153// =============================================================================
154
155/// Fire-and-forget status updates emitted by the user client.
156#[derive(Debug, Clone)]
157pub enum UserClientNotification {
158    /// Started listening for connections
159    Listening {},
160    /// Noise handshake started
161    HandshakeStart {},
162    /// Noise handshake progress
163    HandshakeProgress {
164        /// Progress message
165        message: String,
166    },
167    /// Noise handshake complete
168    HandshakeComplete {},
169    /// Handshake fingerprint (informational, for PSK connections)
170    HandshakeFingerprint {
171        /// The 6-character hex fingerprint for visual verification
172        fingerprint: String,
173        /// The remote device's stable identity fingerprint
174        identity: IdentityFingerprint,
175    },
176    /// Fingerprint was verified and connection accepted
177    FingerprintVerified {},
178    /// Fingerprint was rejected and connection discarded
179    FingerprintRejected {
180        /// Reason for rejection
181        reason: String,
182    },
183    /// Credential was approved and sent
184    CredentialApproved {
185        /// Domain from the matched credential
186        domain: Option<String>,
187        /// Vault item ID
188        credential_id: Option<String>,
189    },
190    /// Credential was denied
191    CredentialDenied {
192        /// Domain from the matched credential
193        domain: Option<String>,
194        /// Vault item ID
195        credential_id: Option<String>,
196    },
197    /// A known/cached device reconnected — transport keys refreshed, no re-verification needed
198    SessionRefreshed {
199        /// The remote device's identity fingerprint
200        fingerprint: IdentityFingerprint,
201    },
202    /// Client disconnected from proxy
203    ClientDisconnected {},
204    /// Attempting to reconnect to proxy
205    Reconnecting {
206        /// Current reconnection attempt number
207        attempt: u32,
208    },
209    /// Successfully reconnected to proxy
210    Reconnected {},
211    /// An error occurred
212    Error {
213        /// Error message
214        message: String,
215        /// Context where error occurred
216        context: Option<String>,
217    },
218}
219
/// The caller's answer to a fingerprint verification request.
#[derive(Debug)]
pub struct FingerprintVerificationReply {
    /// `true` when the user accepted the fingerprint, `false` when they rejected it.
    pub approved: bool,
    /// Friendly name to attach to the session, if the user supplied one.
    pub name: Option<String>,
}
228
229/// Reply for credential requests.
230#[derive(Debug)]
231pub struct CredentialRequestReply {
232    /// Whether approved
233    pub approved: bool,
234    /// The credential to send (if approved)
235    pub credential: Option<CredentialData>,
236    /// Vault item ID (for audit logging)
237    pub credential_id: Option<String>,
238}
239
240/// Requests that require a caller response, carrying a oneshot reply channel.
241#[derive(Debug)]
242pub enum UserClientRequest {
243    /// Handshake fingerprint requires verification (rendezvous connections only).
244    VerifyFingerprint {
245        /// The 6-character hex fingerprint for visual verification
246        fingerprint: String,
247        /// The remote device's stable identity fingerprint
248        identity: IdentityFingerprint,
249        /// Channel to send the verification reply
250        reply: oneshot::Sender<FingerprintVerificationReply>,
251    },
252    /// Credential request received — caller must approve/deny and provide the credential.
253    CredentialRequest {
254        /// The credential query
255        query: crate::types::CredentialQuery,
256        /// The requesting device's identity fingerprint
257        identity: IdentityFingerprint,
258        /// Channel to send the credential reply
259        reply: oneshot::Sender<CredentialRequestReply>,
260    },
261}
262
263// =============================================================================
264// Internal types for pending reply tracking
265// =============================================================================
266
267/// Resolved reply from a pending oneshot, carrying the context needed to process it.
268enum PendingReply {
269    FingerprintVerification {
270        source: IdentityFingerprint,
271        transport: MultiDeviceTransport,
272        connection_name: Option<String>,
273        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
274    },
275    CredentialResponse {
276        source: IdentityFingerprint,
277        request_id: String,
278        query: crate::types::CredentialQuery,
279        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
280    },
281}
282
283/// A boxed future that resolves to a `PendingReply`.
284type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
285
286// =============================================================================
287// Command channel for UserClient handle → event loop communication
288// =============================================================================
289
290/// Commands sent from a `UserClient` handle to the running event loop.
291enum UserClientCommand {
292    /// Generate a PSK token and register a pending pairing.
293    GetPskToken {
294        name: Option<String>,
295        reusable: bool,
296        reply: oneshot::Sender<Result<String, ClientError>>,
297    },
298    /// Request a rendezvous code from the proxy and register a pending pairing.
299    GetRendezvousToken {
300        name: Option<String>,
301        reply: oneshot::Sender<Result<RendezvousCode, ClientError>>,
302    },
303}
304
305/// A cloneable handle for controlling the user client.
306///
307/// Obtained from [`UserClient::connect()`], which authenticates with the proxy,
308/// spawns the event loop internally, and returns this handle. All methods
309/// communicate with the event loop through an internal command channel.
310///
311/// `Clone` and `Send` — share freely across tasks and threads.
312/// Handle returned by [`UserClient::connect()`] containing the client and its
313/// notification/request channels.
314pub struct UserClientHandle {
315    pub client: UserClient,
316    pub notifications: mpsc::Receiver<UserClientNotification>,
317    pub requests: mpsc::Receiver<UserClientRequest>,
318}
319
320#[derive(Clone)]
321pub struct UserClient {
322    command_tx: mpsc::Sender<UserClientCommand>,
323}
324
325impl UserClient {
326    /// Connect to the proxy server, spawn the event loop, and return a handle.
327    ///
328    /// This is the single entry point. After `connect()` returns, the client is
329    /// already listening for incoming connections. Use `get_psk_token()` or
330    /// `get_rendezvous_token()` to set up pairings, and receive events through
331    /// the returned handle's notification/request channels.
332    ///
333    /// Pass `None` for `audit_log` to use a no-op logger.
334    /// Pass `Some(store)` for `psk_store` to enable reusable PSK support.
335    /// Stored PSKs are loaded into pending pairings on startup.
336    pub async fn connect(
337        identity_provider: Box<dyn IdentityProvider>,
338        connection_store: Box<dyn ConnectionStore>,
339        mut proxy_client: Box<dyn ProxyClient>,
340        audit_log: Option<Box<dyn AuditLog>>,
341        psk_store: Option<Box<dyn PskStore>>,
342    ) -> Result<UserClientHandle, ClientError> {
343        // Extract identity once — used for proxy auth, reconnection, and own fingerprint
344        let identity_keypair = identity_provider.identity().await;
345        let own_fingerprint = identity_keypair.identity().fingerprint();
346
347        // Authenticate with the proxy (the async part — before spawn)
348        let incoming_rx = proxy_client.connect(identity_keypair.clone()).await?;
349
350        // Create channels
351        let (notification_tx, notification_rx) = mpsc::channel(32);
352        let (request_tx, request_rx) = mpsc::channel(32);
353
354        // Create command channel
355        let (command_tx, command_rx) = mpsc::channel(32);
356
357        // Build the inner state
358        let mut pending_pairings = PendingPairings::new();
359
360        // Load stored reusable PSKs into pending pairings
361        if let Some(ref store) = psk_store {
362            for entry in store.list().await {
363                pending_pairings.psk_pairings.insert(
364                    entry.psk_id.clone(),
365                    PskPairing {
366                        connection_name: entry.name.clone(),
367                        created_at: Instant::now(),
368                        psk: entry.psk.clone(),
369                        reusable: true,
370                    },
371                );
372                debug!("Loaded reusable PSK from store: psk_id={}", entry.psk_id);
373            }
374        }
375
376        let inner = UserClientInner {
377            connection_store,
378            proxy_client,
379            identity_keypair,
380            own_fingerprint,
381            transports: HashMap::new(),
382            pending_pairings,
383            audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
384            psk_store,
385        };
386
387        // Spawn the event loop — use spawn_local on WASM (no Tokio runtime)
388        #[cfg(target_arch = "wasm32")]
389        wasm_bindgen_futures::spawn_local(inner.run_event_loop(
390            incoming_rx,
391            command_rx,
392            notification_tx,
393            request_tx,
394        ));
395        #[cfg(not(target_arch = "wasm32"))]
396        tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
397
398        Ok(UserClientHandle {
399            client: Self { command_tx },
400            notifications: notification_rx,
401            requests: request_rx,
402        })
403    }
404
405    /// Generate a PSK token for a new pairing.
406    ///
407    /// Returns the formatted token string (`<psk_hex>_<fingerprint_hex>`).
408    /// Multiple PSK pairings are supported concurrently (each matched by `psk_id`).
409    ///
410    /// When `reusable` is true, the PSK is saved to the configured [`PskStore`]
411    /// and will not be consumed on first use. Requires a `PskStore` to have been
412    /// passed to [`UserClient::connect()`].
413    pub async fn get_psk_token(
414        &self,
415        name: Option<String>,
416        reusable: bool,
417    ) -> Result<String, ClientError> {
418        let (tx, rx) = oneshot::channel();
419        self.command_tx
420            .send(UserClientCommand::GetPskToken {
421                name,
422                reusable,
423                reply: tx,
424            })
425            .await
426            .map_err(|_| ClientError::ChannelClosed)?;
427        rx.await.map_err(|_| ClientError::ChannelClosed)?
428    }
429
430    /// Request a rendezvous code from the proxy for a new pairing.
431    ///
432    /// Only one rendezvous pairing at a time — there's no way to distinguish
433    /// incoming rendezvous handshakes.
434    pub async fn get_rendezvous_token(
435        &self,
436        name: Option<String>,
437    ) -> Result<RendezvousCode, ClientError> {
438        let (tx, rx) = oneshot::channel();
439        self.command_tx
440            .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
441            .await
442            .map_err(|_| ClientError::ChannelClosed)?;
443        rx.await.map_err(|_| ClientError::ChannelClosed)?
444    }
445}
446
447// =============================================================================
448// Internal state — lives inside the spawned event loop task
449// =============================================================================
450
451/// All mutable state for the user client, owned by the spawned event loop task.
452struct UserClientInner {
453    connection_store: Box<dyn ConnectionStore>,
454    proxy_client: Box<dyn ProxyClient>,
455    /// Our identity keypair (needed for reconnection).
456    identity_keypair: IdentityKeyPair,
457    /// Our own identity fingerprint (cached at construction time).
458    own_fingerprint: IdentityFingerprint,
459    /// Map of fingerprint -> transport
460    transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
461    /// Pending pairings and verification message buffers.
462    pending_pairings: PendingPairings,
463    /// Audit logger for security-relevant events
464    audit_log: Box<dyn AuditLog>,
465    /// Optional persistent store for reusable PSKs.
466    psk_store: Option<Box<dyn PskStore>>,
467}
468
469impl UserClientInner {
470    /// Run the main event loop (consumes self).
471    async fn run_event_loop(
472        mut self,
473        mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
474        mut command_rx: mpsc::Receiver<UserClientCommand>,
475        notification_tx: mpsc::Sender<UserClientNotification>,
476        request_tx: mpsc::Sender<UserClientRequest>,
477    ) {
478        // Emit Listening notification
479        notify!(notification_tx, UserClientNotification::Listening {});
480
481        let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
482
483        loop {
484            tokio::select! {
485                msg = incoming_rx.recv() => {
486                    match msg {
487                        Some(msg) => {
488                            match self.handle_incoming(msg, &notification_tx, &request_tx).await {
489                                Ok(Some(fut)) => pending_replies.push(fut),
490                                Ok(None) => {}
491                                Err(e) => {
492                                    warn!("Error handling incoming message: {}", e);
493                                    notify!(notification_tx, UserClientNotification::Error {
494                                        message: e.to_string(),
495                                        context: Some("handle_incoming".to_string()),
496                                    });
497                                }
498                            }
499                        }
500                        None => {
501                            // Incoming channel closed — proxy connection lost
502                            notify!(notification_tx, UserClientNotification::ClientDisconnected {});
503                            match self.attempt_reconnection(&notification_tx).await {
504                                Ok(new_rx) => {
505                                    incoming_rx = new_rx;
506                                    notify!(notification_tx, UserClientNotification::Reconnected {});
507                                }
508                                Err(e) => {
509                                    warn!("Reconnection failed permanently: {}", e);
510                                    notify!(notification_tx, UserClientNotification::Error {
511                                        message: e.to_string(),
512                                        context: Some("reconnection".to_string()),
513                                    });
514                                    return;
515                                }
516                            }
517                        }
518                    }
519                }
520                Some(reply) = pending_replies.next() => {
521                    match self.process_pending_reply(reply, &notification_tx, &request_tx).await {
522                        Ok(futs) => {
523                            for fut in futs {
524                                pending_replies.push(fut);
525                            }
526                        }
527                        Err(e) => {
528                            warn!("Error processing pending reply: {}", e);
529                            notify!(notification_tx, UserClientNotification::Error {
530                                message: e.to_string(),
531                                context: Some("process_pending_reply".to_string()),
532                            });
533                        }
534                    }
535                }
536                cmd = command_rx.recv() => {
537                    match cmd {
538                        Some(cmd) => self.handle_command(cmd, &notification_tx).await,
539                        None => {
540                            // All handles dropped — shut down
541                            debug!("All UserClient handles dropped, shutting down event loop");
542                            return;
543                        }
544                    }
545                }
546            }
547        }
548    }
549
550    /// Attempt to reconnect to the proxy server with exponential backoff.
551    async fn attempt_reconnection(
552        &mut self,
553        notification_tx: &mpsc::Sender<UserClientNotification>,
554    ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ClientError> {
555        use rand::{Rng, SeedableRng};
556
557        let mut rng = rand::rngs::StdRng::from_entropy();
558        let mut attempt: u32 = 0;
559
560        loop {
561            attempt = attempt.saturating_add(1);
562
563            // Disconnect (ignore errors — connection may already be dead)
564            let _ = self.proxy_client.disconnect().await;
565
566            match self
567                .proxy_client
568                .connect(self.identity_keypair.clone())
569                .await
570            {
571                Ok(new_rx) => {
572                    debug!("Reconnected to proxy on attempt {}", attempt);
573                    return Ok(new_rx);
574                }
575                Err(e) => {
576                    debug!("Reconnection attempt {} failed: {}", attempt, e);
577                    notify!(
578                        notification_tx,
579                        UserClientNotification::Reconnecting { attempt }
580                    );
581
582                    // Exponential backoff with jitter
583                    let exp_delay = RECONNECT_BASE_DELAY
584                        .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
585                    let delay = exp_delay.min(RECONNECT_MAX_DELAY);
586                    let jitter_max = (delay.as_millis() as u64) / 4;
587                    let jitter = if jitter_max > 0 {
588                        rng.gen_range(0..=jitter_max)
589                    } else {
590                        0
591                    };
592                    let total_delay = delay + Duration::from_millis(jitter);
593
594                    crate::compat::sleep(total_delay).await;
595                }
596            }
597        }
598    }
599
600    /// Handle incoming messages from proxy.
601    async fn handle_incoming(
602        &mut self,
603        msg: IncomingMessage,
604        notification_tx: &mpsc::Sender<UserClientNotification>,
605        request_tx: &mpsc::Sender<UserClientRequest>,
606    ) -> Result<Option<PendingReplyFuture>, ClientError> {
607        // If this source is awaiting fingerprint verification, buffer the message
608        let Some(msg) = self.pending_pairings.try_buffer_message(msg) else {
609            return Ok(None);
610        };
611
612        match msg {
613            IncomingMessage::Send {
614                source, payload, ..
615            } => {
616                let text = String::from_utf8(payload)
617                    .map_err(|e| ClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
618
619                let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
620
621                match protocol_msg {
622                    ProtocolMessage::HandshakeInit {
623                        data,
624                        ciphersuite,
625                        psk_id,
626                    } => {
627                        self.handle_handshake_init(
628                            source,
629                            data,
630                            ciphersuite,
631                            psk_id,
632                            notification_tx,
633                            request_tx,
634                        )
635                        .await
636                    }
637                    ProtocolMessage::CredentialRequest { encrypted } => {
638                        self.handle_credential_request(
639                            source,
640                            encrypted,
641                            notification_tx,
642                            request_tx,
643                        )
644                        .await
645                    }
646                    _ => {
647                        debug!("Received unexpected message type from {:?}", source);
648                        Ok(None)
649                    }
650                }
651            }
652            IncomingMessage::RendezvousInfo(code) => {
653                if let Some(pairing) = &mut self.pending_pairings.rendezvous {
654                    if let Some(sender) = pairing.code_tx.take() {
655                        debug!("Completed rendezvous pairing via handle, code: {}", code);
656                        let _ = sender.send(Ok(code));
657                    }
658                } else {
659                    debug!("Received RendezvousInfo but no pending rendezvous pairing found");
660                }
661                Ok(None)
662            }
663            IncomingMessage::IdentityInfo { .. } => {
664                // Only RemoteClient needs this
665                debug!("Received unexpected IdentityInfo message");
666                Ok(None)
667            }
668        }
669    }
670
671    /// Handle handshake init message.
672    async fn handle_handshake_init(
673        &mut self,
674        source: IdentityFingerprint,
675        data: String,
676        ciphersuite: String,
677        psk_id: Option<PskId>,
678        notification_tx: &mpsc::Sender<UserClientNotification>,
679        request_tx: &mpsc::Sender<UserClientRequest>,
680    ) -> Result<Option<PendingReplyFuture>, ClientError> {
681        debug!("Received handshake init from source: {:?}", source);
682        notify!(notification_tx, UserClientNotification::HandshakeStart {});
683
684        // Check if this is an existing/cached connection (bypass pairing lookup)
685        let is_new_connection = self.connection_store.get(&source).await.is_none();
686
687        // Determine which PSK to use and find the matching pairing.
688        let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
689            // Existing/cached session — if the remote sent a psk_id that matches a
690            // pending pairing, use it for the handshake. This is required for reusable
691            // PSKs where the remote reconnects with the same token.
692            match psk_id
693                .as_ref()
694                .and_then(|id| self.pending_pairings.take_psk(id))
695            {
696                Some((psk, name)) => (Some(psk), name, true),
697                None => (None, None, false),
698            }
699        } else {
700            // New connection — look up and consume a pending pairing
701            self.pending_pairings.prune_stale();
702
703            match &psk_id {
704                Some(id) => {
705                    // PSK mode — find matching pairing by psk_id
706                    if let Some((psk, name)) = self.pending_pairings.take_psk(id) {
707                        (Some(psk), name, true)
708                    } else {
709                        warn!("No matching PSK pairing for psk_id: {}", id);
710                        return Err(ClientError::InvalidState {
711                            expected: "matching PSK pairing".to_string(),
712                            current: format!("no pairing for psk_id {id}"),
713                        });
714                    }
715                }
716                None => {
717                    // Rendezvous mode — take the pending rendezvous pairing
718                    if let Some(pairing) = self.pending_pairings.take_rendezvous() {
719                        (None, pairing.connection_name, false)
720                    } else {
721                        return Err(ClientError::InvalidState {
722                            expected: "pending rendezvous pairing".to_string(),
723                            current: "no pending rendezvous pairing".to_string(),
724                        });
725                    }
726                }
727            }
728        };
729
730        let (transport, fingerprint_str) = self
731            .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
732            .await?;
733
734        notify!(
735            notification_tx,
736            UserClientNotification::HandshakeComplete {}
737        );
738
739        if is_new_connection && !is_psk_connection {
740            // New rendezvous connection: require fingerprint verification.
741            // Buffer messages from this source until verification completes.
742            self.pending_pairings.prepare_buffering(source);
743
744            let (tx, rx) = oneshot::channel();
745
746            if request_tx.capacity() == 0 {
747                warn!("Request channel full, waiting for consumer to drain");
748            }
749            request_tx
750                .send(UserClientRequest::VerifyFingerprint {
751                    fingerprint: fingerprint_str,
752                    identity: source,
753                    reply: tx,
754                })
755                .await
756                .ok();
757
758            let fut: PendingReplyFuture = Box::pin(async move {
759                let result = rx.await;
760                PendingReply::FingerprintVerification {
761                    source,
762                    transport,
763                    connection_name: matched_pairing_name,
764                    reply: result,
765                }
766            });
767
768            Ok(Some(fut))
769        } else if !is_new_connection {
770            // Existing/cached connection: already verified on first connection.
771            // Get existing connection to preserve name and cached_at, update transport.
772            let existing = self.connection_store.get(&source).await;
773            let now = crate::compat::now_seconds();
774            self.transports.insert(source, transport.clone());
775            self.connection_store
776                .save(ConnectionInfo {
777                    fingerprint: source,
778                    name: existing.as_ref().and_then(|s| s.name.clone()),
779                    cached_at: existing.as_ref().map_or(now, |s| s.cached_at),
780                    last_connected_at: now,
781                    transport_state: Some(transport),
782                })
783                .await?;
784
785            self.audit_log
786                .write(AuditEvent::SessionRefreshed {
787                    remote_identity: &source,
788                })
789                .await;
790
791            notify!(
792                notification_tx,
793                UserClientNotification::SessionRefreshed {
794                    fingerprint: source,
795                }
796            );
797
798            Ok(None)
799        } else {
800            // PSK connection: trust established via pre-shared key, no verification needed
801            self.accept_new_connection(
802                source,
803                transport,
804                matched_pairing_name.as_deref(),
805                AuditConnectionType::Psk,
806            )
807            .await?;
808
809            // Emit fingerprint as informational notification (no reply needed)
810            notify!(
811                notification_tx,
812                UserClientNotification::HandshakeFingerprint {
813                    fingerprint: fingerprint_str,
814                    identity: source,
815                }
816            );
817
818            Ok(None)
819        }
820    }
821
822    /// Accept a new connection: cache connection, store transport, set name, and audit
823    async fn accept_new_connection(
824        &mut self,
825        fingerprint: IdentityFingerprint,
826        transport: MultiDeviceTransport,
827        connection_name: Option<&str>,
828        connection_type: AuditConnectionType,
829    ) -> Result<(), ClientError> {
830        let now = crate::compat::now_seconds();
831        self.transports.insert(fingerprint, transport.clone());
832        self.connection_store
833            .save(ConnectionInfo {
834                fingerprint,
835                name: connection_name.map(|s| s.to_owned()),
836                cached_at: now,
837                last_connected_at: now,
838                transport_state: Some(transport),
839            })
840            .await?;
841
842        self.audit_log
843            .write(AuditEvent::ConnectionEstablished {
844                remote_identity: &fingerprint,
845                remote_name: connection_name,
846                connection_type,
847            })
848            .await;
849
850        Ok(())
851    }
852
853    /// Handle credential request.
854    async fn handle_credential_request(
855        &mut self,
856        source: IdentityFingerprint,
857        encrypted: String,
858        notification_tx: &mpsc::Sender<UserClientNotification>,
859        request_tx: &mpsc::Sender<UserClientRequest>,
860    ) -> Result<Option<PendingReplyFuture>, ClientError> {
861        if !self.transports.contains_key(&source) {
862            debug!("Loading transport state for source: {:?}", source);
863            let connection = self.connection_store.get(&source).await.ok_or_else(|| {
864                ClientError::ConnectionCache(format!("Missing cached connection {source:?}"))
865            })?;
866            let transport = connection.transport_state.ok_or_else(|| {
867                ClientError::ConnectionCache(format!(
868                    "Missing transport state for cached connection {source:?}"
869                ))
870            })?;
871            self.transports.insert(source, transport);
872        }
873
874        // Get transport for this source
875        let transport = self
876            .transports
877            .get_mut(&source)
878            .ok_or(ClientError::SecureChannelNotEstablished)?;
879
880        // Decrypt request
881        let encrypted_bytes = STANDARD
882            .decode(&encrypted)
883            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
884
885        let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
886            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
887
888        let decrypted = transport
889            .decrypt(&packet)
890            .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
891
892        let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
893
894        self.audit_log
895            .write(AuditEvent::CredentialRequested {
896                query: &request.query,
897                remote_identity: &source,
898                request_id: &request.request_id,
899            })
900            .await;
901
902        // Create oneshot channel for the reply
903        let (tx, rx) = oneshot::channel();
904
905        // Send request to caller
906        if request_tx.capacity() == 0 {
907            warn!("Request channel full, waiting for consumer to drain");
908        }
909        if request_tx
910            .send(UserClientRequest::CredentialRequest {
911                query: request.query.clone(),
912                identity: source,
913                reply: tx,
914            })
915            .await
916            .is_err()
917        {
918            // Request channel closed — caller is gone
919            warn!("Request channel closed, cannot send credential request");
920            notify!(
921                notification_tx,
922                UserClientNotification::Error {
923                    message: "Request channel closed".to_string(),
924                    context: Some("handle_credential_request".to_string()),
925                }
926            );
927            return Ok(None);
928        }
929
930        // Return future that awaits the reply
931        let request_id = request.request_id;
932        let query = request.query;
933        let fut: PendingReplyFuture = Box::pin(async move {
934            let result = rx.await;
935            PendingReply::CredentialResponse {
936                source,
937                request_id,
938                query,
939                reply: result,
940            }
941        });
942
943        Ok(Some(fut))
944    }
945
946    /// Process a resolved pending reply from the `FuturesUnordered`.
947    async fn process_pending_reply(
948        &mut self,
949        reply: PendingReply,
950        notification_tx: &mpsc::Sender<UserClientNotification>,
951        request_tx: &mpsc::Sender<UserClientRequest>,
952    ) -> Result<Vec<PendingReplyFuture>, ClientError> {
953        match reply {
954            PendingReply::FingerprintVerification {
955                source,
956                transport,
957                connection_name,
958                reply,
959            } => {
960                self.process_fingerprint_reply(
961                    source,
962                    transport,
963                    connection_name,
964                    reply,
965                    notification_tx,
966                    request_tx,
967                )
968                .await
969            }
970            PendingReply::CredentialResponse {
971                source,
972                request_id,
973                query,
974                reply,
975            } => {
976                self.process_credential_reply(source, request_id, query, reply, notification_tx)
977                    .await?;
978                Ok(Vec::new())
979            }
980        }
981    }
982
983    /// Handle a command from a `UserClient` handle.
984    async fn handle_command(
985        &mut self,
986        cmd: UserClientCommand,
987        notification_tx: &mpsc::Sender<UserClientNotification>,
988    ) {
989        match cmd {
990            UserClientCommand::GetPskToken {
991                name,
992                reusable,
993                reply,
994            } => {
995                let result = self.generate_psk_token(name, reusable).await;
996                let _ = reply.send(result);
997            }
998            UserClientCommand::GetRendezvousToken { name, reply } => {
999                if let Err(e) = self.proxy_client.request_rendezvous().await {
1000                    let _ = reply.send(Err(e));
1001                    return;
1002                }
1003
1004                // Prune stale pairings
1005                self.pending_pairings.prune_stale();
1006
1007                // Replace any existing rendezvous pairing — the old sender drops,
1008                // causing the receiver to get a RecvError (maps to ChannelClosed)
1009                self.pending_pairings.rendezvous = Some(RendezvousPairing {
1010                    connection_name: name,
1011                    created_at: Instant::now(),
1012                    code_tx: Some(reply),
1013                });
1014
1015                // Emit notification so the caller knows a code is being requested
1016                notify!(
1017                    notification_tx,
1018                    UserClientNotification::HandshakeProgress {
1019                        message: "Requesting rendezvous code from proxy...".to_string(),
1020                    }
1021                );
1022            }
1023        }
1024    }
1025
1026    /// Generate a PSK token internally.
1027    async fn generate_psk_token(
1028        &mut self,
1029        name: Option<String>,
1030        reusable: bool,
1031    ) -> Result<String, ClientError> {
1032        if reusable && self.psk_store.is_none() {
1033            return Err(ClientError::InvalidState {
1034                expected: "PskStore configured".to_string(),
1035                current: "no PskStore provided".to_string(),
1036            });
1037        }
1038
1039        let psk = Psk::generate();
1040        let psk_id = psk.id();
1041        let token = PskToken::new(psk.clone(), self.own_fingerprint).to_string();
1042
1043        self.pending_pairings.prune_stale();
1044        self.pending_pairings.psk_pairings.insert(
1045            psk_id.clone(),
1046            PskPairing {
1047                connection_name: name.clone(),
1048                created_at: Instant::now(),
1049                psk: psk.clone(),
1050                reusable,
1051            },
1052        );
1053
1054        if reusable {
1055            if let Some(store) = &mut self.psk_store {
1056                store
1057                    .save(PskEntry {
1058                        psk_id,
1059                        psk,
1060                        name,
1061                        created_at: crate::compat::now_seconds(),
1062                    })
1063                    .await?;
1064                debug!("Created reusable PSK pairing, token generated and persisted");
1065            }
1066        } else {
1067            debug!("Created ephemeral PSK pairing, token generated");
1068        }
1069
1070        Ok(token)
1071    }
1072
1073    /// Process a fingerprint verification reply.
1074    async fn process_fingerprint_reply(
1075        &mut self,
1076        source: IdentityFingerprint,
1077        transport: MultiDeviceTransport,
1078        connection_name: Option<String>,
1079        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
1080        notification_tx: &mpsc::Sender<UserClientNotification>,
1081        request_tx: &mpsc::Sender<UserClientRequest>,
1082    ) -> Result<Vec<PendingReplyFuture>, ClientError> {
1083        match reply {
1084            Ok(FingerprintVerificationReply {
1085                approved: true,
1086                name,
1087            }) => {
1088                // Use the name from the reply, falling back to the pairing name
1089                let conn_name = name.or(connection_name);
1090                self.accept_new_connection(
1091                    source,
1092                    transport,
1093                    conn_name.as_deref(),
1094                    AuditConnectionType::Rendezvous,
1095                )
1096                .await?;
1097
1098                notify!(
1099                    notification_tx,
1100                    UserClientNotification::FingerprintVerified {}
1101                );
1102
1103                // Drain and replay buffered messages
1104                let mut futures = Vec::new();
1105                for msg in self.pending_pairings.take_buffered_messages(&source) {
1106                    match self.handle_incoming(msg, notification_tx, request_tx).await {
1107                        Ok(Some(fut)) => futures.push(fut),
1108                        Ok(None) => {}
1109                        Err(e) => {
1110                            warn!("Error processing buffered message: {}", e);
1111                        }
1112                    }
1113                }
1114
1115                Ok(futures)
1116            }
1117            Ok(FingerprintVerificationReply {
1118                approved: false, ..
1119            }) => {
1120                self.reject_fingerprint(
1121                    &source,
1122                    "User rejected fingerprint verification",
1123                    notification_tx,
1124                )
1125                .await;
1126                Ok(Vec::new())
1127            }
1128            Err(_) => {
1129                warn!("Fingerprint verification reply channel dropped, treating as rejection");
1130                self.reject_fingerprint(
1131                    &source,
1132                    "Verification cancelled (reply dropped)",
1133                    notification_tx,
1134                )
1135                .await;
1136                Ok(Vec::new())
1137            }
1138        }
1139    }
1140
1141    /// Reject a fingerprint verification: discard buffered messages, audit, and notify.
1142    async fn reject_fingerprint(
1143        &mut self,
1144        source: &IdentityFingerprint,
1145        reason: &str,
1146        notification_tx: &mpsc::Sender<UserClientNotification>,
1147    ) {
1148        self.pending_pairings.take_buffered_messages(source);
1149
1150        self.audit_log
1151            .write(AuditEvent::ConnectionRejected {
1152                remote_identity: source,
1153            })
1154            .await;
1155
1156        notify!(
1157            notification_tx,
1158            UserClientNotification::FingerprintRejected {
1159                reason: reason.to_string(),
1160            }
1161        );
1162    }
1163
1164    /// Process a credential request reply.
1165    #[allow(clippy::too_many_arguments)]
1166    async fn process_credential_reply(
1167        &mut self,
1168        source: IdentityFingerprint,
1169        request_id: String,
1170        query: crate::types::CredentialQuery,
1171        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
1172        notification_tx: &mpsc::Sender<UserClientNotification>,
1173    ) -> Result<(), ClientError> {
1174        let reply = match reply {
1175            Ok(r) => r,
1176            Err(_) => {
1177                // Oneshot sender was dropped — treat as denial
1178                warn!("Credential reply channel dropped, treating as denial");
1179                CredentialRequestReply {
1180                    approved: false,
1181                    credential: None,
1182                    credential_id: None,
1183                }
1184            }
1185        };
1186
1187        let transport = self
1188            .transports
1189            .get_mut(&source)
1190            .ok_or(ClientError::SecureChannelNotEstablished)?;
1191
1192        // Extract domain and audit fields before credential is moved into the response payload
1193        let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1194        let fields = reply
1195            .credential
1196            .as_ref()
1197            .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1198                has_username: c.username.is_some(),
1199                has_password: c.password.is_some(),
1200                has_totp: c.totp.is_some(),
1201                has_uri: c.uri.is_some(),
1202                has_notes: c.notes.is_some(),
1203            });
1204
1205        // Create response payload
1206        let response_payload = CredentialResponsePayload {
1207            credential: if reply.approved {
1208                reply.credential
1209            } else {
1210                None
1211            },
1212            error: if !reply.approved {
1213                Some("Request denied".to_string())
1214            } else {
1215                None
1216            },
1217            request_id: Some(request_id.clone()),
1218        };
1219
1220        // Encrypt and send
1221        let response_json = serde_json::to_string(&response_payload)?;
1222        let encrypted = transport
1223            .encrypt(response_json.as_bytes())
1224            .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
1225
1226        let msg = ProtocolMessage::CredentialResponse {
1227            encrypted: STANDARD.encode(encrypted.encode()),
1228        };
1229
1230        let msg_json = serde_json::to_string(&msg)?;
1231
1232        self.proxy_client
1233            .send_to(source, msg_json.into_bytes())
1234            .await?;
1235
1236        // Send notification and audit
1237        if reply.approved {
1238            self.audit_log
1239                .write(AuditEvent::CredentialApproved {
1240                    query: &query,
1241                    domain: domain.as_deref(),
1242                    remote_identity: &source,
1243                    request_id: &request_id,
1244                    credential_id: reply.credential_id.as_deref(),
1245                    fields,
1246                })
1247                .await;
1248
1249            notify!(
1250                notification_tx,
1251                UserClientNotification::CredentialApproved {
1252                    domain,
1253                    credential_id: reply.credential_id,
1254                }
1255            );
1256        } else {
1257            self.audit_log
1258                .write(AuditEvent::CredentialDenied {
1259                    query: &query,
1260                    domain: domain.as_deref(),
1261                    remote_identity: &source,
1262                    request_id: &request_id,
1263                    credential_id: reply.credential_id.as_deref(),
1264                })
1265                .await;
1266
1267            notify!(
1268                notification_tx,
1269                UserClientNotification::CredentialDenied {
1270                    domain,
1271                    credential_id: reply.credential_id,
1272                }
1273            );
1274        }
1275
1276        Ok(())
1277    }
1278
1279    /// Complete Noise handshake as responder
1280    async fn complete_handshake(
1281        &self,
1282        remote_fingerprint: IdentityFingerprint,
1283        handshake_data: &str,
1284        ciphersuite_str: &str,
1285        psk: Option<&Psk>,
1286    ) -> Result<(MultiDeviceTransport, String), ClientError> {
1287        // Parse ciphersuite
1288        let ciphersuite = match ciphersuite_str {
1289            s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1290            _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1291        };
1292
1293        // Decode handshake data
1294        let init_bytes = STANDARD
1295            .decode(handshake_data)
1296            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
1297
1298        let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1299            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1300
1301        // Create responder handshake — with PSK if provided, otherwise null PSK (rendezvous)
1302        let mut handshake = if let Some(psk) = psk {
1303            ResponderHandshake::with_psk(psk.clone())
1304        } else {
1305            ResponderHandshake::new()
1306        };
1307
1308        // Process init and generate response
1309        handshake.receive_start(&init_packet)?;
1310        let response_packet = handshake.send_finish()?;
1311        let (transport, fingerprint) = handshake.finalize()?;
1312
1313        // Send response
1314        let msg = ProtocolMessage::HandshakeResponse {
1315            data: STANDARD.encode(response_packet.encode()?),
1316            ciphersuite: format!("{ciphersuite:?}"),
1317        };
1318
1319        let msg_json = serde_json::to_string(&msg)?;
1320
1321        self.proxy_client
1322            .send_to(remote_fingerprint, msg_json.into_bytes())
1323            .await?;
1324
1325        debug!("Sent handshake response to {:?}", remote_fingerprint);
1326
1327        Ok((transport, fingerprint.to_string()))
1328    }
1329}