Skip to main content

ap_client/clients/
user_client.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, IdentityKeyPair, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId, PskToken};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
24/// Base delay for reconnection backoff.
25const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
26/// Maximum delay between reconnection attempts (15 minutes).
27const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(15 * 60);
28/// Maximum age for pending pairings before they are pruned.
29const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(10 * 60);
30/// Maximum number of messages buffered per peer while awaiting fingerprint verification.
31const AWAITING_VERIFICATION_BUFFER_LIMIT: usize = 100;
32
33/// A pending PSK pairing waiting for an incoming handshake.
34struct PskPairing {
35    connection_name: Option<String>,
36    created_at: Instant,
37    psk: Psk,
38}
39
40/// A pending rendezvous pairing waiting for an incoming handshake.
41struct RendezvousPairing {
42    connection_name: Option<String>,
43    created_at: Instant,
44    /// Channel to deliver the rendezvous code — `Some` while awaiting, `None` after delivery.
45    code_tx: Option<oneshot::Sender<Result<RendezvousCode, ClientError>>>,
46}
47
48/// Manages pending pairings and verification message buffers.
49///
50/// Pairings track handshake setup (rendezvous codes, PSKs). Verification buffers
51/// hold messages from peers whose fingerprint is awaiting user approval — once
52/// approved the buffer is drained and replayed, on rejection it is discarded.
53struct PendingPairings {
54    /// PSK pairings keyed by their PskId for direct lookup.
55    psk_pairings: HashMap<PskId, PskPairing>,
56    /// At most one rendezvous pairing at a time.
57    rendezvous: Option<RendezvousPairing>,
58    /// Messages buffered per peer while awaiting fingerprint verification.
59    buffered_messages: HashMap<IdentityFingerprint, Vec<IncomingMessage>>,
60}
61
62impl PendingPairings {
63    fn new() -> Self {
64        Self {
65            psk_pairings: HashMap::new(),
66            rendezvous: None,
67            buffered_messages: HashMap::new(),
68        }
69    }
70
71    /// Remove pairings older than `PENDING_PAIRING_MAX_AGE`.
72    fn prune_stale(&mut self) {
73        self.psk_pairings
74            .retain(|_, p| p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
75        if self
76            .rendezvous
77            .as_ref()
78            .is_some_and(|r| r.created_at.elapsed() >= PENDING_PAIRING_MAX_AGE)
79        {
80            self.rendezvous = None;
81        }
82    }
83
84    /// Take the pending rendezvous pairing, if any.
85    fn take_rendezvous(&mut self) -> Option<RendezvousPairing> {
86        self.rendezvous.take()
87    }
88
89    /// Start buffering messages for a source that is awaiting fingerprint verification.
90    fn prepare_buffering(&mut self, source: IdentityFingerprint) {
91        self.buffered_messages.insert(source, Vec::new());
92    }
93
94    /// Try to buffer a message for a source awaiting fingerprint verification.
95    /// Returns `None` if handled (buffered or dropped due to limit),
96    /// or `Some(msg)` if the source is not awaiting verification.
97    fn try_buffer_message(&mut self, msg: IncomingMessage) -> Option<IncomingMessage> {
98        let source = match &msg {
99            IncomingMessage::Send { source, .. } => source,
100            _ => return Some(msg),
101        };
102        if let Some(buffer) = self.buffered_messages.get_mut(source) {
103            if buffer.len() < AWAITING_VERIFICATION_BUFFER_LIMIT {
104                debug!(
105                    "Buffering message from {:?} pending fingerprint verification",
106                    source
107                );
108                buffer.push(msg);
109            } else {
110                warn!("Buffer limit reached for {:?}, dropping message", source);
111            }
112            None
113        } else {
114            Some(msg)
115        }
116    }
117
118    /// Remove and return buffered messages for a source.
119    /// Used to replay on approval or discard on rejection.
120    fn take_buffered_messages(&mut self, source: &IdentityFingerprint) -> Vec<IncomingMessage> {
121        self.buffered_messages.remove(source).unwrap_or_default()
122    }
123}
124
125use super::notify;
126use crate::{
127    error::ClientError,
128    traits::{
129        AuditConnectionType, AuditEvent, AuditLog, ConnectionInfo, ConnectionStore,
130        CredentialFieldSet, IdentityProvider, NoOpAuditLog,
131    },
132    types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
133};
134
135// =============================================================================
136// Public types: Notifications (fire-and-forget) and Requests (with reply)
137// =============================================================================
138
139/// Fire-and-forget status updates emitted by the user client.
140#[derive(Debug, Clone)]
141pub enum UserClientNotification {
142    /// Started listening for connections
143    Listening {},
144    /// Noise handshake started
145    HandshakeStart {},
146    /// Noise handshake progress
147    HandshakeProgress {
148        /// Progress message
149        message: String,
150    },
151    /// Noise handshake complete
152    HandshakeComplete {},
153    /// Handshake fingerprint (informational, for PSK connections)
154    HandshakeFingerprint {
155        /// The 6-character hex fingerprint for visual verification
156        fingerprint: String,
157        /// The remote device's stable identity fingerprint
158        identity: IdentityFingerprint,
159    },
160    /// Fingerprint was verified and connection accepted
161    FingerprintVerified {},
162    /// Fingerprint was rejected and connection discarded
163    FingerprintRejected {
164        /// Reason for rejection
165        reason: String,
166    },
167    /// Credential was approved and sent
168    CredentialApproved {
169        /// Domain from the matched credential
170        domain: Option<String>,
171        /// Vault item ID
172        credential_id: Option<String>,
173    },
174    /// Credential was denied
175    CredentialDenied {
176        /// Domain from the matched credential
177        domain: Option<String>,
178        /// Vault item ID
179        credential_id: Option<String>,
180    },
181    /// A known/cached device reconnected — transport keys refreshed, no re-verification needed
182    SessionRefreshed {
183        /// The remote device's identity fingerprint
184        fingerprint: IdentityFingerprint,
185    },
186    /// Client disconnected from proxy
187    ClientDisconnected {},
188    /// Attempting to reconnect to proxy
189    Reconnecting {
190        /// Current reconnection attempt number
191        attempt: u32,
192    },
193    /// Successfully reconnected to proxy
194    Reconnected {},
195    /// An error occurred
196    Error {
197        /// Error message
198        message: String,
199        /// Context where error occurred
200        context: Option<String>,
201    },
202}
203
204/// Reply for fingerprint verification requests.
205#[derive(Debug)]
206pub struct FingerprintVerificationReply {
207    /// Whether user approved the fingerprint
208    pub approved: bool,
209    /// Optional friendly name to assign to the session
210    pub name: Option<String>,
211}
212
213/// Reply for credential requests.
214#[derive(Debug)]
215pub struct CredentialRequestReply {
216    /// Whether approved
217    pub approved: bool,
218    /// The credential to send (if approved)
219    pub credential: Option<CredentialData>,
220    /// Vault item ID (for audit logging)
221    pub credential_id: Option<String>,
222}
223
224/// Requests that require a caller response, carrying a oneshot reply channel.
225#[derive(Debug)]
226pub enum UserClientRequest {
227    /// Handshake fingerprint requires verification (rendezvous connections only).
228    VerifyFingerprint {
229        /// The 6-character hex fingerprint for visual verification
230        fingerprint: String,
231        /// The remote device's stable identity fingerprint
232        identity: IdentityFingerprint,
233        /// Channel to send the verification reply
234        reply: oneshot::Sender<FingerprintVerificationReply>,
235    },
236    /// Credential request received — caller must approve/deny and provide the credential.
237    CredentialRequest {
238        /// The credential query
239        query: crate::types::CredentialQuery,
240        /// The requesting device's identity fingerprint
241        identity: IdentityFingerprint,
242        /// Channel to send the credential reply
243        reply: oneshot::Sender<CredentialRequestReply>,
244    },
245}
246
247// =============================================================================
248// Internal types for pending reply tracking
249// =============================================================================
250
251/// Resolved reply from a pending oneshot, carrying the context needed to process it.
252enum PendingReply {
253    FingerprintVerification {
254        source: IdentityFingerprint,
255        transport: MultiDeviceTransport,
256        connection_name: Option<String>,
257        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
258    },
259    CredentialResponse {
260        source: IdentityFingerprint,
261        request_id: String,
262        query: crate::types::CredentialQuery,
263        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
264    },
265}
266
267/// A boxed future that resolves to a `PendingReply`.
268type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
269
270// =============================================================================
271// Command channel for UserClient handle → event loop communication
272// =============================================================================
273
274/// Commands sent from a `UserClient` handle to the running event loop.
275enum UserClientCommand {
276    /// Generate a PSK token and register a pending pairing.
277    GetPskToken {
278        name: Option<String>,
279        reply: oneshot::Sender<Result<String, ClientError>>,
280    },
281    /// Request a rendezvous code from the proxy and register a pending pairing.
282    GetRendezvousToken {
283        name: Option<String>,
284        reply: oneshot::Sender<Result<RendezvousCode, ClientError>>,
285    },
286}
287
288/// A cloneable handle for controlling the user client.
289///
290/// Obtained from [`UserClient::connect()`], which authenticates with the proxy,
291/// spawns the event loop internally, and returns this handle. All methods
292/// communicate with the event loop through an internal command channel.
293///
294/// `Clone` and `Send` — share freely across tasks and threads.
295/// Handle returned by [`UserClient::connect()`] containing the client and its
296/// notification/request channels.
297pub struct UserClientHandle {
298    pub client: UserClient,
299    pub notifications: mpsc::Receiver<UserClientNotification>,
300    pub requests: mpsc::Receiver<UserClientRequest>,
301}
302
303#[derive(Clone)]
304pub struct UserClient {
305    command_tx: mpsc::Sender<UserClientCommand>,
306}
307
308impl UserClient {
309    /// Connect to the proxy server, spawn the event loop, and return a handle.
310    ///
311    /// This is the single entry point. After `connect()` returns, the client is
312    /// already listening for incoming connections. Use `get_psk_token()` or
313    /// `get_rendezvous_token()` to set up pairings, and receive events through
314    /// the returned handle's notification/request channels.
315    ///
316    /// Pass `None` for `audit_log` to use a no-op logger.
317    pub async fn connect(
318        identity_provider: Box<dyn IdentityProvider>,
319        connection_store: Box<dyn ConnectionStore>,
320        mut proxy_client: Box<dyn ProxyClient>,
321        audit_log: Option<Box<dyn AuditLog>>,
322    ) -> Result<UserClientHandle, ClientError> {
323        // Extract identity once — used for proxy auth, reconnection, and own fingerprint
324        let identity_keypair = identity_provider.identity().await;
325        let own_fingerprint = identity_keypair.identity().fingerprint();
326
327        // Authenticate with the proxy (the async part — before spawn)
328        let incoming_rx = proxy_client.connect(identity_keypair.clone()).await?;
329
330        // Create channels
331        let (notification_tx, notification_rx) = mpsc::channel(32);
332        let (request_tx, request_rx) = mpsc::channel(32);
333
334        // Create command channel
335        let (command_tx, command_rx) = mpsc::channel(32);
336
337        // Build the inner state
338        let inner = UserClientInner {
339            connection_store,
340            proxy_client,
341            identity_keypair,
342            own_fingerprint,
343            transports: HashMap::new(),
344            pending_pairings: PendingPairings::new(),
345            audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
346        };
347
348        // Spawn the event loop — use spawn_local on WASM (no Tokio runtime)
349        #[cfg(target_arch = "wasm32")]
350        wasm_bindgen_futures::spawn_local(inner.run_event_loop(
351            incoming_rx,
352            command_rx,
353            notification_tx,
354            request_tx,
355        ));
356        #[cfg(not(target_arch = "wasm32"))]
357        tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
358
359        Ok(UserClientHandle {
360            client: Self { command_tx },
361            notifications: notification_rx,
362            requests: request_rx,
363        })
364    }
365
366    /// Generate a PSK token for a new pairing.
367    ///
368    /// Returns the formatted token string (`<psk_hex>_<fingerprint_hex>`).
369    /// Multiple PSK pairings are supported concurrently (each matched by `psk_id`).
370    pub async fn get_psk_token(&self, name: Option<String>) -> Result<String, ClientError> {
371        let (tx, rx) = oneshot::channel();
372        self.command_tx
373            .send(UserClientCommand::GetPskToken { name, reply: tx })
374            .await
375            .map_err(|_| ClientError::ChannelClosed)?;
376        rx.await.map_err(|_| ClientError::ChannelClosed)?
377    }
378
379    /// Request a rendezvous code from the proxy for a new pairing.
380    ///
381    /// Only one rendezvous pairing at a time — there's no way to distinguish
382    /// incoming rendezvous handshakes.
383    pub async fn get_rendezvous_token(
384        &self,
385        name: Option<String>,
386    ) -> Result<RendezvousCode, ClientError> {
387        let (tx, rx) = oneshot::channel();
388        self.command_tx
389            .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
390            .await
391            .map_err(|_| ClientError::ChannelClosed)?;
392        rx.await.map_err(|_| ClientError::ChannelClosed)?
393    }
394}
395
396// =============================================================================
397// Internal state — lives inside the spawned event loop task
398// =============================================================================
399
400/// All mutable state for the user client, owned by the spawned event loop task.
401struct UserClientInner {
402    connection_store: Box<dyn ConnectionStore>,
403    proxy_client: Box<dyn ProxyClient>,
404    /// Our identity keypair (needed for reconnection).
405    identity_keypair: IdentityKeyPair,
406    /// Our own identity fingerprint (cached at construction time).
407    own_fingerprint: IdentityFingerprint,
408    /// Map of fingerprint -> transport
409    transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
410    /// Pending pairings and verification message buffers.
411    pending_pairings: PendingPairings,
412    /// Audit logger for security-relevant events
413    audit_log: Box<dyn AuditLog>,
414}
415
416impl UserClientInner {
417    /// Run the main event loop (consumes self).
418    async fn run_event_loop(
419        mut self,
420        mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
421        mut command_rx: mpsc::Receiver<UserClientCommand>,
422        notification_tx: mpsc::Sender<UserClientNotification>,
423        request_tx: mpsc::Sender<UserClientRequest>,
424    ) {
425        // Emit Listening notification
426        notify!(notification_tx, UserClientNotification::Listening {});
427
428        let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
429
430        loop {
431            tokio::select! {
432                msg = incoming_rx.recv() => {
433                    match msg {
434                        Some(msg) => {
435                            match self.handle_incoming(msg, &notification_tx, &request_tx).await {
436                                Ok(Some(fut)) => pending_replies.push(fut),
437                                Ok(None) => {}
438                                Err(e) => {
439                                    warn!("Error handling incoming message: {}", e);
440                                    notify!(notification_tx, UserClientNotification::Error {
441                                        message: e.to_string(),
442                                        context: Some("handle_incoming".to_string()),
443                                    });
444                                }
445                            }
446                        }
447                        None => {
448                            // Incoming channel closed — proxy connection lost
449                            notify!(notification_tx, UserClientNotification::ClientDisconnected {});
450                            match self.attempt_reconnection(&notification_tx).await {
451                                Ok(new_rx) => {
452                                    incoming_rx = new_rx;
453                                    notify!(notification_tx, UserClientNotification::Reconnected {});
454                                }
455                                Err(e) => {
456                                    warn!("Reconnection failed permanently: {}", e);
457                                    notify!(notification_tx, UserClientNotification::Error {
458                                        message: e.to_string(),
459                                        context: Some("reconnection".to_string()),
460                                    });
461                                    return;
462                                }
463                            }
464                        }
465                    }
466                }
467                Some(reply) = pending_replies.next() => {
468                    match self.process_pending_reply(reply, &notification_tx, &request_tx).await {
469                        Ok(futs) => {
470                            for fut in futs {
471                                pending_replies.push(fut);
472                            }
473                        }
474                        Err(e) => {
475                            warn!("Error processing pending reply: {}", e);
476                            notify!(notification_tx, UserClientNotification::Error {
477                                message: e.to_string(),
478                                context: Some("process_pending_reply".to_string()),
479                            });
480                        }
481                    }
482                }
483                cmd = command_rx.recv() => {
484                    match cmd {
485                        Some(cmd) => self.handle_command(cmd, &notification_tx).await,
486                        None => {
487                            // All handles dropped — shut down
488                            debug!("All UserClient handles dropped, shutting down event loop");
489                            return;
490                        }
491                    }
492                }
493            }
494        }
495    }
496
497    /// Attempt to reconnect to the proxy server with exponential backoff.
498    async fn attempt_reconnection(
499        &mut self,
500        notification_tx: &mpsc::Sender<UserClientNotification>,
501    ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ClientError> {
502        use rand::{Rng, SeedableRng};
503
504        let mut rng = rand::rngs::StdRng::from_entropy();
505        let mut attempt: u32 = 0;
506
507        loop {
508            attempt = attempt.saturating_add(1);
509
510            // Disconnect (ignore errors — connection may already be dead)
511            let _ = self.proxy_client.disconnect().await;
512
513            match self
514                .proxy_client
515                .connect(self.identity_keypair.clone())
516                .await
517            {
518                Ok(new_rx) => {
519                    debug!("Reconnected to proxy on attempt {}", attempt);
520                    return Ok(new_rx);
521                }
522                Err(e) => {
523                    debug!("Reconnection attempt {} failed: {}", attempt, e);
524                    notify!(
525                        notification_tx,
526                        UserClientNotification::Reconnecting { attempt }
527                    );
528
529                    // Exponential backoff with jitter
530                    let exp_delay = RECONNECT_BASE_DELAY
531                        .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
532                    let delay = exp_delay.min(RECONNECT_MAX_DELAY);
533                    let jitter_max = (delay.as_millis() as u64) / 4;
534                    let jitter = if jitter_max > 0 {
535                        rng.gen_range(0..=jitter_max)
536                    } else {
537                        0
538                    };
539                    let total_delay = delay + Duration::from_millis(jitter);
540
541                    crate::compat::sleep(total_delay).await;
542                }
543            }
544        }
545    }
546
547    /// Handle incoming messages from proxy.
548    async fn handle_incoming(
549        &mut self,
550        msg: IncomingMessage,
551        notification_tx: &mpsc::Sender<UserClientNotification>,
552        request_tx: &mpsc::Sender<UserClientRequest>,
553    ) -> Result<Option<PendingReplyFuture>, ClientError> {
554        // If this source is awaiting fingerprint verification, buffer the message
555        let Some(msg) = self.pending_pairings.try_buffer_message(msg) else {
556            return Ok(None);
557        };
558
559        match msg {
560            IncomingMessage::Send {
561                source, payload, ..
562            } => {
563                let text = String::from_utf8(payload)
564                    .map_err(|e| ClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
565
566                let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
567
568                match protocol_msg {
569                    ProtocolMessage::HandshakeInit {
570                        data,
571                        ciphersuite,
572                        psk_id,
573                    } => {
574                        self.handle_handshake_init(
575                            source,
576                            data,
577                            ciphersuite,
578                            psk_id,
579                            notification_tx,
580                            request_tx,
581                        )
582                        .await
583                    }
584                    ProtocolMessage::CredentialRequest { encrypted } => {
585                        self.handle_credential_request(
586                            source,
587                            encrypted,
588                            notification_tx,
589                            request_tx,
590                        )
591                        .await
592                    }
593                    _ => {
594                        debug!("Received unexpected message type from {:?}", source);
595                        Ok(None)
596                    }
597                }
598            }
599            IncomingMessage::RendezvousInfo(code) => {
600                if let Some(pairing) = &mut self.pending_pairings.rendezvous {
601                    if let Some(sender) = pairing.code_tx.take() {
602                        debug!("Completed rendezvous pairing via handle, code: {}", code);
603                        let _ = sender.send(Ok(code));
604                    }
605                } else {
606                    debug!("Received RendezvousInfo but no pending rendezvous pairing found");
607                }
608                Ok(None)
609            }
610            IncomingMessage::IdentityInfo { .. } => {
611                // Only RemoteClient needs this
612                debug!("Received unexpected IdentityInfo message");
613                Ok(None)
614            }
615        }
616    }
617
618    /// Handle handshake init message.
619    async fn handle_handshake_init(
620        &mut self,
621        source: IdentityFingerprint,
622        data: String,
623        ciphersuite: String,
624        psk_id: Option<PskId>,
625        notification_tx: &mpsc::Sender<UserClientNotification>,
626        request_tx: &mpsc::Sender<UserClientRequest>,
627    ) -> Result<Option<PendingReplyFuture>, ClientError> {
628        debug!("Received handshake init from source: {:?}", source);
629        notify!(notification_tx, UserClientNotification::HandshakeStart {});
630
631        // Check if this is an existing/cached connection (bypass pairing lookup)
632        let is_new_connection = self.connection_store.get(&source).await.is_none();
633
634        // Determine which PSK to use and find the matching pairing.
635        let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
636            // Existing/cached session — no pairing lookup needed
637            (None, None, false)
638        } else {
639            // New connection — look up and consume a pending pairing
640            self.pending_pairings.prune_stale();
641
642            match &psk_id {
643                Some(id) => {
644                    // PSK mode — find matching pairing by psk_id
645                    if let Some(pairing) = self.pending_pairings.psk_pairings.remove(id) {
646                        (Some(pairing.psk), pairing.connection_name, true)
647                    } else {
648                        warn!("No matching PSK pairing for psk_id: {}", id);
649                        return Err(ClientError::InvalidState {
650                            expected: "matching PSK pairing".to_string(),
651                            current: format!("no pairing for psk_id {id}"),
652                        });
653                    }
654                }
655                None => {
656                    // Rendezvous mode — take the pending rendezvous pairing
657                    if let Some(pairing) = self.pending_pairings.take_rendezvous() {
658                        (None, pairing.connection_name, false)
659                    } else {
660                        return Err(ClientError::InvalidState {
661                            expected: "pending rendezvous pairing".to_string(),
662                            current: "no pending rendezvous pairing".to_string(),
663                        });
664                    }
665                }
666            }
667        };
668
669        let (transport, fingerprint_str) = self
670            .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
671            .await?;
672
673        notify!(
674            notification_tx,
675            UserClientNotification::HandshakeComplete {}
676        );
677
678        if is_new_connection && !is_psk_connection {
679            // New rendezvous connection: require fingerprint verification.
680            // Buffer messages from this source until verification completes.
681            self.pending_pairings.prepare_buffering(source);
682
683            let (tx, rx) = oneshot::channel();
684
685            if request_tx.capacity() == 0 {
686                warn!("Request channel full, waiting for consumer to drain");
687            }
688            request_tx
689                .send(UserClientRequest::VerifyFingerprint {
690                    fingerprint: fingerprint_str,
691                    identity: source,
692                    reply: tx,
693                })
694                .await
695                .ok();
696
697            let fut: PendingReplyFuture = Box::pin(async move {
698                let result = rx.await;
699                PendingReply::FingerprintVerification {
700                    source,
701                    transport,
702                    connection_name: matched_pairing_name,
703                    reply: result,
704                }
705            });
706
707            Ok(Some(fut))
708        } else if !is_new_connection {
709            // Existing/cached connection: already verified on first connection.
710            // Get existing connection to preserve name and cached_at, update transport.
711            let existing = self.connection_store.get(&source).await;
712            let now = crate::compat::now_seconds();
713            self.transports.insert(source, transport.clone());
714            self.connection_store
715                .save(ConnectionInfo {
716                    fingerprint: source,
717                    name: existing.as_ref().and_then(|s| s.name.clone()),
718                    cached_at: existing.as_ref().map_or(now, |s| s.cached_at),
719                    last_connected_at: now,
720                    transport_state: Some(transport),
721                })
722                .await?;
723
724            self.audit_log
725                .write(AuditEvent::SessionRefreshed {
726                    remote_identity: &source,
727                })
728                .await;
729
730            notify!(
731                notification_tx,
732                UserClientNotification::SessionRefreshed {
733                    fingerprint: source,
734                }
735            );
736
737            Ok(None)
738        } else {
739            // PSK connection: trust established via pre-shared key, no verification needed
740            self.accept_new_connection(
741                source,
742                transport,
743                matched_pairing_name.as_deref(),
744                AuditConnectionType::Psk,
745            )
746            .await?;
747
748            // Emit fingerprint as informational notification (no reply needed)
749            notify!(
750                notification_tx,
751                UserClientNotification::HandshakeFingerprint {
752                    fingerprint: fingerprint_str,
753                    identity: source,
754                }
755            );
756
757            Ok(None)
758        }
759    }
760
761    /// Accept a new connection: cache connection, store transport, set name, and audit
762    async fn accept_new_connection(
763        &mut self,
764        fingerprint: IdentityFingerprint,
765        transport: MultiDeviceTransport,
766        connection_name: Option<&str>,
767        connection_type: AuditConnectionType,
768    ) -> Result<(), ClientError> {
769        let now = crate::compat::now_seconds();
770        self.transports.insert(fingerprint, transport.clone());
771        self.connection_store
772            .save(ConnectionInfo {
773                fingerprint,
774                name: connection_name.map(|s| s.to_owned()),
775                cached_at: now,
776                last_connected_at: now,
777                transport_state: Some(transport),
778            })
779            .await?;
780
781        self.audit_log
782            .write(AuditEvent::ConnectionEstablished {
783                remote_identity: &fingerprint,
784                remote_name: connection_name,
785                connection_type,
786            })
787            .await;
788
789        Ok(())
790    }
791
792    /// Handle credential request.
793    async fn handle_credential_request(
794        &mut self,
795        source: IdentityFingerprint,
796        encrypted: String,
797        notification_tx: &mpsc::Sender<UserClientNotification>,
798        request_tx: &mpsc::Sender<UserClientRequest>,
799    ) -> Result<Option<PendingReplyFuture>, ClientError> {
800        if !self.transports.contains_key(&source) {
801            debug!("Loading transport state for source: {:?}", source);
802            let connection = self.connection_store.get(&source).await.ok_or_else(|| {
803                ClientError::ConnectionCache(format!("Missing cached connection {source:?}"))
804            })?;
805            let transport = connection.transport_state.ok_or_else(|| {
806                ClientError::ConnectionCache(format!(
807                    "Missing transport state for cached connection {source:?}"
808                ))
809            })?;
810            self.transports.insert(source, transport);
811        }
812
813        // Get transport for this source
814        let transport = self
815            .transports
816            .get_mut(&source)
817            .ok_or(ClientError::SecureChannelNotEstablished)?;
818
819        // Decrypt request
820        let encrypted_bytes = STANDARD
821            .decode(&encrypted)
822            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
823
824        let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
825            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
826
827        let decrypted = transport
828            .decrypt(&packet)
829            .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
830
831        let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
832
833        self.audit_log
834            .write(AuditEvent::CredentialRequested {
835                query: &request.query,
836                remote_identity: &source,
837                request_id: &request.request_id,
838            })
839            .await;
840
841        // Create oneshot channel for the reply
842        let (tx, rx) = oneshot::channel();
843
844        // Send request to caller
845        if request_tx.capacity() == 0 {
846            warn!("Request channel full, waiting for consumer to drain");
847        }
848        if request_tx
849            .send(UserClientRequest::CredentialRequest {
850                query: request.query.clone(),
851                identity: source,
852                reply: tx,
853            })
854            .await
855            .is_err()
856        {
857            // Request channel closed — caller is gone
858            warn!("Request channel closed, cannot send credential request");
859            notify!(
860                notification_tx,
861                UserClientNotification::Error {
862                    message: "Request channel closed".to_string(),
863                    context: Some("handle_credential_request".to_string()),
864                }
865            );
866            return Ok(None);
867        }
868
869        // Return future that awaits the reply
870        let request_id = request.request_id;
871        let query = request.query;
872        let fut: PendingReplyFuture = Box::pin(async move {
873            let result = rx.await;
874            PendingReply::CredentialResponse {
875                source,
876                request_id,
877                query,
878                reply: result,
879            }
880        });
881
882        Ok(Some(fut))
883    }
884
885    /// Process a resolved pending reply from the `FuturesUnordered`.
886    async fn process_pending_reply(
887        &mut self,
888        reply: PendingReply,
889        notification_tx: &mpsc::Sender<UserClientNotification>,
890        request_tx: &mpsc::Sender<UserClientRequest>,
891    ) -> Result<Vec<PendingReplyFuture>, ClientError> {
892        match reply {
893            PendingReply::FingerprintVerification {
894                source,
895                transport,
896                connection_name,
897                reply,
898            } => {
899                self.process_fingerprint_reply(
900                    source,
901                    transport,
902                    connection_name,
903                    reply,
904                    notification_tx,
905                    request_tx,
906                )
907                .await
908            }
909            PendingReply::CredentialResponse {
910                source,
911                request_id,
912                query,
913                reply,
914            } => {
915                self.process_credential_reply(source, request_id, query, reply, notification_tx)
916                    .await?;
917                Ok(Vec::new())
918            }
919        }
920    }
921
922    /// Handle a command from a `UserClient` handle.
923    async fn handle_command(
924        &mut self,
925        cmd: UserClientCommand,
926        notification_tx: &mpsc::Sender<UserClientNotification>,
927    ) {
928        match cmd {
929            UserClientCommand::GetPskToken { name, reply } => {
930                let result = self.generate_psk_token(name).await;
931                let _ = reply.send(result);
932            }
933            UserClientCommand::GetRendezvousToken { name, reply } => {
934                if let Err(e) = self.proxy_client.request_rendezvous().await {
935                    let _ = reply.send(Err(e));
936                    return;
937                }
938
939                // Prune stale pairings
940                self.pending_pairings.prune_stale();
941
942                // Replace any existing rendezvous pairing — the old sender drops,
943                // causing the receiver to get a RecvError (maps to ChannelClosed)
944                self.pending_pairings.rendezvous = Some(RendezvousPairing {
945                    connection_name: name,
946                    created_at: Instant::now(),
947                    code_tx: Some(reply),
948                });
949
950                // Emit notification so the caller knows a code is being requested
951                notify!(
952                    notification_tx,
953                    UserClientNotification::HandshakeProgress {
954                        message: "Requesting rendezvous code from proxy...".to_string(),
955                    }
956                );
957            }
958        }
959    }
960
961    /// Generate a PSK token internally.
962    async fn generate_psk_token(&mut self, name: Option<String>) -> Result<String, ClientError> {
963        let psk = Psk::generate();
964        let psk_id = psk.id();
965        let token = PskToken::new(psk.clone(), self.own_fingerprint).to_string();
966
967        self.pending_pairings.prune_stale();
968        self.pending_pairings.psk_pairings.insert(
969            psk_id,
970            PskPairing {
971                connection_name: name,
972                created_at: Instant::now(),
973                psk,
974            },
975        );
976        debug!("Created PSK pairing, token generated");
977
978        Ok(token)
979    }
980
981    /// Process a fingerprint verification reply.
982    async fn process_fingerprint_reply(
983        &mut self,
984        source: IdentityFingerprint,
985        transport: MultiDeviceTransport,
986        connection_name: Option<String>,
987        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
988        notification_tx: &mpsc::Sender<UserClientNotification>,
989        request_tx: &mpsc::Sender<UserClientRequest>,
990    ) -> Result<Vec<PendingReplyFuture>, ClientError> {
991        match reply {
992            Ok(FingerprintVerificationReply {
993                approved: true,
994                name,
995            }) => {
996                // Use the name from the reply, falling back to the pairing name
997                let conn_name = name.or(connection_name);
998                self.accept_new_connection(
999                    source,
1000                    transport,
1001                    conn_name.as_deref(),
1002                    AuditConnectionType::Rendezvous,
1003                )
1004                .await?;
1005
1006                notify!(
1007                    notification_tx,
1008                    UserClientNotification::FingerprintVerified {}
1009                );
1010
1011                // Drain and replay buffered messages
1012                let mut futures = Vec::new();
1013                for msg in self.pending_pairings.take_buffered_messages(&source) {
1014                    match self.handle_incoming(msg, notification_tx, request_tx).await {
1015                        Ok(Some(fut)) => futures.push(fut),
1016                        Ok(None) => {}
1017                        Err(e) => {
1018                            warn!("Error processing buffered message: {}", e);
1019                        }
1020                    }
1021                }
1022
1023                Ok(futures)
1024            }
1025            Ok(FingerprintVerificationReply {
1026                approved: false, ..
1027            }) => {
1028                self.reject_fingerprint(
1029                    &source,
1030                    "User rejected fingerprint verification",
1031                    notification_tx,
1032                )
1033                .await;
1034                Ok(Vec::new())
1035            }
1036            Err(_) => {
1037                warn!("Fingerprint verification reply channel dropped, treating as rejection");
1038                self.reject_fingerprint(
1039                    &source,
1040                    "Verification cancelled (reply dropped)",
1041                    notification_tx,
1042                )
1043                .await;
1044                Ok(Vec::new())
1045            }
1046        }
1047    }
1048
1049    /// Reject a fingerprint verification: discard buffered messages, audit, and notify.
1050    async fn reject_fingerprint(
1051        &mut self,
1052        source: &IdentityFingerprint,
1053        reason: &str,
1054        notification_tx: &mpsc::Sender<UserClientNotification>,
1055    ) {
1056        self.pending_pairings.take_buffered_messages(source);
1057
1058        self.audit_log
1059            .write(AuditEvent::ConnectionRejected {
1060                remote_identity: source,
1061            })
1062            .await;
1063
1064        notify!(
1065            notification_tx,
1066            UserClientNotification::FingerprintRejected {
1067                reason: reason.to_string(),
1068            }
1069        );
1070    }
1071
1072    /// Process a credential request reply.
1073    #[allow(clippy::too_many_arguments)]
1074    async fn process_credential_reply(
1075        &mut self,
1076        source: IdentityFingerprint,
1077        request_id: String,
1078        query: crate::types::CredentialQuery,
1079        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
1080        notification_tx: &mpsc::Sender<UserClientNotification>,
1081    ) -> Result<(), ClientError> {
1082        let reply = match reply {
1083            Ok(r) => r,
1084            Err(_) => {
1085                // Oneshot sender was dropped — treat as denial
1086                warn!("Credential reply channel dropped, treating as denial");
1087                CredentialRequestReply {
1088                    approved: false,
1089                    credential: None,
1090                    credential_id: None,
1091                }
1092            }
1093        };
1094
1095        let transport = self
1096            .transports
1097            .get_mut(&source)
1098            .ok_or(ClientError::SecureChannelNotEstablished)?;
1099
1100        // Extract domain and audit fields before credential is moved into the response payload
1101        let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1102        let fields = reply
1103            .credential
1104            .as_ref()
1105            .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1106                has_username: c.username.is_some(),
1107                has_password: c.password.is_some(),
1108                has_totp: c.totp.is_some(),
1109                has_uri: c.uri.is_some(),
1110                has_notes: c.notes.is_some(),
1111            });
1112
1113        // Create response payload
1114        let response_payload = CredentialResponsePayload {
1115            credential: if reply.approved {
1116                reply.credential
1117            } else {
1118                None
1119            },
1120            error: if !reply.approved {
1121                Some("Request denied".to_string())
1122            } else {
1123                None
1124            },
1125            request_id: Some(request_id.clone()),
1126        };
1127
1128        // Encrypt and send
1129        let response_json = serde_json::to_string(&response_payload)?;
1130        let encrypted = transport
1131            .encrypt(response_json.as_bytes())
1132            .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
1133
1134        let msg = ProtocolMessage::CredentialResponse {
1135            encrypted: STANDARD.encode(encrypted.encode()),
1136        };
1137
1138        let msg_json = serde_json::to_string(&msg)?;
1139
1140        self.proxy_client
1141            .send_to(source, msg_json.into_bytes())
1142            .await?;
1143
1144        // Send notification and audit
1145        if reply.approved {
1146            self.audit_log
1147                .write(AuditEvent::CredentialApproved {
1148                    query: &query,
1149                    domain: domain.as_deref(),
1150                    remote_identity: &source,
1151                    request_id: &request_id,
1152                    credential_id: reply.credential_id.as_deref(),
1153                    fields,
1154                })
1155                .await;
1156
1157            notify!(
1158                notification_tx,
1159                UserClientNotification::CredentialApproved {
1160                    domain,
1161                    credential_id: reply.credential_id,
1162                }
1163            );
1164        } else {
1165            self.audit_log
1166                .write(AuditEvent::CredentialDenied {
1167                    query: &query,
1168                    domain: domain.as_deref(),
1169                    remote_identity: &source,
1170                    request_id: &request_id,
1171                    credential_id: reply.credential_id.as_deref(),
1172                })
1173                .await;
1174
1175            notify!(
1176                notification_tx,
1177                UserClientNotification::CredentialDenied {
1178                    domain,
1179                    credential_id: reply.credential_id,
1180                }
1181            );
1182        }
1183
1184        Ok(())
1185    }
1186
1187    /// Complete Noise handshake as responder
1188    async fn complete_handshake(
1189        &self,
1190        remote_fingerprint: IdentityFingerprint,
1191        handshake_data: &str,
1192        ciphersuite_str: &str,
1193        psk: Option<&Psk>,
1194    ) -> Result<(MultiDeviceTransport, String), ClientError> {
1195        // Parse ciphersuite
1196        let ciphersuite = match ciphersuite_str {
1197            s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1198            _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1199        };
1200
1201        // Decode handshake data
1202        let init_bytes = STANDARD
1203            .decode(handshake_data)
1204            .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
1205
1206        let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1207            .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1208
1209        // Create responder handshake — with PSK if provided, otherwise null PSK (rendezvous)
1210        let mut handshake = if let Some(psk) = psk {
1211            ResponderHandshake::with_psk(psk.clone())
1212        } else {
1213            ResponderHandshake::new()
1214        };
1215
1216        // Process init and generate response
1217        handshake.receive_start(&init_packet)?;
1218        let response_packet = handshake.send_finish()?;
1219        let (transport, fingerprint) = handshake.finalize()?;
1220
1221        // Send response
1222        let msg = ProtocolMessage::HandshakeResponse {
1223            data: STANDARD.encode(response_packet.encode()?),
1224            ciphersuite: format!("{ciphersuite:?}"),
1225        };
1226
1227        let msg_json = serde_json::to_string(&msg)?;
1228
1229        self.proxy_client
1230            .send_to(remote_fingerprint, msg_json.into_bytes())
1231            .await?;
1232
1233        debug!("Sent handshake response to {:?}", remote_fingerprint);
1234
1235        Ok((transport, fingerprint.to_string()))
1236    }
1237}