1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
/// Base delay of the reconnection backoff; it is doubled for every further
/// failed attempt in `attempt_reconnection`.
const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
/// Upper bound (15 minutes) applied to the exponential reconnection delay
/// before jitter is added.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(15 * 60);
/// Age (10 minutes) at which a pending pairing is dropped by
/// `prune_stale_pairings`.
const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(10 * 60);
30
/// The mechanism through which a pending pairing is expected to complete.
pub(crate) enum PairingKind {
    /// Pairing via a rendezvous code requested from the proxy.
    Rendezvous {
        /// Channel used to hand the rendezvous code (or an error) back to the
        /// requester. It is taken, leaving `None`, once the code has been
        /// delivered; a `None` entry is what a PSK-less handshake consumes.
        reply: Option<oneshot::Sender<Result<RendezvousCode, ClientError>>>,
    },
    /// Pairing via a pre-shared key; matched against the `psk_id` carried by
    /// an incoming handshake init.
    Psk { psk: Psk, psk_id: PskId },
}
43
/// A locally initiated pairing that is waiting for a matching handshake.
pub(crate) struct PendingPairing {
    /// Optional name to give the session if this pairing completes.
    connection_name: Option<String>,
    /// Creation time; entries whose age reaches `PENDING_PAIRING_MAX_AGE` are
    /// pruned.
    created_at: Instant,
    /// Pairing mechanism together with its per-mechanism state.
    kind: PairingKind,
}
53
54use crate::{
55 error::ClientError,
56 traits::{
57 AuditConnectionType, AuditEvent, AuditLog, CredentialFieldSet, IdentityProvider,
58 NoOpAuditLog, SessionStore,
59 },
60 types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
61};
62
/// Fire-and-forget status events emitted by the client's event loop.
///
/// Sending is best-effort: the event loop ignores send failures.
#[derive(Debug, Clone)]
pub enum UserClientNotification {
    /// The event loop has started.
    Listening {},
    /// A handshake init message was received from a remote client.
    HandshakeStart {},
    /// Free-form progress information (e.g. a rendezvous code was requested).
    HandshakeProgress {
        /// Human-readable progress text.
        message: String,
    },
    /// The handshake response was sent and a transport was derived.
    HandshakeComplete {},
    /// Reports the handshake fingerprint of a connection; in this file it is
    /// emitted after a PSK connection has been accepted.
    HandshakeFingerprint {
        /// Fingerprint string produced by the handshake.
        fingerprint: String,
        /// Identity of the remote peer.
        identity: IdentityFingerprint,
    },
    /// The user approved the fingerprint and the connection was stored.
    FingerprintVerified {},
    /// The fingerprint was rejected or the verification was cancelled.
    FingerprintRejected {
        /// Why the verification did not succeed.
        reason: String,
    },
    /// A credential request was approved and the response was sent.
    CredentialApproved {
        /// Domain of the returned credential, if any.
        domain: Option<String>,
        /// Identifier of the returned credential, if supplied by the reply.
        credential_id: Option<String>,
    },
    /// A credential request was denied and the denial was sent.
    CredentialDenied {
        /// Domain of the credential attached to the reply, if any.
        domain: Option<String>,
        /// Identifier of the credential, if supplied by the reply.
        credential_id: Option<String>,
    },
    /// A peer with an existing session completed a new handshake.
    SessionRefreshed {
        /// Identity of the peer whose session was refreshed.
        fingerprint: IdentityFingerprint,
    },
    /// The incoming message channel from the proxy was closed.
    ClientDisconnected {},
    /// A reconnection attempt failed and another one will follow.
    Reconnecting {
        /// 1-based number of the attempt that just failed.
        attempt: u32,
    },
    /// The proxy connection was re-established.
    Reconnected {},
    /// An error occurred while processing an event.
    Error {
        /// Display text of the error.
        message: String,
        /// Name of the operation that failed, when known.
        context: Option<String>,
    },
}
131
/// Answer to a [`UserClientRequest::VerifyFingerprint`] request.
#[derive(Debug)]
pub struct FingerprintVerificationReply {
    /// `true` to accept the connection, `false` to reject it.
    pub approved: bool,
    /// Optional session name; when present it takes precedence over the name
    /// attached to the pending pairing.
    pub name: Option<String>,
}
140
/// Answer to a [`UserClientRequest::CredentialRequest`] request.
#[derive(Debug)]
pub struct CredentialRequestReply {
    /// `true` to release the credential, `false` to deny the request.
    pub approved: bool,
    /// Credential to return; it is only forwarded to the peer when `approved`
    /// is `true`.
    pub credential: Option<CredentialData>,
    /// Identifier of the credential, used for audit events and notifications.
    pub credential_id: Option<String>,
}
151
/// Requests from the event loop that need a decision from the embedding
/// application, answered through the contained one-shot `reply` sender.
#[derive(Debug)]
pub enum UserClientRequest {
    /// Ask whether a new, non-PSK connection should be trusted.
    VerifyFingerprint {
        /// Fingerprint string produced by the handshake.
        fingerprint: String,
        /// Identity of the remote peer.
        identity: IdentityFingerprint,
        /// Channel for the decision; dropping it is treated as a rejection.
        reply: oneshot::Sender<FingerprintVerificationReply>,
    },
    /// Ask whether a credential request should be fulfilled.
    CredentialRequest {
        /// The query sent by the remote peer.
        query: crate::types::CredentialQuery,
        /// Identity of the requesting peer.
        identity: IdentityFingerprint,
        /// Channel for the decision; dropping it is treated as a denial.
        reply: oneshot::Sender<CredentialRequestReply>,
    },
}
174
/// Outcome of awaiting a user decision, bundled with the context needed to
/// finish the operation inside the event loop.
enum PendingReply {
    /// Result of a fingerprint verification request.
    FingerprintVerification {
        /// Peer whose handshake is awaiting verification.
        source: IdentityFingerprint,
        /// Transport derived from the handshake; stored only on approval.
        transport: MultiDeviceTransport,
        /// Name taken from the matched pending pairing, if any.
        connection_name: Option<String>,
        /// The user's answer, or `Err` if the reply sender was dropped.
        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
    },
    /// Result of a credential request.
    CredentialResponse {
        /// Peer that requested the credential.
        source: IdentityFingerprint,
        /// Request identifier echoed back in the response.
        request_id: String,
        /// The original query, kept for audit logging.
        query: crate::types::CredentialQuery,
        /// The user's answer, or `Err` if the reply sender was dropped.
        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
    },
}
194
/// Boxed, `Send` future resolving to a [`PendingReply`]; polled by the event
/// loop through a `FuturesUnordered`.
type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
197
/// Commands sent from a [`UserClient`] handle to the event loop.
enum UserClientCommand {
    /// Create a PSK pairing and return its token.
    GetPskToken {
        /// Optional name to give the resulting session.
        name: Option<String>,
        /// Channel on which the token (or an error) is returned.
        reply: oneshot::Sender<Result<String, ClientError>>,
    },
    /// Request a rendezvous code from the proxy.
    GetRendezvousToken {
        /// Optional name to give the resulting session.
        name: Option<String>,
        /// Channel on which the code (or an error) is returned.
        reply: oneshot::Sender<Result<RendezvousCode, ClientError>>,
    },
}
215
/// Cloneable handle to a running client event loop.
///
/// The event loop shuts down once every handle (every clone of the command
/// sender) has been dropped.
#[derive(Clone)]
pub struct UserClient {
    /// Command channel into the event loop.
    command_tx: mpsc::Sender<UserClientCommand>,
}
227
228impl UserClient {
229 pub async fn connect(
238 identity_provider: Box<dyn IdentityProvider>,
239 session_store: Box<dyn SessionStore>,
240 mut proxy_client: Box<dyn ProxyClient>,
241 notification_tx: mpsc::Sender<UserClientNotification>,
242 request_tx: mpsc::Sender<UserClientRequest>,
243 audit_log: Option<Box<dyn AuditLog>>,
244 ) -> Result<Self, ClientError> {
245 let incoming_rx = proxy_client.connect().await?;
247
248 let (command_tx, command_rx) = mpsc::channel(32);
250
251 let own_fingerprint = identity_provider.fingerprint().await;
253
254 let inner = UserClientInner {
256 session_store,
257 proxy_client,
258 own_fingerprint,
259 transports: HashMap::new(),
260 pending_pairings: Vec::new(),
261 audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
262 };
263
264 #[cfg(target_arch = "wasm32")]
266 wasm_bindgen_futures::spawn_local(inner.run_event_loop(
267 incoming_rx,
268 command_rx,
269 notification_tx,
270 request_tx,
271 ));
272 #[cfg(not(target_arch = "wasm32"))]
273 tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
274
275 Ok(Self { command_tx })
276 }
277
278 pub async fn get_psk_token(&self, name: Option<String>) -> Result<String, ClientError> {
283 let (tx, rx) = oneshot::channel();
284 self.command_tx
285 .send(UserClientCommand::GetPskToken { name, reply: tx })
286 .await
287 .map_err(|_| ClientError::ChannelClosed)?;
288 rx.await.map_err(|_| ClientError::ChannelClosed)?
289 }
290
291 pub async fn get_rendezvous_token(
296 &self,
297 name: Option<String>,
298 ) -> Result<RendezvousCode, ClientError> {
299 let (tx, rx) = oneshot::channel();
300 self.command_tx
301 .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
302 .await
303 .map_err(|_| ClientError::ChannelClosed)?;
304 rx.await.map_err(|_| ClientError::ChannelClosed)?
305 }
306}
307
/// State owned by the background event loop.
struct UserClientInner {
    /// Persistent store for sessions and their transport state.
    session_store: Box<dyn SessionStore>,
    /// Connection to the proxy used to send and receive messages.
    proxy_client: Box<dyn ProxyClient>,
    /// Fingerprint of the local identity; embedded in PSK tokens.
    own_fingerprint: IdentityFingerprint,
    /// In-memory transports keyed by remote identity.
    transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
    /// Pairings awaiting a matching handshake.
    pending_pairings: Vec<PendingPairing>,
    /// Sink for audit events.
    audit_log: Box<dyn AuditLog>,
}
325
326impl UserClientInner {
327 async fn run_event_loop(
329 mut self,
330 mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
331 mut command_rx: mpsc::Receiver<UserClientCommand>,
332 notification_tx: mpsc::Sender<UserClientNotification>,
333 request_tx: mpsc::Sender<UserClientRequest>,
334 ) {
335 notification_tx
337 .send(UserClientNotification::Listening {})
338 .await
339 .ok();
340
341 let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
342
343 loop {
344 tokio::select! {
345 msg = incoming_rx.recv() => {
346 match msg {
347 Some(msg) => {
348 match self.handle_incoming(msg, ¬ification_tx, &request_tx).await {
349 Ok(Some(fut)) => pending_replies.push(fut),
350 Ok(None) => {}
351 Err(e) => {
352 warn!("Error handling incoming message: {}", e);
353 notification_tx.send(UserClientNotification::Error {
354 message: e.to_string(),
355 context: Some("handle_incoming".to_string()),
356 }).await.ok();
357 }
358 }
359 }
360 None => {
361 notification_tx.send(UserClientNotification::ClientDisconnected {}).await.ok();
363 match self.attempt_reconnection(¬ification_tx).await {
364 Ok(new_rx) => {
365 incoming_rx = new_rx;
366 notification_tx.send(UserClientNotification::Reconnected {}).await.ok();
367 }
368 Err(e) => {
369 warn!("Reconnection failed permanently: {}", e);
370 notification_tx.send(UserClientNotification::Error {
371 message: e.to_string(),
372 context: Some("reconnection".to_string()),
373 }).await.ok();
374 return;
375 }
376 }
377 }
378 }
379 }
380 Some(reply) = pending_replies.next() => {
381 if let Err(e) = self.process_pending_reply(reply, ¬ification_tx).await {
382 warn!("Error processing pending reply: {}", e);
383 notification_tx.send(UserClientNotification::Error {
384 message: e.to_string(),
385 context: Some("process_pending_reply".to_string()),
386 }).await.ok();
387 }
388 }
389 cmd = command_rx.recv() => {
390 match cmd {
391 Some(cmd) => self.handle_command(cmd, ¬ification_tx).await,
392 None => {
393 debug!("All UserClient handles dropped, shutting down event loop");
395 return;
396 }
397 }
398 }
399 }
400 }
401 }
402
403 async fn attempt_reconnection(
405 &mut self,
406 notification_tx: &mpsc::Sender<UserClientNotification>,
407 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ClientError> {
408 use rand::{Rng, SeedableRng};
409
410 let mut rng = rand::rngs::StdRng::from_entropy();
411 let mut attempt: u32 = 0;
412
413 loop {
414 attempt = attempt.saturating_add(1);
415
416 let _ = self.proxy_client.disconnect().await;
418
419 match self.proxy_client.connect().await {
420 Ok(new_rx) => {
421 debug!("Reconnected to proxy on attempt {}", attempt);
422 return Ok(new_rx);
423 }
424 Err(e) => {
425 debug!("Reconnection attempt {} failed: {}", attempt, e);
426 notification_tx
427 .send(UserClientNotification::Reconnecting { attempt })
428 .await
429 .ok();
430
431 let exp_delay = RECONNECT_BASE_DELAY
433 .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
434 let delay = exp_delay.min(RECONNECT_MAX_DELAY);
435 let jitter_max = (delay.as_millis() as u64) / 4;
436 let jitter = if jitter_max > 0 {
437 rng.gen_range(0..=jitter_max)
438 } else {
439 0
440 };
441 let total_delay = delay + Duration::from_millis(jitter);
442
443 crate::compat::sleep(total_delay).await;
444 }
445 }
446 }
447 }
448
449 async fn handle_incoming(
451 &mut self,
452 msg: IncomingMessage,
453 notification_tx: &mpsc::Sender<UserClientNotification>,
454 request_tx: &mpsc::Sender<UserClientRequest>,
455 ) -> Result<Option<PendingReplyFuture>, ClientError> {
456 match msg {
457 IncomingMessage::Send {
458 source, payload, ..
459 } => {
460 let text = String::from_utf8(payload)
462 .map_err(|e| ClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
463
464 let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
465
466 match protocol_msg {
467 ProtocolMessage::HandshakeInit {
468 data,
469 ciphersuite,
470 psk_id,
471 } => {
472 self.handle_handshake_init(
473 source,
474 data,
475 ciphersuite,
476 psk_id,
477 notification_tx,
478 request_tx,
479 )
480 .await
481 }
482 ProtocolMessage::CredentialRequest { encrypted } => {
483 self.handle_credential_request(
484 source,
485 encrypted,
486 notification_tx,
487 request_tx,
488 )
489 .await
490 }
491 _ => {
492 debug!("Received unexpected message type from {:?}", source);
493 Ok(None)
494 }
495 }
496 }
497 IncomingMessage::RendezvousInfo(code) => {
498 let idx = self
500 .pending_pairings
501 .iter()
502 .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }));
503
504 if let Some(idx) = idx {
505 let pairing = &mut self.pending_pairings[idx];
507 if let PairingKind::Rendezvous { reply } = &mut pairing.kind {
508 if let Some(sender) = reply.take() {
509 debug!("Completed rendezvous pairing via handle, code: {}", code);
510 let _ = sender.send(Ok(code));
511 }
512 }
513 } else {
514 debug!("Received RendezvousInfo but no pending rendezvous pairing found");
515 }
516 Ok(None)
517 }
518 IncomingMessage::IdentityInfo { .. } => {
519 debug!("Received unexpected IdentityInfo message");
521 Ok(None)
522 }
523 }
524 }
525
526 async fn handle_handshake_init(
528 &mut self,
529 source: IdentityFingerprint,
530 data: String,
531 ciphersuite: String,
532 psk_id: Option<PskId>,
533 notification_tx: &mpsc::Sender<UserClientNotification>,
534 request_tx: &mpsc::Sender<UserClientRequest>,
535 ) -> Result<Option<PendingReplyFuture>, ClientError> {
536 debug!("Received handshake init from source: {:?}", source);
537 notification_tx
538 .send(UserClientNotification::HandshakeStart {})
539 .await
540 .ok();
541
542 let is_new_connection = !self.session_store.has_session(&source).await;
544
545 let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
547 (None, None, false)
549 } else {
550 Self::prune_stale_pairings(&mut self.pending_pairings);
552
553 match &psk_id {
554 Some(id) => {
555 let idx = self.pending_pairings.iter().position(
557 |p| matches!(&p.kind, PairingKind::Psk { psk_id: pid, .. } if pid == id),
558 );
559 if let Some(idx) = idx {
560 let pairing = self.pending_pairings.remove(idx);
561 let psk = match pairing.kind {
562 PairingKind::Psk { psk, .. } => psk,
563 PairingKind::Rendezvous { .. } => unreachable!(),
564 };
565 (Some(psk), pairing.connection_name, true)
566 } else {
567 warn!("No matching PSK pairing for psk_id: {}", id);
568 return Err(ClientError::InvalidState {
569 expected: "matching PSK pairing".to_string(),
570 current: format!("no pairing for psk_id {id}"),
571 });
572 }
573 }
574 None => {
575 let idx = self
578 .pending_pairings
579 .iter()
580 .position(|p| matches!(p.kind, PairingKind::Rendezvous { reply: None }));
581 let connection_name =
582 idx.and_then(|i| self.pending_pairings.remove(i).connection_name);
583 (None, connection_name, false)
584 }
585 }
586 };
587
588 let (transport, fingerprint_str) = self
589 .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
590 .await?;
591
592 notification_tx
593 .send(UserClientNotification::HandshakeComplete {})
594 .await
595 .ok();
596
597 if is_new_connection && !is_psk_connection {
598 let (tx, rx) = oneshot::channel();
600
601 request_tx
602 .send(UserClientRequest::VerifyFingerprint {
603 fingerprint: fingerprint_str,
604 identity: source,
605 reply: tx,
606 })
607 .await
608 .ok();
609
610 let fut: PendingReplyFuture = Box::pin(async move {
611 let result = rx.await;
612 PendingReply::FingerprintVerification {
613 source,
614 transport,
615 connection_name: matched_pairing_name,
616 reply: result,
617 }
618 });
619
620 Ok(Some(fut))
621 } else if !is_new_connection {
622 self.transports.insert(source, transport.clone());
624 self.session_store.cache_session(source).await?;
625 self.session_store
626 .save_transport_state(&source, transport)
627 .await?;
628
629 self.audit_log
630 .write(AuditEvent::SessionRefreshed {
631 remote_identity: &source,
632 })
633 .await;
634
635 notification_tx
636 .send(UserClientNotification::SessionRefreshed {
637 fingerprint: source,
638 })
639 .await
640 .ok();
641
642 Ok(None)
643 } else {
644 self.accept_new_connection(
646 source,
647 transport,
648 matched_pairing_name.as_deref(),
649 AuditConnectionType::Psk,
650 )
651 .await?;
652
653 notification_tx
655 .send(UserClientNotification::HandshakeFingerprint {
656 fingerprint: fingerprint_str,
657 identity: source,
658 })
659 .await
660 .ok();
661
662 Ok(None)
663 }
664 }
665
666 fn prune_stale_pairings(pairings: &mut Vec<PendingPairing>) {
668 pairings.retain(|p| p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
669 }
670
671 async fn accept_new_connection(
673 &mut self,
674 fingerprint: IdentityFingerprint,
675 transport: MultiDeviceTransport,
676 session_name: Option<&str>,
677 connection_type: AuditConnectionType,
678 ) -> Result<(), ClientError> {
679 self.transports.insert(fingerprint, transport.clone());
680 self.session_store.cache_session(fingerprint).await?;
681 if let Some(name) = session_name {
682 self.session_store
683 .set_session_name(&fingerprint, name.to_owned())
684 .await?;
685 }
686 self.session_store
687 .save_transport_state(&fingerprint, transport)
688 .await?;
689
690 self.audit_log
691 .write(AuditEvent::ConnectionEstablished {
692 remote_identity: &fingerprint,
693 remote_name: session_name,
694 connection_type,
695 })
696 .await;
697
698 Ok(())
699 }
700
701 async fn handle_credential_request(
703 &mut self,
704 source: IdentityFingerprint,
705 encrypted: String,
706 notification_tx: &mpsc::Sender<UserClientNotification>,
707 request_tx: &mpsc::Sender<UserClientRequest>,
708 ) -> Result<Option<PendingReplyFuture>, ClientError> {
709 if !self.transports.contains_key(&source) {
710 debug!("Loading transport state for source: {:?}", source);
711 let session = self
712 .session_store
713 .load_transport_state(&source)
714 .await?
715 .ok_or_else(|| {
716 ClientError::SessionCache(format!(
717 "Missing transport state for cached session {source:?}"
718 ))
719 })?;
720 self.transports.insert(source, session);
721 }
722
723 let transport = self
725 .transports
726 .get_mut(&source)
727 .ok_or(ClientError::SecureChannelNotEstablished)?;
728
729 let encrypted_bytes = STANDARD
731 .decode(&encrypted)
732 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
733
734 let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
735 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
736
737 let decrypted = transport
738 .decrypt(&packet)
739 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
740
741 let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
742
743 self.audit_log
744 .write(AuditEvent::CredentialRequested {
745 query: &request.query,
746 remote_identity: &source,
747 request_id: &request.request_id,
748 })
749 .await;
750
751 let (tx, rx) = oneshot::channel();
753
754 if request_tx
756 .send(UserClientRequest::CredentialRequest {
757 query: request.query.clone(),
758 identity: source,
759 reply: tx,
760 })
761 .await
762 .is_err()
763 {
764 warn!("Request channel closed, cannot send credential request");
766 notification_tx
767 .send(UserClientNotification::Error {
768 message: "Request channel closed".to_string(),
769 context: Some("handle_credential_request".to_string()),
770 })
771 .await
772 .ok();
773 return Ok(None);
774 }
775
776 let request_id = request.request_id;
778 let query = request.query;
779 let fut: PendingReplyFuture = Box::pin(async move {
780 let result = rx.await;
781 PendingReply::CredentialResponse {
782 source,
783 request_id,
784 query,
785 reply: result,
786 }
787 });
788
789 Ok(Some(fut))
790 }
791
792 async fn process_pending_reply(
794 &mut self,
795 reply: PendingReply,
796 notification_tx: &mpsc::Sender<UserClientNotification>,
797 ) -> Result<(), ClientError> {
798 match reply {
799 PendingReply::FingerprintVerification {
800 source,
801 transport,
802 connection_name,
803 reply,
804 } => {
805 self.process_fingerprint_reply(
806 source,
807 transport,
808 connection_name,
809 reply,
810 notification_tx,
811 )
812 .await
813 }
814 PendingReply::CredentialResponse {
815 source,
816 request_id,
817 query,
818 reply,
819 } => {
820 self.process_credential_reply(source, request_id, query, reply, notification_tx)
821 .await
822 }
823 }
824 }
825
826 async fn handle_command(
828 &mut self,
829 cmd: UserClientCommand,
830 notification_tx: &mpsc::Sender<UserClientNotification>,
831 ) {
832 match cmd {
833 UserClientCommand::GetPskToken { name, reply } => {
834 let result = self.generate_psk_token(name).await;
835 let _ = reply.send(result);
836 }
837 UserClientCommand::GetRendezvousToken { name, reply } => {
838 if let Err(e) = self.proxy_client.request_rendezvous().await {
839 let _ = reply.send(Err(e));
840 return;
841 }
842
843 Self::prune_stale_pairings(&mut self.pending_pairings);
845
846 if let Some(old_idx) = self
849 .pending_pairings
850 .iter()
851 .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }))
852 {
853 let old = self.pending_pairings.remove(old_idx);
854 if let PairingKind::Rendezvous {
855 reply: Some(old_reply),
856 } = old.kind
857 {
858 warn!("Replacing existing pending rendezvous pairing");
859 let _ = old_reply.send(Err(ClientError::InvalidState {
860 expected: "single pending rendezvous".to_string(),
861 current: "replaced by new rendezvous request".to_string(),
862 }));
863 }
864 }
865
866 self.pending_pairings.push(PendingPairing {
869 connection_name: name,
870 created_at: Instant::now(),
871 kind: PairingKind::Rendezvous { reply: Some(reply) },
872 });
873
874 notification_tx
876 .send(UserClientNotification::HandshakeProgress {
877 message: "Requesting rendezvous code from proxy...".to_string(),
878 })
879 .await
880 .ok();
881 }
882 }
883 }
884
885 async fn generate_psk_token(&mut self, name: Option<String>) -> Result<String, ClientError> {
887 let psk = Psk::generate();
888 let psk_id = psk.id();
889 let token = format!("{}_{}", psk.to_hex(), hex::encode(self.own_fingerprint.0));
890
891 let pairing = PendingPairing {
892 connection_name: name,
893 created_at: Instant::now(),
894 kind: PairingKind::Psk { psk, psk_id },
895 };
896
897 Self::prune_stale_pairings(&mut self.pending_pairings);
898 self.pending_pairings.push(pairing);
899 debug!("Created PSK pairing, token generated");
900
901 Ok(token)
902 }
903
904 async fn process_fingerprint_reply(
906 &mut self,
907 source: IdentityFingerprint,
908 transport: MultiDeviceTransport,
909 connection_name: Option<String>,
910 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
911 notification_tx: &mpsc::Sender<UserClientNotification>,
912 ) -> Result<(), ClientError> {
913 match reply {
914 Ok(FingerprintVerificationReply {
915 approved: true,
916 name,
917 }) => {
918 let session_name = name.or(connection_name);
920 self.accept_new_connection(
921 source,
922 transport,
923 session_name.as_deref(),
924 AuditConnectionType::Rendezvous,
925 )
926 .await?;
927
928 notification_tx
929 .send(UserClientNotification::FingerprintVerified {})
930 .await
931 .ok();
932 }
933 Ok(FingerprintVerificationReply {
934 approved: false, ..
935 }) => {
936 self.audit_log
937 .write(AuditEvent::ConnectionRejected {
938 remote_identity: &source,
939 })
940 .await;
941
942 notification_tx
943 .send(UserClientNotification::FingerprintRejected {
944 reason: "User rejected fingerprint verification".to_string(),
945 })
946 .await
947 .ok();
948 }
949 Err(_) => {
950 warn!("Fingerprint verification reply channel dropped, treating as rejection");
952 self.audit_log
953 .write(AuditEvent::ConnectionRejected {
954 remote_identity: &source,
955 })
956 .await;
957
958 notification_tx
959 .send(UserClientNotification::FingerprintRejected {
960 reason: "Verification cancelled (reply dropped)".to_string(),
961 })
962 .await
963 .ok();
964 }
965 }
966
967 Ok(())
968 }
969
970 #[allow(clippy::too_many_arguments)]
972 async fn process_credential_reply(
973 &mut self,
974 source: IdentityFingerprint,
975 request_id: String,
976 query: crate::types::CredentialQuery,
977 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
978 notification_tx: &mpsc::Sender<UserClientNotification>,
979 ) -> Result<(), ClientError> {
980 let reply = match reply {
981 Ok(r) => r,
982 Err(_) => {
983 warn!("Credential reply channel dropped, treating as denial");
985 CredentialRequestReply {
986 approved: false,
987 credential: None,
988 credential_id: None,
989 }
990 }
991 };
992
993 let transport = self
994 .transports
995 .get_mut(&source)
996 .ok_or(ClientError::SecureChannelNotEstablished)?;
997
998 let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1000 let fields = reply
1001 .credential
1002 .as_ref()
1003 .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1004 has_username: c.username.is_some(),
1005 has_password: c.password.is_some(),
1006 has_totp: c.totp.is_some(),
1007 has_uri: c.uri.is_some(),
1008 has_notes: c.notes.is_some(),
1009 });
1010
1011 let response_payload = CredentialResponsePayload {
1013 credential: if reply.approved {
1014 reply.credential
1015 } else {
1016 None
1017 },
1018 error: if !reply.approved {
1019 Some("Request denied".to_string())
1020 } else {
1021 None
1022 },
1023 request_id: Some(request_id.clone()),
1024 };
1025
1026 let response_json = serde_json::to_string(&response_payload)?;
1028 let encrypted = transport
1029 .encrypt(response_json.as_bytes())
1030 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
1031
1032 let msg = ProtocolMessage::CredentialResponse {
1033 encrypted: STANDARD.encode(encrypted.encode()),
1034 };
1035
1036 let msg_json = serde_json::to_string(&msg)?;
1037
1038 self.proxy_client
1039 .send_to(source, msg_json.into_bytes())
1040 .await?;
1041
1042 if reply.approved {
1044 self.audit_log
1045 .write(AuditEvent::CredentialApproved {
1046 query: &query,
1047 domain: domain.as_deref(),
1048 remote_identity: &source,
1049 request_id: &request_id,
1050 credential_id: reply.credential_id.as_deref(),
1051 fields,
1052 })
1053 .await;
1054
1055 notification_tx
1056 .send(UserClientNotification::CredentialApproved {
1057 domain,
1058 credential_id: reply.credential_id,
1059 })
1060 .await
1061 .ok();
1062 } else {
1063 self.audit_log
1064 .write(AuditEvent::CredentialDenied {
1065 query: &query,
1066 domain: domain.as_deref(),
1067 remote_identity: &source,
1068 request_id: &request_id,
1069 credential_id: reply.credential_id.as_deref(),
1070 })
1071 .await;
1072
1073 notification_tx
1074 .send(UserClientNotification::CredentialDenied {
1075 domain,
1076 credential_id: reply.credential_id,
1077 })
1078 .await
1079 .ok();
1080 }
1081
1082 Ok(())
1083 }
1084
1085 async fn complete_handshake(
1087 &self,
1088 remote_fingerprint: IdentityFingerprint,
1089 handshake_data: &str,
1090 ciphersuite_str: &str,
1091 psk: Option<&Psk>,
1092 ) -> Result<(MultiDeviceTransport, String), ClientError> {
1093 let ciphersuite = match ciphersuite_str {
1095 s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1096 _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1097 };
1098
1099 let init_bytes = STANDARD
1101 .decode(handshake_data)
1102 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
1103
1104 let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1105 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1106
1107 let mut handshake = if let Some(psk) = psk {
1109 ResponderHandshake::with_psk(psk.clone())
1110 } else {
1111 ResponderHandshake::new()
1112 };
1113
1114 handshake.receive_start(&init_packet)?;
1116 let response_packet = handshake.send_finish()?;
1117 let (transport, fingerprint) = handshake.finalize()?;
1118
1119 let msg = ProtocolMessage::HandshakeResponse {
1121 data: STANDARD.encode(response_packet.encode()?),
1122 ciphersuite: format!("{ciphersuite:?}"),
1123 };
1124
1125 let msg_json = serde_json::to_string(&msg)?;
1126
1127 self.proxy_client
1128 .send_to(remote_fingerprint, msg_json.into_bytes())
1129 .await?;
1130
1131 debug!("Sent handshake response to {:?}", remote_fingerprint);
1132
1133 Ok((transport, fingerprint.to_string()))
1134 }
1135}