1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
24const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
26const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(15 * 60);
28const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(10 * 60);
30
31pub(crate) enum PairingKind {
33 Rendezvous {
38 reply: Option<oneshot::Sender<Result<RendezvousCode, ClientError>>>,
39 },
40 Psk { psk: Psk, psk_id: PskId },
42}
43
44pub(crate) struct PendingPairing {
46 connection_name: Option<String>,
48 created_at: Instant,
50 kind: PairingKind,
52}
53
54use super::notify;
55use crate::{
56 error::ClientError,
57 traits::{
58 AuditConnectionType, AuditEvent, AuditLog, CredentialFieldSet, IdentityProvider,
59 NoOpAuditLog, SessionStore,
60 },
61 types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
62};
63
64#[derive(Debug, Clone)]
70pub enum UserClientNotification {
71 Listening {},
73 HandshakeStart {},
75 HandshakeProgress {
77 message: String,
79 },
80 HandshakeComplete {},
82 HandshakeFingerprint {
84 fingerprint: String,
86 identity: IdentityFingerprint,
88 },
89 FingerprintVerified {},
91 FingerprintRejected {
93 reason: String,
95 },
96 CredentialApproved {
98 domain: Option<String>,
100 credential_id: Option<String>,
102 },
103 CredentialDenied {
105 domain: Option<String>,
107 credential_id: Option<String>,
109 },
110 SessionRefreshed {
112 fingerprint: IdentityFingerprint,
114 },
115 ClientDisconnected {},
117 Reconnecting {
119 attempt: u32,
121 },
122 Reconnected {},
124 Error {
126 message: String,
128 context: Option<String>,
130 },
131}
132
133#[derive(Debug)]
135pub struct FingerprintVerificationReply {
136 pub approved: bool,
138 pub name: Option<String>,
140}
141
142#[derive(Debug)]
144pub struct CredentialRequestReply {
145 pub approved: bool,
147 pub credential: Option<CredentialData>,
149 pub credential_id: Option<String>,
151}
152
153#[derive(Debug)]
155pub enum UserClientRequest {
156 VerifyFingerprint {
158 fingerprint: String,
160 identity: IdentityFingerprint,
162 reply: oneshot::Sender<FingerprintVerificationReply>,
164 },
165 CredentialRequest {
167 query: crate::types::CredentialQuery,
169 identity: IdentityFingerprint,
171 reply: oneshot::Sender<CredentialRequestReply>,
173 },
174}
175
176enum PendingReply {
182 FingerprintVerification {
183 source: IdentityFingerprint,
184 transport: MultiDeviceTransport,
185 connection_name: Option<String>,
186 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
187 },
188 CredentialResponse {
189 source: IdentityFingerprint,
190 request_id: String,
191 query: crate::types::CredentialQuery,
192 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
193 },
194}
195
196type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
198
199enum UserClientCommand {
205 GetPskToken {
207 name: Option<String>,
208 reply: oneshot::Sender<Result<String, ClientError>>,
209 },
210 GetRendezvousToken {
212 name: Option<String>,
213 reply: oneshot::Sender<Result<RendezvousCode, ClientError>>,
214 },
215}
216
217pub struct UserClientHandle {
227 pub client: UserClient,
228 pub notifications: mpsc::Receiver<UserClientNotification>,
229 pub requests: mpsc::Receiver<UserClientRequest>,
230}
231
232#[derive(Clone)]
233pub struct UserClient {
234 command_tx: mpsc::Sender<UserClientCommand>,
235}
236
237impl UserClient {
238 pub async fn connect(
247 identity_provider: Box<dyn IdentityProvider>,
248 session_store: Box<dyn SessionStore>,
249 mut proxy_client: Box<dyn ProxyClient>,
250 audit_log: Option<Box<dyn AuditLog>>,
251 ) -> Result<UserClientHandle, ClientError> {
252 let incoming_rx = proxy_client.connect().await?;
254
255 let (notification_tx, notification_rx) = mpsc::channel(32);
257 let (request_tx, request_rx) = mpsc::channel(32);
258
259 let (command_tx, command_rx) = mpsc::channel(32);
261
262 let own_fingerprint = identity_provider.fingerprint().await;
264
265 let inner = UserClientInner {
267 session_store,
268 proxy_client,
269 own_fingerprint,
270 transports: HashMap::new(),
271 pending_pairings: Vec::new(),
272 audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
273 };
274
275 #[cfg(target_arch = "wasm32")]
277 wasm_bindgen_futures::spawn_local(inner.run_event_loop(
278 incoming_rx,
279 command_rx,
280 notification_tx,
281 request_tx,
282 ));
283 #[cfg(not(target_arch = "wasm32"))]
284 tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
285
286 Ok(UserClientHandle {
287 client: Self { command_tx },
288 notifications: notification_rx,
289 requests: request_rx,
290 })
291 }
292
293 pub async fn get_psk_token(&self, name: Option<String>) -> Result<String, ClientError> {
298 let (tx, rx) = oneshot::channel();
299 self.command_tx
300 .send(UserClientCommand::GetPskToken { name, reply: tx })
301 .await
302 .map_err(|_| ClientError::ChannelClosed)?;
303 rx.await.map_err(|_| ClientError::ChannelClosed)?
304 }
305
306 pub async fn get_rendezvous_token(
311 &self,
312 name: Option<String>,
313 ) -> Result<RendezvousCode, ClientError> {
314 let (tx, rx) = oneshot::channel();
315 self.command_tx
316 .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
317 .await
318 .map_err(|_| ClientError::ChannelClosed)?;
319 rx.await.map_err(|_| ClientError::ChannelClosed)?
320 }
321}
322
323struct UserClientInner {
329 session_store: Box<dyn SessionStore>,
330 proxy_client: Box<dyn ProxyClient>,
331 own_fingerprint: IdentityFingerprint,
333 transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
335 pending_pairings: Vec<PendingPairing>,
337 audit_log: Box<dyn AuditLog>,
339}
340
341impl UserClientInner {
342 async fn run_event_loop(
344 mut self,
345 mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
346 mut command_rx: mpsc::Receiver<UserClientCommand>,
347 notification_tx: mpsc::Sender<UserClientNotification>,
348 request_tx: mpsc::Sender<UserClientRequest>,
349 ) {
350 notify!(notification_tx, UserClientNotification::Listening {});
352
353 let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
354
355 loop {
356 tokio::select! {
357 msg = incoming_rx.recv() => {
358 match msg {
359 Some(msg) => {
360 match self.handle_incoming(msg, ¬ification_tx, &request_tx).await {
361 Ok(Some(fut)) => pending_replies.push(fut),
362 Ok(None) => {}
363 Err(e) => {
364 warn!("Error handling incoming message: {}", e);
365 notify!(notification_tx, UserClientNotification::Error {
366 message: e.to_string(),
367 context: Some("handle_incoming".to_string()),
368 });
369 }
370 }
371 }
372 None => {
373 notify!(notification_tx, UserClientNotification::ClientDisconnected {});
375 match self.attempt_reconnection(¬ification_tx).await {
376 Ok(new_rx) => {
377 incoming_rx = new_rx;
378 notify!(notification_tx, UserClientNotification::Reconnected {});
379 }
380 Err(e) => {
381 warn!("Reconnection failed permanently: {}", e);
382 notify!(notification_tx, UserClientNotification::Error {
383 message: e.to_string(),
384 context: Some("reconnection".to_string()),
385 });
386 return;
387 }
388 }
389 }
390 }
391 }
392 Some(reply) = pending_replies.next() => {
393 if let Err(e) = self.process_pending_reply(reply, ¬ification_tx).await {
394 warn!("Error processing pending reply: {}", e);
395 notify!(notification_tx, UserClientNotification::Error {
396 message: e.to_string(),
397 context: Some("process_pending_reply".to_string()),
398 });
399 }
400 }
401 cmd = command_rx.recv() => {
402 match cmd {
403 Some(cmd) => self.handle_command(cmd, ¬ification_tx).await,
404 None => {
405 debug!("All UserClient handles dropped, shutting down event loop");
407 return;
408 }
409 }
410 }
411 }
412 }
413 }
414
415 async fn attempt_reconnection(
417 &mut self,
418 notification_tx: &mpsc::Sender<UserClientNotification>,
419 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ClientError> {
420 use rand::{Rng, SeedableRng};
421
422 let mut rng = rand::rngs::StdRng::from_entropy();
423 let mut attempt: u32 = 0;
424
425 loop {
426 attempt = attempt.saturating_add(1);
427
428 let _ = self.proxy_client.disconnect().await;
430
431 match self.proxy_client.connect().await {
432 Ok(new_rx) => {
433 debug!("Reconnected to proxy on attempt {}", attempt);
434 return Ok(new_rx);
435 }
436 Err(e) => {
437 debug!("Reconnection attempt {} failed: {}", attempt, e);
438 notify!(
439 notification_tx,
440 UserClientNotification::Reconnecting { attempt }
441 );
442
443 let exp_delay = RECONNECT_BASE_DELAY
445 .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
446 let delay = exp_delay.min(RECONNECT_MAX_DELAY);
447 let jitter_max = (delay.as_millis() as u64) / 4;
448 let jitter = if jitter_max > 0 {
449 rng.gen_range(0..=jitter_max)
450 } else {
451 0
452 };
453 let total_delay = delay + Duration::from_millis(jitter);
454
455 crate::compat::sleep(total_delay).await;
456 }
457 }
458 }
459 }
460
461 async fn handle_incoming(
463 &mut self,
464 msg: IncomingMessage,
465 notification_tx: &mpsc::Sender<UserClientNotification>,
466 request_tx: &mpsc::Sender<UserClientRequest>,
467 ) -> Result<Option<PendingReplyFuture>, ClientError> {
468 match msg {
469 IncomingMessage::Send {
470 source, payload, ..
471 } => {
472 let text = String::from_utf8(payload)
474 .map_err(|e| ClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
475
476 let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
477
478 match protocol_msg {
479 ProtocolMessage::HandshakeInit {
480 data,
481 ciphersuite,
482 psk_id,
483 } => {
484 self.handle_handshake_init(
485 source,
486 data,
487 ciphersuite,
488 psk_id,
489 notification_tx,
490 request_tx,
491 )
492 .await
493 }
494 ProtocolMessage::CredentialRequest { encrypted } => {
495 self.handle_credential_request(
496 source,
497 encrypted,
498 notification_tx,
499 request_tx,
500 )
501 .await
502 }
503 _ => {
504 debug!("Received unexpected message type from {:?}", source);
505 Ok(None)
506 }
507 }
508 }
509 IncomingMessage::RendezvousInfo(code) => {
510 let idx = self
512 .pending_pairings
513 .iter()
514 .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }));
515
516 if let Some(idx) = idx {
517 let pairing = &mut self.pending_pairings[idx];
519 if let PairingKind::Rendezvous { reply } = &mut pairing.kind {
520 if let Some(sender) = reply.take() {
521 debug!("Completed rendezvous pairing via handle, code: {}", code);
522 let _ = sender.send(Ok(code));
523 }
524 }
525 } else {
526 debug!("Received RendezvousInfo but no pending rendezvous pairing found");
527 }
528 Ok(None)
529 }
530 IncomingMessage::IdentityInfo { .. } => {
531 debug!("Received unexpected IdentityInfo message");
533 Ok(None)
534 }
535 }
536 }
537
538 async fn handle_handshake_init(
540 &mut self,
541 source: IdentityFingerprint,
542 data: String,
543 ciphersuite: String,
544 psk_id: Option<PskId>,
545 notification_tx: &mpsc::Sender<UserClientNotification>,
546 request_tx: &mpsc::Sender<UserClientRequest>,
547 ) -> Result<Option<PendingReplyFuture>, ClientError> {
548 debug!("Received handshake init from source: {:?}", source);
549 notify!(notification_tx, UserClientNotification::HandshakeStart {});
550
551 let is_new_connection = !self.session_store.has_session(&source).await;
553
554 let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
556 (None, None, false)
558 } else {
559 Self::prune_stale_pairings(&mut self.pending_pairings);
561
562 match &psk_id {
563 Some(id) => {
564 let idx = self.pending_pairings.iter().position(
566 |p| matches!(&p.kind, PairingKind::Psk { psk_id: pid, .. } if pid == id),
567 );
568 if let Some(idx) = idx {
569 let pairing = self.pending_pairings.remove(idx);
570 let psk = match pairing.kind {
571 PairingKind::Psk { psk, .. } => psk,
572 PairingKind::Rendezvous { .. } => unreachable!(),
573 };
574 (Some(psk), pairing.connection_name, true)
575 } else {
576 warn!("No matching PSK pairing for psk_id: {}", id);
577 return Err(ClientError::InvalidState {
578 expected: "matching PSK pairing".to_string(),
579 current: format!("no pairing for psk_id {id}"),
580 });
581 }
582 }
583 None => {
584 let idx = self
587 .pending_pairings
588 .iter()
589 .position(|p| matches!(p.kind, PairingKind::Rendezvous { reply: None }));
590 let connection_name =
591 idx.and_then(|i| self.pending_pairings.remove(i).connection_name);
592 (None, connection_name, false)
593 }
594 }
595 };
596
597 let (transport, fingerprint_str) = self
598 .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
599 .await?;
600
601 notify!(
602 notification_tx,
603 UserClientNotification::HandshakeComplete {}
604 );
605
606 if is_new_connection && !is_psk_connection {
607 let (tx, rx) = oneshot::channel();
609
610 if request_tx.capacity() == 0 {
611 warn!("Request channel full, waiting for consumer to drain");
612 }
613 request_tx
614 .send(UserClientRequest::VerifyFingerprint {
615 fingerprint: fingerprint_str,
616 identity: source,
617 reply: tx,
618 })
619 .await
620 .ok();
621
622 let fut: PendingReplyFuture = Box::pin(async move {
623 let result = rx.await;
624 PendingReply::FingerprintVerification {
625 source,
626 transport,
627 connection_name: matched_pairing_name,
628 reply: result,
629 }
630 });
631
632 Ok(Some(fut))
633 } else if !is_new_connection {
634 self.transports.insert(source, transport.clone());
636 self.session_store.cache_session(source).await?;
637 self.session_store
638 .save_transport_state(&source, transport)
639 .await?;
640
641 self.audit_log
642 .write(AuditEvent::SessionRefreshed {
643 remote_identity: &source,
644 })
645 .await;
646
647 notify!(
648 notification_tx,
649 UserClientNotification::SessionRefreshed {
650 fingerprint: source,
651 }
652 );
653
654 Ok(None)
655 } else {
656 self.accept_new_connection(
658 source,
659 transport,
660 matched_pairing_name.as_deref(),
661 AuditConnectionType::Psk,
662 )
663 .await?;
664
665 notify!(
667 notification_tx,
668 UserClientNotification::HandshakeFingerprint {
669 fingerprint: fingerprint_str,
670 identity: source,
671 }
672 );
673
674 Ok(None)
675 }
676 }
677
678 fn prune_stale_pairings(pairings: &mut Vec<PendingPairing>) {
680 pairings.retain(|p| p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
681 }
682
683 async fn accept_new_connection(
685 &mut self,
686 fingerprint: IdentityFingerprint,
687 transport: MultiDeviceTransport,
688 session_name: Option<&str>,
689 connection_type: AuditConnectionType,
690 ) -> Result<(), ClientError> {
691 self.transports.insert(fingerprint, transport.clone());
692 self.session_store.cache_session(fingerprint).await?;
693 if let Some(name) = session_name {
694 self.session_store
695 .set_session_name(&fingerprint, name.to_owned())
696 .await?;
697 }
698 self.session_store
699 .save_transport_state(&fingerprint, transport)
700 .await?;
701
702 self.audit_log
703 .write(AuditEvent::ConnectionEstablished {
704 remote_identity: &fingerprint,
705 remote_name: session_name,
706 connection_type,
707 })
708 .await;
709
710 Ok(())
711 }
712
713 async fn handle_credential_request(
715 &mut self,
716 source: IdentityFingerprint,
717 encrypted: String,
718 notification_tx: &mpsc::Sender<UserClientNotification>,
719 request_tx: &mpsc::Sender<UserClientRequest>,
720 ) -> Result<Option<PendingReplyFuture>, ClientError> {
721 if !self.transports.contains_key(&source) {
722 debug!("Loading transport state for source: {:?}", source);
723 let session = self
724 .session_store
725 .load_transport_state(&source)
726 .await?
727 .ok_or_else(|| {
728 ClientError::SessionCache(format!(
729 "Missing transport state for cached session {source:?}"
730 ))
731 })?;
732 self.transports.insert(source, session);
733 }
734
735 let transport = self
737 .transports
738 .get_mut(&source)
739 .ok_or(ClientError::SecureChannelNotEstablished)?;
740
741 let encrypted_bytes = STANDARD
743 .decode(&encrypted)
744 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
745
746 let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
747 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
748
749 let decrypted = transport
750 .decrypt(&packet)
751 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
752
753 let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
754
755 self.audit_log
756 .write(AuditEvent::CredentialRequested {
757 query: &request.query,
758 remote_identity: &source,
759 request_id: &request.request_id,
760 })
761 .await;
762
763 let (tx, rx) = oneshot::channel();
765
766 if request_tx.capacity() == 0 {
768 warn!("Request channel full, waiting for consumer to drain");
769 }
770 if request_tx
771 .send(UserClientRequest::CredentialRequest {
772 query: request.query.clone(),
773 identity: source,
774 reply: tx,
775 })
776 .await
777 .is_err()
778 {
779 warn!("Request channel closed, cannot send credential request");
781 notify!(
782 notification_tx,
783 UserClientNotification::Error {
784 message: "Request channel closed".to_string(),
785 context: Some("handle_credential_request".to_string()),
786 }
787 );
788 return Ok(None);
789 }
790
791 let request_id = request.request_id;
793 let query = request.query;
794 let fut: PendingReplyFuture = Box::pin(async move {
795 let result = rx.await;
796 PendingReply::CredentialResponse {
797 source,
798 request_id,
799 query,
800 reply: result,
801 }
802 });
803
804 Ok(Some(fut))
805 }
806
807 async fn process_pending_reply(
809 &mut self,
810 reply: PendingReply,
811 notification_tx: &mpsc::Sender<UserClientNotification>,
812 ) -> Result<(), ClientError> {
813 match reply {
814 PendingReply::FingerprintVerification {
815 source,
816 transport,
817 connection_name,
818 reply,
819 } => {
820 self.process_fingerprint_reply(
821 source,
822 transport,
823 connection_name,
824 reply,
825 notification_tx,
826 )
827 .await
828 }
829 PendingReply::CredentialResponse {
830 source,
831 request_id,
832 query,
833 reply,
834 } => {
835 self.process_credential_reply(source, request_id, query, reply, notification_tx)
836 .await
837 }
838 }
839 }
840
841 async fn handle_command(
843 &mut self,
844 cmd: UserClientCommand,
845 notification_tx: &mpsc::Sender<UserClientNotification>,
846 ) {
847 match cmd {
848 UserClientCommand::GetPskToken { name, reply } => {
849 let result = self.generate_psk_token(name).await;
850 let _ = reply.send(result);
851 }
852 UserClientCommand::GetRendezvousToken { name, reply } => {
853 if let Err(e) = self.proxy_client.request_rendezvous().await {
854 let _ = reply.send(Err(e));
855 return;
856 }
857
858 Self::prune_stale_pairings(&mut self.pending_pairings);
860
861 if let Some(old_idx) = self
864 .pending_pairings
865 .iter()
866 .position(|p| matches!(&p.kind, PairingKind::Rendezvous { reply: Some(_) }))
867 {
868 let old = self.pending_pairings.remove(old_idx);
869 if let PairingKind::Rendezvous {
870 reply: Some(old_reply),
871 } = old.kind
872 {
873 warn!("Replacing existing pending rendezvous pairing");
874 let _ = old_reply.send(Err(ClientError::InvalidState {
875 expected: "single pending rendezvous".to_string(),
876 current: "replaced by new rendezvous request".to_string(),
877 }));
878 }
879 }
880
881 self.pending_pairings.push(PendingPairing {
884 connection_name: name,
885 created_at: Instant::now(),
886 kind: PairingKind::Rendezvous { reply: Some(reply) },
887 });
888
889 notify!(
891 notification_tx,
892 UserClientNotification::HandshakeProgress {
893 message: "Requesting rendezvous code from proxy...".to_string(),
894 }
895 );
896 }
897 }
898 }
899
900 async fn generate_psk_token(&mut self, name: Option<String>) -> Result<String, ClientError> {
902 let psk = Psk::generate();
903 let psk_id = psk.id();
904 let token = format!("{}_{}", psk.to_hex(), hex::encode(self.own_fingerprint.0));
905
906 let pairing = PendingPairing {
907 connection_name: name,
908 created_at: Instant::now(),
909 kind: PairingKind::Psk { psk, psk_id },
910 };
911
912 Self::prune_stale_pairings(&mut self.pending_pairings);
913 self.pending_pairings.push(pairing);
914 debug!("Created PSK pairing, token generated");
915
916 Ok(token)
917 }
918
919 async fn process_fingerprint_reply(
921 &mut self,
922 source: IdentityFingerprint,
923 transport: MultiDeviceTransport,
924 connection_name: Option<String>,
925 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
926 notification_tx: &mpsc::Sender<UserClientNotification>,
927 ) -> Result<(), ClientError> {
928 match reply {
929 Ok(FingerprintVerificationReply {
930 approved: true,
931 name,
932 }) => {
933 let session_name = name.or(connection_name);
935 self.accept_new_connection(
936 source,
937 transport,
938 session_name.as_deref(),
939 AuditConnectionType::Rendezvous,
940 )
941 .await?;
942
943 notify!(
944 notification_tx,
945 UserClientNotification::FingerprintVerified {}
946 );
947 }
948 Ok(FingerprintVerificationReply {
949 approved: false, ..
950 }) => {
951 self.audit_log
952 .write(AuditEvent::ConnectionRejected {
953 remote_identity: &source,
954 })
955 .await;
956
957 notify!(
958 notification_tx,
959 UserClientNotification::FingerprintRejected {
960 reason: "User rejected fingerprint verification".to_string(),
961 }
962 );
963 }
964 Err(_) => {
965 warn!("Fingerprint verification reply channel dropped, treating as rejection");
967 self.audit_log
968 .write(AuditEvent::ConnectionRejected {
969 remote_identity: &source,
970 })
971 .await;
972
973 notify!(
974 notification_tx,
975 UserClientNotification::FingerprintRejected {
976 reason: "Verification cancelled (reply dropped)".to_string(),
977 }
978 );
979 }
980 }
981
982 Ok(())
983 }
984
985 #[allow(clippy::too_many_arguments)]
987 async fn process_credential_reply(
988 &mut self,
989 source: IdentityFingerprint,
990 request_id: String,
991 query: crate::types::CredentialQuery,
992 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
993 notification_tx: &mpsc::Sender<UserClientNotification>,
994 ) -> Result<(), ClientError> {
995 let reply = match reply {
996 Ok(r) => r,
997 Err(_) => {
998 warn!("Credential reply channel dropped, treating as denial");
1000 CredentialRequestReply {
1001 approved: false,
1002 credential: None,
1003 credential_id: None,
1004 }
1005 }
1006 };
1007
1008 let transport = self
1009 .transports
1010 .get_mut(&source)
1011 .ok_or(ClientError::SecureChannelNotEstablished)?;
1012
1013 let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1015 let fields = reply
1016 .credential
1017 .as_ref()
1018 .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1019 has_username: c.username.is_some(),
1020 has_password: c.password.is_some(),
1021 has_totp: c.totp.is_some(),
1022 has_uri: c.uri.is_some(),
1023 has_notes: c.notes.is_some(),
1024 });
1025
1026 let response_payload = CredentialResponsePayload {
1028 credential: if reply.approved {
1029 reply.credential
1030 } else {
1031 None
1032 },
1033 error: if !reply.approved {
1034 Some("Request denied".to_string())
1035 } else {
1036 None
1037 },
1038 request_id: Some(request_id.clone()),
1039 };
1040
1041 let response_json = serde_json::to_string(&response_payload)?;
1043 let encrypted = transport
1044 .encrypt(response_json.as_bytes())
1045 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
1046
1047 let msg = ProtocolMessage::CredentialResponse {
1048 encrypted: STANDARD.encode(encrypted.encode()),
1049 };
1050
1051 let msg_json = serde_json::to_string(&msg)?;
1052
1053 self.proxy_client
1054 .send_to(source, msg_json.into_bytes())
1055 .await?;
1056
1057 if reply.approved {
1059 self.audit_log
1060 .write(AuditEvent::CredentialApproved {
1061 query: &query,
1062 domain: domain.as_deref(),
1063 remote_identity: &source,
1064 request_id: &request_id,
1065 credential_id: reply.credential_id.as_deref(),
1066 fields,
1067 })
1068 .await;
1069
1070 notify!(
1071 notification_tx,
1072 UserClientNotification::CredentialApproved {
1073 domain,
1074 credential_id: reply.credential_id,
1075 }
1076 );
1077 } else {
1078 self.audit_log
1079 .write(AuditEvent::CredentialDenied {
1080 query: &query,
1081 domain: domain.as_deref(),
1082 remote_identity: &source,
1083 request_id: &request_id,
1084 credential_id: reply.credential_id.as_deref(),
1085 })
1086 .await;
1087
1088 notify!(
1089 notification_tx,
1090 UserClientNotification::CredentialDenied {
1091 domain,
1092 credential_id: reply.credential_id,
1093 }
1094 );
1095 }
1096
1097 Ok(())
1098 }
1099
1100 async fn complete_handshake(
1102 &self,
1103 remote_fingerprint: IdentityFingerprint,
1104 handshake_data: &str,
1105 ciphersuite_str: &str,
1106 psk: Option<&Psk>,
1107 ) -> Result<(MultiDeviceTransport, String), ClientError> {
1108 let ciphersuite = match ciphersuite_str {
1110 s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1111 _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1112 };
1113
1114 let init_bytes = STANDARD
1116 .decode(handshake_data)
1117 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
1118
1119 let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1120 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1121
1122 let mut handshake = if let Some(psk) = psk {
1124 ResponderHandshake::with_psk(psk.clone())
1125 } else {
1126 ResponderHandshake::new()
1127 };
1128
1129 handshake.receive_start(&init_packet)?;
1131 let response_packet = handshake.send_finish()?;
1132 let (transport, fingerprint) = handshake.finalize()?;
1133
1134 let msg = ProtocolMessage::HandshakeResponse {
1136 data: STANDARD.encode(response_packet.encode()?),
1137 ciphersuite: format!("{ciphersuite:?}"),
1138 };
1139
1140 let msg_json = serde_json::to_string(&msg)?;
1141
1142 self.proxy_client
1143 .send_to(remote_fingerprint, msg_json.into_bytes())
1144 .await?;
1145
1146 debug!("Sent handshake response to {:?}", remote_fingerprint);
1147
1148 Ok((transport, fingerprint.to_string()))
1149 }
1150}