1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
24const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
26const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(15 * 60);
28const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(10 * 60);
30
31pub(crate) enum PairingKind {
33 Rendezvous {
38 reply: Option<oneshot::Sender<Result<RendezvousCode, RemoteClientError>>>,
39 },
40 Psk { psk: Psk, psk_id: PskId },
42}
43
44pub(crate) struct PendingPairing {
46 connection_name: Option<String>,
48 created_at: Instant,
50 kind: PairingKind,
52}
53
54use crate::{
55 error::RemoteClientError,
56 traits::{
57 AuditConnectionType, AuditEvent, AuditLog, CredentialFieldSet, IdentityProvider,
58 NoOpAuditLog, SessionStore,
59 },
60 types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
61};
62
63#[derive(Debug, Clone)]
69pub enum UserClientNotification {
70 Listening {},
72 HandshakeStart {},
74 HandshakeProgress {
76 message: String,
78 },
79 HandshakeComplete {},
81 HandshakeFingerprint {
83 fingerprint: String,
85 identity: IdentityFingerprint,
87 },
88 FingerprintVerified {},
90 FingerprintRejected {
92 reason: String,
94 },
95 CredentialApproved {
97 domain: Option<String>,
99 credential_id: Option<String>,
101 },
102 CredentialDenied {
104 domain: Option<String>,
106 credential_id: Option<String>,
108 },
109 SessionRefreshed {
111 fingerprint: IdentityFingerprint,
113 },
114 ClientDisconnected {},
116 Reconnecting {
118 attempt: u32,
120 },
121 Reconnected {},
123 Error {
125 message: String,
127 context: Option<String>,
129 },
130}
131
132#[derive(Debug)]
134pub struct FingerprintVerificationReply {
135 pub approved: bool,
137 pub name: Option<String>,
139}
140
141#[derive(Debug)]
143pub struct CredentialRequestReply {
144 pub approved: bool,
146 pub credential: Option<CredentialData>,
148 pub credential_id: Option<String>,
150}
151
152#[derive(Debug)]
154pub enum UserClientRequest {
155 VerifyFingerprint {
157 fingerprint: String,
159 identity: IdentityFingerprint,
161 reply: oneshot::Sender<FingerprintVerificationReply>,
163 },
164 CredentialRequest {
166 query: crate::types::CredentialQuery,
168 identity: IdentityFingerprint,
170 reply: oneshot::Sender<CredentialRequestReply>,
172 },
173}
174
175enum PendingReply {
181 FingerprintVerification {
182 source: IdentityFingerprint,
183 transport: MultiDeviceTransport,
184 connection_name: Option<String>,
185 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
186 },
187 CredentialResponse {
188 source: IdentityFingerprint,
189 request_id: String,
190 query: crate::types::CredentialQuery,
191 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
192 },
193}
194
195type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
197
198enum UserClientCommand {
204 GetPskToken {
206 name: Option<String>,
207 reply: oneshot::Sender<Result<String, RemoteClientError>>,
208 },
209 GetRendezvousToken {
211 name: Option<String>,
212 reply: oneshot::Sender<Result<RendezvousCode, RemoteClientError>>,
213 },
214}
215
216#[derive(Clone)]
224pub struct UserClient {
225 command_tx: mpsc::Sender<UserClientCommand>,
226}
227
228impl UserClient {
229 pub async fn connect(
238 identity_provider: Box<dyn IdentityProvider>,
239 session_store: Box<dyn SessionStore>,
240 mut proxy_client: Box<dyn ProxyClient>,
241 notification_tx: mpsc::Sender<UserClientNotification>,
242 request_tx: mpsc::Sender<UserClientRequest>,
243 audit_log: Option<Box<dyn AuditLog>>,
244 ) -> Result<Self, RemoteClientError> {
245 let incoming_rx = proxy_client.connect().await?;
247
248 let (command_tx, command_rx) = mpsc::channel(32);
250
251 let own_fingerprint = identity_provider.fingerprint().await;
253
254 let inner = UserClientInner {
256 session_store,
257 proxy_client,
258 own_fingerprint,
259 transports: HashMap::new(),
260 pending_pairings: Vec::new(),
261 audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
262 };
263
264 #[cfg(target_arch = "wasm32")]
266 wasm_bindgen_futures::spawn_local(inner.run_event_loop(
267 incoming_rx,
268 command_rx,
269 notification_tx,
270 request_tx,
271 ));
272 #[cfg(not(target_arch = "wasm32"))]
273 tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
274
275 Ok(Self { command_tx })
276 }
277
278 pub async fn get_psk_token(&self, name: Option<String>) -> Result<String, RemoteClientError> {
283 let (tx, rx) = oneshot::channel();
284 self.command_tx
285 .send(UserClientCommand::GetPskToken { name, reply: tx })
286 .await
287 .map_err(|_| RemoteClientError::ChannelClosed)?;
288 rx.await.map_err(|_| RemoteClientError::ChannelClosed)?
289 }
290
291 pub async fn get_rendezvous_token(
296 &self,
297 name: Option<String>,
298 ) -> Result<RendezvousCode, RemoteClientError> {
299 let (tx, rx) = oneshot::channel();
300 self.command_tx
301 .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
302 .await
303 .map_err(|_| RemoteClientError::ChannelClosed)?;
304 rx.await.map_err(|_| RemoteClientError::ChannelClosed)?
305 }
306}
307
308struct UserClientInner {
314 session_store: Box<dyn SessionStore>,
315 proxy_client: Box<dyn ProxyClient>,
316 own_fingerprint: IdentityFingerprint,
318 transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
320 pending_pairings: Vec<PendingPairing>,
322 audit_log: Box<dyn AuditLog>,
324}
325
326impl UserClientInner {
327 async fn run_event_loop(
329 mut self,
330 mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
331 mut command_rx: mpsc::Receiver<UserClientCommand>,
332 notification_tx: mpsc::Sender<UserClientNotification>,
333 request_tx: mpsc::Sender<UserClientRequest>,
334 ) {
335 notification_tx
337 .send(UserClientNotification::Listening {})
338 .await
339 .ok();
340
341 let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
342
343 loop {
344 tokio::select! {
345 msg = incoming_rx.recv() => {
346 match msg {
347 Some(msg) => {
348 match self.handle_incoming(msg, ¬ification_tx, &request_tx).await {
349 Ok(Some(fut)) => pending_replies.push(fut),
350 Ok(None) => {}
351 Err(e) => {
352 warn!("Error handling incoming message: {}", e);
353 notification_tx.send(UserClientNotification::Error {
354 message: e.to_string(),
355 context: Some("handle_incoming".to_string()),
356 }).await.ok();
357 }
358 }
359 }
360 None => {
361 notification_tx.send(UserClientNotification::ClientDisconnected {}).await.ok();
363 match self.attempt_reconnection(¬ification_tx).await {
364 Ok(new_rx) => {
365 incoming_rx = new_rx;
366 notification_tx.send(UserClientNotification::Reconnected {}).await.ok();
367 }
368 Err(e) => {
369 warn!("Reconnection failed permanently: {}", e);
370 notification_tx.send(UserClientNotification::Error {
371 message: e.to_string(),
372 context: Some("reconnection".to_string()),
373 }).await.ok();
374 return;
375 }
376 }
377 }
378 }
379 }
380 Some(reply) = pending_replies.next() => {
381 if let Err(e) = self.process_pending_reply(reply, ¬ification_tx).await {
382 warn!("Error processing pending reply: {}", e);
383 notification_tx.send(UserClientNotification::Error {
384 message: e.to_string(),
385 context: Some("process_pending_reply".to_string()),
386 }).await.ok();
387 }
388 }
389 cmd = command_rx.recv() => {
390 match cmd {
391 Some(cmd) => self.handle_command(cmd, ¬ification_tx).await,
392 None => {
393 debug!("All UserClient handles dropped, shutting down event loop");
395 return;
396 }
397 }
398 }
399 }
400 }
401 }
402
403 async fn attempt_reconnection(
405 &mut self,
406 notification_tx: &mpsc::Sender<UserClientNotification>,
407 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, RemoteClientError> {
408 use rand::{Rng, SeedableRng};
409
410 let mut rng = rand::rngs::StdRng::from_entropy();
411 let mut attempt: u32 = 0;
412
413 loop {
414 attempt = attempt.saturating_add(1);
415
416 let _ = self.proxy_client.disconnect().await;
418
419 match self.proxy_client.connect().await {
420 Ok(new_rx) => {
421 debug!("Reconnected to proxy on attempt {}", attempt);
422 return Ok(new_rx);
423 }
424 Err(e) => {
425 debug!("Reconnection attempt {} failed: {}", attempt, e);
426 notification_tx
427 .send(UserClientNotification::Reconnecting { attempt })
428 .await
429 .ok();
430
431 let exp_delay = RECONNECT_BASE_DELAY
433 .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
434 let delay = exp_delay.min(RECONNECT_MAX_DELAY);
435 let jitter_max = (delay.as_millis() as u64) / 4;
436 let jitter = if jitter_max > 0 {
437 rng.gen_range(0..=jitter_max)
438 } else {
439 0
440 };
441 let total_delay = delay + Duration::from_millis(jitter);
442
443 crate::compat::sleep(total_delay).await;
444 }
445 }
446 }
447 }
448
449 async fn handle_incoming(
451 &mut self,
452 msg: IncomingMessage,
453 notification_tx: &mpsc::Sender<UserClientNotification>,
454 request_tx: &mpsc::Sender<UserClientRequest>,
455 ) -> Result<Option<PendingReplyFuture>, RemoteClientError> {
456 match msg {
457 IncomingMessage::Send {
458 source, payload, ..
459 } => {
460 let text = String::from_utf8(payload)
462 .map_err(|e| RemoteClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
463
464 let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
465
466 match protocol_msg {
467 ProtocolMessage::HandshakeInit {
468 data,
469 ciphersuite,
470 psk_id,
471 } => {
472 self.handle_handshake_init(
473 source,
474 data,
475 ciphersuite,
476 psk_id,
477 notification_tx,
478 request_tx,
479 )
480 .await
481 }
482 ProtocolMessage::CredentialRequest { encrypted } => {
483 self.handle_credential_request(
484 source,
485 encrypted,
486 notification_tx,
487 request_tx,
488 )
489 .await
490 }
491 _ => {
492 debug!("Received unexpected message type from {:?}", source);
493 Ok(None)
494 }
495 }
496 }
497 IncomingMessage::RendezvousInfo(code) => {
498 let idx = self
500 .pending_pairings
501 .iter()
502 .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }));
503
504 if let Some(idx) = idx {
505 let pairing = &mut self.pending_pairings[idx];
507 if let PairingKind::Rendezvous { reply } = &mut pairing.kind {
508 if let Some(sender) = reply.take() {
509 debug!("Completed rendezvous pairing via handle, code: {}", code);
510 let _ = sender.send(Ok(code));
511 }
512 }
513 } else {
514 debug!("Received RendezvousInfo but no pending rendezvous pairing found");
515 }
516 Ok(None)
517 }
518 IncomingMessage::IdentityInfo { .. } => {
519 debug!("Received unexpected IdentityInfo message");
521 Ok(None)
522 }
523 }
524 }
525
526 async fn handle_handshake_init(
528 &mut self,
529 source: IdentityFingerprint,
530 data: String,
531 ciphersuite: String,
532 psk_id: Option<PskId>,
533 notification_tx: &mpsc::Sender<UserClientNotification>,
534 request_tx: &mpsc::Sender<UserClientRequest>,
535 ) -> Result<Option<PendingReplyFuture>, RemoteClientError> {
536 debug!("Received handshake init from source: {:?}", source);
537 notification_tx
538 .send(UserClientNotification::HandshakeStart {})
539 .await
540 .ok();
541
542 let is_new_connection = !self.session_store.has_session(&source).await;
544
545 let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
547 (None, None, false)
549 } else {
550 Self::prune_stale_pairings(&mut self.pending_pairings);
552
553 match &psk_id {
554 Some(id) => {
555 let idx = self.pending_pairings.iter().position(
557 |p| matches!(&p.kind, PairingKind::Psk { psk_id: pid, .. } if pid == id),
558 );
559 if let Some(idx) = idx {
560 let pairing = self.pending_pairings.remove(idx);
561 let psk = match pairing.kind {
562 PairingKind::Psk { psk, .. } => psk,
563 PairingKind::Rendezvous { .. } => unreachable!(),
564 };
565 (Some(psk), pairing.connection_name, true)
566 } else {
567 warn!("No matching PSK pairing for psk_id: {}", id);
568 return Err(RemoteClientError::InvalidState {
569 expected: "matching PSK pairing".to_string(),
570 current: format!("no pairing for psk_id {id}"),
571 });
572 }
573 }
574 None => {
575 let idx = self
578 .pending_pairings
579 .iter()
580 .position(|p| matches!(p.kind, PairingKind::Rendezvous { reply: None }));
581 let connection_name =
582 idx.and_then(|i| self.pending_pairings.remove(i).connection_name);
583 (None, connection_name, false)
584 }
585 }
586 };
587
588 let (transport, fingerprint_str) = self
589 .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
590 .await?;
591
592 notification_tx
593 .send(UserClientNotification::HandshakeComplete {})
594 .await
595 .ok();
596
597 if is_new_connection && !is_psk_connection {
598 let (tx, rx) = oneshot::channel();
600
601 request_tx
602 .send(UserClientRequest::VerifyFingerprint {
603 fingerprint: fingerprint_str,
604 identity: source,
605 reply: tx,
606 })
607 .await
608 .ok();
609
610 let fut: PendingReplyFuture = Box::pin(async move {
611 let result = rx.await;
612 PendingReply::FingerprintVerification {
613 source,
614 transport,
615 connection_name: matched_pairing_name,
616 reply: result,
617 }
618 });
619
620 Ok(Some(fut))
621 } else if !is_new_connection {
622 self.transports.insert(source, transport.clone());
624 self.session_store.cache_session(source).await?;
625 self.session_store
626 .save_transport_state(&source, transport)
627 .await?;
628
629 self.audit_log
630 .write(AuditEvent::SessionRefreshed {
631 remote_identity: &source,
632 })
633 .await;
634
635 notification_tx
636 .send(UserClientNotification::SessionRefreshed {
637 fingerprint: source,
638 })
639 .await
640 .ok();
641
642 Ok(None)
643 } else {
644 self.accept_new_connection(
646 source,
647 transport,
648 matched_pairing_name.as_deref(),
649 AuditConnectionType::Psk,
650 )
651 .await?;
652
653 notification_tx
655 .send(UserClientNotification::HandshakeFingerprint {
656 fingerprint: fingerprint_str,
657 identity: source,
658 })
659 .await
660 .ok();
661
662 Ok(None)
663 }
664 }
665
666 fn prune_stale_pairings(pairings: &mut Vec<PendingPairing>) {
668 pairings.retain(|p| p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
669 }
670
671 async fn accept_new_connection(
673 &mut self,
674 fingerprint: IdentityFingerprint,
675 transport: MultiDeviceTransport,
676 session_name: Option<&str>,
677 connection_type: AuditConnectionType,
678 ) -> Result<(), RemoteClientError> {
679 self.transports.insert(fingerprint, transport.clone());
680 self.session_store.cache_session(fingerprint).await?;
681 if let Some(name) = session_name {
682 self.session_store
683 .set_session_name(&fingerprint, name.to_owned())
684 .await?;
685 }
686 self.session_store
687 .save_transport_state(&fingerprint, transport)
688 .await?;
689
690 self.audit_log
691 .write(AuditEvent::ConnectionEstablished {
692 remote_identity: &fingerprint,
693 remote_name: session_name,
694 connection_type,
695 })
696 .await;
697
698 Ok(())
699 }
700
701 async fn handle_credential_request(
703 &mut self,
704 source: IdentityFingerprint,
705 encrypted: String,
706 notification_tx: &mpsc::Sender<UserClientNotification>,
707 request_tx: &mpsc::Sender<UserClientRequest>,
708 ) -> Result<Option<PendingReplyFuture>, RemoteClientError> {
709 if !self.transports.contains_key(&source) {
710 debug!("Loading transport state for source: {:?}", source);
711 let session = self
712 .session_store
713 .load_transport_state(&source)
714 .await?
715 .ok_or_else(|| {
716 RemoteClientError::SessionCache(format!(
717 "Missing transport state for cached session {source:?}"
718 ))
719 })?;
720 self.transports.insert(source, session);
721 }
722
723 let transport = self
725 .transports
726 .get_mut(&source)
727 .ok_or(RemoteClientError::SecureChannelNotEstablished)?;
728
729 let encrypted_bytes = STANDARD
731 .decode(&encrypted)
732 .map_err(|e| RemoteClientError::Serialization(format!("Invalid base64: {e}")))?;
733
734 let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
735 .map_err(|e| RemoteClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
736
737 let decrypted = transport
738 .decrypt(&packet)
739 .map_err(|e| RemoteClientError::NoiseProtocol(e.to_string()))?;
740
741 let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
742
743 self.audit_log
744 .write(AuditEvent::CredentialRequested {
745 query: &request.query,
746 remote_identity: &source,
747 request_id: &request.request_id,
748 })
749 .await;
750
751 let (tx, rx) = oneshot::channel();
753
754 if request_tx
756 .send(UserClientRequest::CredentialRequest {
757 query: request.query.clone(),
758 identity: source,
759 reply: tx,
760 })
761 .await
762 .is_err()
763 {
764 warn!("Request channel closed, cannot send credential request");
766 notification_tx
767 .send(UserClientNotification::Error {
768 message: "Request channel closed".to_string(),
769 context: Some("handle_credential_request".to_string()),
770 })
771 .await
772 .ok();
773 return Ok(None);
774 }
775
776 let request_id = request.request_id;
778 let query = request.query;
779 let fut: PendingReplyFuture = Box::pin(async move {
780 let result = rx.await;
781 PendingReply::CredentialResponse {
782 source,
783 request_id,
784 query,
785 reply: result,
786 }
787 });
788
789 Ok(Some(fut))
790 }
791
792 async fn process_pending_reply(
794 &mut self,
795 reply: PendingReply,
796 notification_tx: &mpsc::Sender<UserClientNotification>,
797 ) -> Result<(), RemoteClientError> {
798 match reply {
799 PendingReply::FingerprintVerification {
800 source,
801 transport,
802 connection_name,
803 reply,
804 } => {
805 self.process_fingerprint_reply(
806 source,
807 transport,
808 connection_name,
809 reply,
810 notification_tx,
811 )
812 .await
813 }
814 PendingReply::CredentialResponse {
815 source,
816 request_id,
817 query,
818 reply,
819 } => {
820 self.process_credential_reply(source, request_id, query, reply, notification_tx)
821 .await
822 }
823 }
824 }
825
826 async fn handle_command(
828 &mut self,
829 cmd: UserClientCommand,
830 notification_tx: &mpsc::Sender<UserClientNotification>,
831 ) {
832 match cmd {
833 UserClientCommand::GetPskToken { name, reply } => {
834 let result = self.generate_psk_token(name).await;
835 let _ = reply.send(result);
836 }
837 UserClientCommand::GetRendezvousToken { name, reply } => {
838 if let Err(e) = self.proxy_client.request_rendezvous().await {
839 let _ = reply.send(Err(e));
840 return;
841 }
842
843 Self::prune_stale_pairings(&mut self.pending_pairings);
845
846 if let Some(old_idx) = self
849 .pending_pairings
850 .iter()
851 .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }))
852 {
853 let old = self.pending_pairings.remove(old_idx);
854 if let PairingKind::Rendezvous {
855 reply: Some(old_reply),
856 } = old.kind
857 {
858 warn!("Replacing existing pending rendezvous pairing");
859 let _ = old_reply.send(Err(RemoteClientError::InvalidState {
860 expected: "single pending rendezvous".to_string(),
861 current: "replaced by new rendezvous request".to_string(),
862 }));
863 }
864 }
865
866 self.pending_pairings.push(PendingPairing {
869 connection_name: name,
870 created_at: Instant::now(),
871 kind: PairingKind::Rendezvous { reply: Some(reply) },
872 });
873
874 notification_tx
876 .send(UserClientNotification::HandshakeProgress {
877 message: "Requesting rendezvous code from proxy...".to_string(),
878 })
879 .await
880 .ok();
881 }
882 }
883 }
884
885 async fn generate_psk_token(
887 &mut self,
888 name: Option<String>,
889 ) -> Result<String, RemoteClientError> {
890 let psk = Psk::generate();
891 let psk_id = psk.id();
892 let token = format!("{}_{}", psk.to_hex(), hex::encode(self.own_fingerprint.0));
893
894 let pairing = PendingPairing {
895 connection_name: name,
896 created_at: Instant::now(),
897 kind: PairingKind::Psk { psk, psk_id },
898 };
899
900 Self::prune_stale_pairings(&mut self.pending_pairings);
901 self.pending_pairings.push(pairing);
902 debug!("Created PSK pairing, token generated");
903
904 Ok(token)
905 }
906
907 async fn process_fingerprint_reply(
909 &mut self,
910 source: IdentityFingerprint,
911 transport: MultiDeviceTransport,
912 connection_name: Option<String>,
913 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
914 notification_tx: &mpsc::Sender<UserClientNotification>,
915 ) -> Result<(), RemoteClientError> {
916 match reply {
917 Ok(FingerprintVerificationReply {
918 approved: true,
919 name,
920 }) => {
921 let session_name = name.or(connection_name);
923 self.accept_new_connection(
924 source,
925 transport,
926 session_name.as_deref(),
927 AuditConnectionType::Rendezvous,
928 )
929 .await?;
930
931 notification_tx
932 .send(UserClientNotification::FingerprintVerified {})
933 .await
934 .ok();
935 }
936 Ok(FingerprintVerificationReply {
937 approved: false, ..
938 }) => {
939 self.audit_log
940 .write(AuditEvent::ConnectionRejected {
941 remote_identity: &source,
942 })
943 .await;
944
945 notification_tx
946 .send(UserClientNotification::FingerprintRejected {
947 reason: "User rejected fingerprint verification".to_string(),
948 })
949 .await
950 .ok();
951 }
952 Err(_) => {
953 warn!("Fingerprint verification reply channel dropped, treating as rejection");
955 self.audit_log
956 .write(AuditEvent::ConnectionRejected {
957 remote_identity: &source,
958 })
959 .await;
960
961 notification_tx
962 .send(UserClientNotification::FingerprintRejected {
963 reason: "Verification cancelled (reply dropped)".to_string(),
964 })
965 .await
966 .ok();
967 }
968 }
969
970 Ok(())
971 }
972
973 #[allow(clippy::too_many_arguments)]
975 async fn process_credential_reply(
976 &mut self,
977 source: IdentityFingerprint,
978 request_id: String,
979 query: crate::types::CredentialQuery,
980 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
981 notification_tx: &mpsc::Sender<UserClientNotification>,
982 ) -> Result<(), RemoteClientError> {
983 let reply = match reply {
984 Ok(r) => r,
985 Err(_) => {
986 warn!("Credential reply channel dropped, treating as denial");
988 CredentialRequestReply {
989 approved: false,
990 credential: None,
991 credential_id: None,
992 }
993 }
994 };
995
996 let transport = self
997 .transports
998 .get_mut(&source)
999 .ok_or(RemoteClientError::SecureChannelNotEstablished)?;
1000
1001 let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1003 let fields = reply
1004 .credential
1005 .as_ref()
1006 .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1007 has_username: c.username.is_some(),
1008 has_password: c.password.is_some(),
1009 has_totp: c.totp.is_some(),
1010 has_uri: c.uri.is_some(),
1011 has_notes: c.notes.is_some(),
1012 });
1013
1014 let response_payload = CredentialResponsePayload {
1016 credential: if reply.approved {
1017 reply.credential
1018 } else {
1019 None
1020 },
1021 error: if !reply.approved {
1022 Some("Request denied".to_string())
1023 } else {
1024 None
1025 },
1026 request_id: Some(request_id.clone()),
1027 };
1028
1029 let response_json = serde_json::to_string(&response_payload)?;
1031 let encrypted = transport
1032 .encrypt(response_json.as_bytes())
1033 .map_err(|e| RemoteClientError::NoiseProtocol(e.to_string()))?;
1034
1035 let msg = ProtocolMessage::CredentialResponse {
1036 encrypted: STANDARD.encode(encrypted.encode()),
1037 };
1038
1039 let msg_json = serde_json::to_string(&msg)?;
1040
1041 self.proxy_client
1042 .send_to(source, msg_json.into_bytes())
1043 .await?;
1044
1045 if reply.approved {
1047 self.audit_log
1048 .write(AuditEvent::CredentialApproved {
1049 query: &query,
1050 domain: domain.as_deref(),
1051 remote_identity: &source,
1052 request_id: &request_id,
1053 credential_id: reply.credential_id.as_deref(),
1054 fields,
1055 })
1056 .await;
1057
1058 notification_tx
1059 .send(UserClientNotification::CredentialApproved {
1060 domain,
1061 credential_id: reply.credential_id,
1062 })
1063 .await
1064 .ok();
1065 } else {
1066 self.audit_log
1067 .write(AuditEvent::CredentialDenied {
1068 query: &query,
1069 domain: domain.as_deref(),
1070 remote_identity: &source,
1071 request_id: &request_id,
1072 credential_id: reply.credential_id.as_deref(),
1073 })
1074 .await;
1075
1076 notification_tx
1077 .send(UserClientNotification::CredentialDenied {
1078 domain,
1079 credential_id: reply.credential_id,
1080 })
1081 .await
1082 .ok();
1083 }
1084
1085 Ok(())
1086 }
1087
1088 async fn complete_handshake(
1090 &self,
1091 remote_fingerprint: IdentityFingerprint,
1092 handshake_data: &str,
1093 ciphersuite_str: &str,
1094 psk: Option<&Psk>,
1095 ) -> Result<(MultiDeviceTransport, String), RemoteClientError> {
1096 let ciphersuite = match ciphersuite_str {
1098 s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1099 _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1100 };
1101
1102 let init_bytes = STANDARD
1104 .decode(handshake_data)
1105 .map_err(|e| RemoteClientError::Serialization(format!("Invalid base64: {e}")))?;
1106
1107 let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1108 .map_err(|e| RemoteClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1109
1110 let mut handshake = if let Some(psk) = psk {
1112 ResponderHandshake::with_psk(psk.clone())
1113 } else {
1114 ResponderHandshake::new()
1115 };
1116
1117 handshake.receive_start(&init_packet)?;
1119 let response_packet = handshake.send_finish()?;
1120 let (transport, fingerprint) = handshake.finalize()?;
1121
1122 let msg = ProtocolMessage::HandshakeResponse {
1124 data: STANDARD.encode(response_packet.encode()?),
1125 ciphersuite: format!("{ciphersuite:?}"),
1126 };
1127
1128 let msg_json = serde_json::to_string(&msg)?;
1129
1130 self.proxy_client
1131 .send_to(remote_fingerprint, msg_json.into_bytes())
1132 .await?;
1133
1134 debug!("Sent handshake response to {:?}", remote_fingerprint);
1135
1136 Ok((transport, fingerprint.to_string()))
1137 }
1138}