1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, IdentityKeyPair, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId, PskToken};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
/// Backoff delay applied after the first failed reconnection attempt; it is
/// doubled for every further consecutive failure.
const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
/// Upper bound on the exponential backoff delay (15 minutes), before jitter is added.
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(900);
/// Pending pairings at least this old (10 minutes) are discarded when pruning.
const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(600);
/// Maximum number of messages buffered per peer while its fingerprint awaits verification.
const AWAITING_VERIFICATION_BUFFER_LIMIT: usize = 100;
32
/// A PSK pairing that has been issued but not yet consumed by a handshake.
struct PskPairing {
    /// Optional name to store with the connection once the pairing completes.
    connection_name: Option<String>,
    /// When the pairing was created; compared against `PENDING_PAIRING_MAX_AGE` when pruning.
    created_at: Instant,
    /// The pre-shared key handed to the responder handshake for this pairing.
    psk: Psk,
}
39
/// The single in-flight rendezvous pairing, if any.
struct RendezvousPairing {
    /// Optional name to store with the connection once the pairing completes.
    connection_name: Option<String>,
    /// When the pairing was created; compared against `PENDING_PAIRING_MAX_AGE` when pruning.
    created_at: Instant,
    /// Channel used to deliver the rendezvous code to the caller that requested it.
    /// Becomes `None` once the code has been sent.
    code_tx: Option<oneshot::Sender<Result<RendezvousCode, ClientError>>>,
}
47
/// Pairing state that has not yet turned into an accepted connection.
struct PendingPairings {
    /// Outstanding PSK pairings, keyed by the PSK id announced in the handshake init.
    psk_pairings: HashMap<PskId, PskPairing>,
    /// The pending rendezvous pairing; a new request replaces any previous one.
    rendezvous: Option<RendezvousPairing>,
    /// Messages held back per peer while that peer's fingerprint awaits verification.
    /// The presence of a key (even with an empty buffer) is what enables buffering.
    buffered_messages: HashMap<IdentityFingerprint, Vec<IncomingMessage>>,
}
61
62impl PendingPairings {
63 fn new() -> Self {
64 Self {
65 psk_pairings: HashMap::new(),
66 rendezvous: None,
67 buffered_messages: HashMap::new(),
68 }
69 }
70
71 fn prune_stale(&mut self) {
73 self.psk_pairings
74 .retain(|_, p| p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
75 if self
76 .rendezvous
77 .as_ref()
78 .is_some_and(|r| r.created_at.elapsed() >= PENDING_PAIRING_MAX_AGE)
79 {
80 self.rendezvous = None;
81 }
82 }
83
84 fn take_rendezvous(&mut self) -> Option<RendezvousPairing> {
86 self.rendezvous.take()
87 }
88
89 fn prepare_buffering(&mut self, source: IdentityFingerprint) {
91 self.buffered_messages.insert(source, Vec::new());
92 }
93
94 fn try_buffer_message(&mut self, msg: IncomingMessage) -> Option<IncomingMessage> {
98 let source = match &msg {
99 IncomingMessage::Send { source, .. } => source,
100 _ => return Some(msg),
101 };
102 if let Some(buffer) = self.buffered_messages.get_mut(source) {
103 if buffer.len() < AWAITING_VERIFICATION_BUFFER_LIMIT {
104 debug!(
105 "Buffering message from {:?} pending fingerprint verification",
106 source
107 );
108 buffer.push(msg);
109 } else {
110 warn!("Buffer limit reached for {:?}, dropping message", source);
111 }
112 None
113 } else {
114 Some(msg)
115 }
116 }
117
118 fn take_buffered_messages(&mut self, source: &IdentityFingerprint) -> Vec<IncomingMessage> {
121 self.buffered_messages.remove(source).unwrap_or_default()
122 }
123}
124
125use super::notify;
126use crate::{
127 error::ClientError,
128 traits::{
129 AuditConnectionType, AuditEvent, AuditLog, ConnectionInfo, ConnectionStore,
130 CredentialFieldSet, IdentityProvider, NoOpAuditLog,
131 },
132 types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
133};
134
/// Fire-and-forget status events emitted by the client's event loop.
#[derive(Debug, Clone)]
pub enum UserClientNotification {
    /// The event loop has started.
    Listening {},
    /// A handshake init message was received from a peer.
    HandshakeStart {},
    /// Free-form progress information about a pairing in progress.
    HandshakeProgress {
        /// Human-readable progress text.
        message: String,
    },
    /// The handshake completed and the response was sent to the peer.
    HandshakeComplete {},
    /// Reports the handshake fingerprint of a connection accepted via PSK.
    HandshakeFingerprint {
        /// String form of the handshake fingerprint.
        fingerprint: String,
        /// Identity fingerprint of the remote peer.
        identity: IdentityFingerprint,
    },
    /// The user approved the fingerprint and the connection was accepted.
    FingerprintVerified {},
    /// Fingerprint verification was rejected or cancelled.
    FingerprintRejected {
        /// Why the verification did not succeed.
        reason: String,
    },
    /// A credential request was approved and the response was sent.
    CredentialApproved {
        /// Domain of the returned credential, if it has one.
        domain: Option<String>,
        /// Identifier supplied with the reply, if any.
        credential_id: Option<String>,
    },
    /// A credential request was denied (or its reply channel was dropped).
    CredentialDenied {
        /// Domain of the credential attached to the reply, if any.
        domain: Option<String>,
        /// Identifier supplied with the reply, if any.
        credential_id: Option<String>,
    },
    /// An already-known peer re-ran the handshake and its session was replaced.
    SessionRefreshed {
        /// Identity fingerprint of the peer whose session was refreshed.
        fingerprint: IdentityFingerprint,
    },
    /// The incoming message channel from the proxy closed.
    ClientDisconnected {},
    /// A reconnection attempt failed and another one will follow after a delay.
    Reconnecting {
        /// 1-based number of the attempt that just failed.
        attempt: u32,
    },
    /// The connection to the proxy was re-established.
    Reconnected {},
    /// An error occurred while processing an event.
    Error {
        /// Display form of the error.
        message: String,
        /// Name of the operation that failed, when known.
        context: Option<String>,
    },
}
203
/// The user's answer to a `UserClientRequest::VerifyFingerprint` request.
#[derive(Debug)]
pub struct FingerprintVerificationReply {
    /// `true` to accept the connection, `false` to reject it.
    pub approved: bool,
    /// Optional connection name; when present it takes precedence over the
    /// name given when the pairing was created.
    pub name: Option<String>,
}
212
/// The user's answer to a `UserClientRequest::CredentialRequest` request.
#[derive(Debug)]
pub struct CredentialRequestReply {
    /// `true` to release the credential, `false` to deny the request.
    pub approved: bool,
    /// The credential to return; only forwarded to the peer when `approved` is `true`.
    pub credential: Option<CredentialData>,
    /// Optional identifier of the credential, recorded in audit events and notifications.
    pub credential_id: Option<String>,
}
223
/// Requests from the event loop that need an answer from the consumer.
///
/// Each variant carries a one-shot `reply` sender; dropping it without
/// answering is treated as a rejection/denial.
#[derive(Debug)]
pub enum UserClientRequest {
    /// Asks the user to confirm the fingerprint of a new rendezvous connection.
    VerifyFingerprint {
        /// String form of the handshake fingerprint to show to the user.
        fingerprint: String,
        /// Identity fingerprint of the remote peer.
        identity: IdentityFingerprint,
        /// Channel on which to send the decision.
        reply: oneshot::Sender<FingerprintVerificationReply>,
    },
    /// Asks the user to approve or deny a credential request from a peer.
    CredentialRequest {
        /// The query sent by the peer.
        query: crate::types::CredentialQuery,
        /// Identity fingerprint of the requesting peer.
        identity: IdentityFingerprint,
        /// Channel on which to send the decision (and credential).
        reply: oneshot::Sender<CredentialRequestReply>,
    },
}
246
/// A resolved user reply together with the context needed to act on it.
enum PendingReply {
    /// Outcome of a fingerprint verification request.
    FingerprintVerification {
        /// Peer whose fingerprint was being verified.
        source: IdentityFingerprint,
        /// Transport produced by the handshake, held until the user decides.
        transport: MultiDeviceTransport,
        /// Name taken from the matched pairing, if any.
        connection_name: Option<String>,
        /// The user's reply, or an error if the reply sender was dropped.
        reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
    },
    /// Outcome of a credential request.
    CredentialResponse {
        /// Peer that asked for the credential.
        source: IdentityFingerprint,
        /// Request id taken from the peer's request; echoed in the response.
        request_id: String,
        /// The query the peer sent.
        query: crate::types::CredentialQuery,
        /// The user's reply, or an error if the reply sender was dropped.
        reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
    },
}
266
/// Boxed future that resolves to a [`PendingReply`] once the user has answered.
type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
269
/// Commands sent from `UserClient` handles to the event loop.
enum UserClientCommand {
    /// Create a new PSK pairing and return its token.
    GetPskToken {
        /// Optional name for the resulting connection.
        name: Option<String>,
        /// Channel on which the token (or error) is returned.
        reply: oneshot::Sender<Result<String, ClientError>>,
    },
    /// Request a rendezvous code from the proxy and start a rendezvous pairing.
    GetRendezvousToken {
        /// Optional name for the resulting connection.
        name: Option<String>,
        /// Channel on which the rendezvous code (or error) is returned.
        reply: oneshot::Sender<Result<RendezvousCode, ClientError>>,
    },
}
287
/// Everything returned by [`UserClient::connect`].
pub struct UserClientHandle {
    /// Cloneable handle for sending commands to the event loop.
    pub client: UserClient,
    /// Receiver for status notifications emitted by the event loop.
    pub notifications: mpsc::Receiver<UserClientNotification>,
    /// Receiver for requests that need a reply from the consumer.
    pub requests: mpsc::Receiver<UserClientRequest>,
}
302
/// Handle for issuing commands to the running event loop.
///
/// The event loop shuts down once every clone of this handle has been dropped.
#[derive(Clone)]
pub struct UserClient {
    /// Sending side of the command channel read by the event loop.
    command_tx: mpsc::Sender<UserClientCommand>,
}
307
308impl UserClient {
309 pub async fn connect(
318 identity_provider: Box<dyn IdentityProvider>,
319 connection_store: Box<dyn ConnectionStore>,
320 mut proxy_client: Box<dyn ProxyClient>,
321 audit_log: Option<Box<dyn AuditLog>>,
322 ) -> Result<UserClientHandle, ClientError> {
323 let identity_keypair = identity_provider.identity().await;
325 let own_fingerprint = identity_keypair.identity().fingerprint();
326
327 let incoming_rx = proxy_client.connect(identity_keypair.clone()).await?;
329
330 let (notification_tx, notification_rx) = mpsc::channel(32);
332 let (request_tx, request_rx) = mpsc::channel(32);
333
334 let (command_tx, command_rx) = mpsc::channel(32);
336
337 let inner = UserClientInner {
339 connection_store,
340 proxy_client,
341 identity_keypair,
342 own_fingerprint,
343 transports: HashMap::new(),
344 pending_pairings: PendingPairings::new(),
345 audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
346 };
347
348 #[cfg(target_arch = "wasm32")]
350 wasm_bindgen_futures::spawn_local(inner.run_event_loop(
351 incoming_rx,
352 command_rx,
353 notification_tx,
354 request_tx,
355 ));
356 #[cfg(not(target_arch = "wasm32"))]
357 tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
358
359 Ok(UserClientHandle {
360 client: Self { command_tx },
361 notifications: notification_rx,
362 requests: request_rx,
363 })
364 }
365
366 pub async fn get_psk_token(&self, name: Option<String>) -> Result<String, ClientError> {
371 let (tx, rx) = oneshot::channel();
372 self.command_tx
373 .send(UserClientCommand::GetPskToken { name, reply: tx })
374 .await
375 .map_err(|_| ClientError::ChannelClosed)?;
376 rx.await.map_err(|_| ClientError::ChannelClosed)?
377 }
378
379 pub async fn get_rendezvous_token(
384 &self,
385 name: Option<String>,
386 ) -> Result<RendezvousCode, ClientError> {
387 let (tx, rx) = oneshot::channel();
388 self.command_tx
389 .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
390 .await
391 .map_err(|_| ClientError::ChannelClosed)?;
392 rx.await.map_err(|_| ClientError::ChannelClosed)?
393 }
394}
395
/// State owned by the event loop task.
struct UserClientInner {
    /// Store for connection records, including saved transport state.
    connection_store: Box<dyn ConnectionStore>,
    /// Connection to the proxy used to send messages and (re)connect.
    proxy_client: Box<dyn ProxyClient>,
    /// This client's identity key pair; passed to the proxy on every (re)connect.
    identity_keypair: IdentityKeyPair,
    /// Fingerprint of `identity_keypair`, embedded in generated PSK tokens.
    own_fingerprint: IdentityFingerprint,
    /// In-memory transports per peer, loaded from the store on demand.
    transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
    /// Pairings that have not yet become accepted connections.
    pending_pairings: PendingPairings,
    /// Sink for audit events.
    audit_log: Box<dyn AuditLog>,
}
415
416impl UserClientInner {
417 async fn run_event_loop(
419 mut self,
420 mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
421 mut command_rx: mpsc::Receiver<UserClientCommand>,
422 notification_tx: mpsc::Sender<UserClientNotification>,
423 request_tx: mpsc::Sender<UserClientRequest>,
424 ) {
425 notify!(notification_tx, UserClientNotification::Listening {});
427
428 let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
429
430 loop {
431 tokio::select! {
432 msg = incoming_rx.recv() => {
433 match msg {
434 Some(msg) => {
435 match self.handle_incoming(msg, ¬ification_tx, &request_tx).await {
436 Ok(Some(fut)) => pending_replies.push(fut),
437 Ok(None) => {}
438 Err(e) => {
439 warn!("Error handling incoming message: {}", e);
440 notify!(notification_tx, UserClientNotification::Error {
441 message: e.to_string(),
442 context: Some("handle_incoming".to_string()),
443 });
444 }
445 }
446 }
447 None => {
448 notify!(notification_tx, UserClientNotification::ClientDisconnected {});
450 match self.attempt_reconnection(¬ification_tx).await {
451 Ok(new_rx) => {
452 incoming_rx = new_rx;
453 notify!(notification_tx, UserClientNotification::Reconnected {});
454 }
455 Err(e) => {
456 warn!("Reconnection failed permanently: {}", e);
457 notify!(notification_tx, UserClientNotification::Error {
458 message: e.to_string(),
459 context: Some("reconnection".to_string()),
460 });
461 return;
462 }
463 }
464 }
465 }
466 }
467 Some(reply) = pending_replies.next() => {
468 match self.process_pending_reply(reply, ¬ification_tx, &request_tx).await {
469 Ok(futs) => {
470 for fut in futs {
471 pending_replies.push(fut);
472 }
473 }
474 Err(e) => {
475 warn!("Error processing pending reply: {}", e);
476 notify!(notification_tx, UserClientNotification::Error {
477 message: e.to_string(),
478 context: Some("process_pending_reply".to_string()),
479 });
480 }
481 }
482 }
483 cmd = command_rx.recv() => {
484 match cmd {
485 Some(cmd) => self.handle_command(cmd, ¬ification_tx).await,
486 None => {
487 debug!("All UserClient handles dropped, shutting down event loop");
489 return;
490 }
491 }
492 }
493 }
494 }
495 }
496
497 async fn attempt_reconnection(
499 &mut self,
500 notification_tx: &mpsc::Sender<UserClientNotification>,
501 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ClientError> {
502 use rand::{Rng, SeedableRng};
503
504 let mut rng = rand::rngs::StdRng::from_entropy();
505 let mut attempt: u32 = 0;
506
507 loop {
508 attempt = attempt.saturating_add(1);
509
510 let _ = self.proxy_client.disconnect().await;
512
513 match self
514 .proxy_client
515 .connect(self.identity_keypair.clone())
516 .await
517 {
518 Ok(new_rx) => {
519 debug!("Reconnected to proxy on attempt {}", attempt);
520 return Ok(new_rx);
521 }
522 Err(e) => {
523 debug!("Reconnection attempt {} failed: {}", attempt, e);
524 notify!(
525 notification_tx,
526 UserClientNotification::Reconnecting { attempt }
527 );
528
529 let exp_delay = RECONNECT_BASE_DELAY
531 .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
532 let delay = exp_delay.min(RECONNECT_MAX_DELAY);
533 let jitter_max = (delay.as_millis() as u64) / 4;
534 let jitter = if jitter_max > 0 {
535 rng.gen_range(0..=jitter_max)
536 } else {
537 0
538 };
539 let total_delay = delay + Duration::from_millis(jitter);
540
541 crate::compat::sleep(total_delay).await;
542 }
543 }
544 }
545 }
546
547 async fn handle_incoming(
549 &mut self,
550 msg: IncomingMessage,
551 notification_tx: &mpsc::Sender<UserClientNotification>,
552 request_tx: &mpsc::Sender<UserClientRequest>,
553 ) -> Result<Option<PendingReplyFuture>, ClientError> {
554 let Some(msg) = self.pending_pairings.try_buffer_message(msg) else {
556 return Ok(None);
557 };
558
559 match msg {
560 IncomingMessage::Send {
561 source, payload, ..
562 } => {
563 let text = String::from_utf8(payload)
564 .map_err(|e| ClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
565
566 let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
567
568 match protocol_msg {
569 ProtocolMessage::HandshakeInit {
570 data,
571 ciphersuite,
572 psk_id,
573 } => {
574 self.handle_handshake_init(
575 source,
576 data,
577 ciphersuite,
578 psk_id,
579 notification_tx,
580 request_tx,
581 )
582 .await
583 }
584 ProtocolMessage::CredentialRequest { encrypted } => {
585 self.handle_credential_request(
586 source,
587 encrypted,
588 notification_tx,
589 request_tx,
590 )
591 .await
592 }
593 _ => {
594 debug!("Received unexpected message type from {:?}", source);
595 Ok(None)
596 }
597 }
598 }
599 IncomingMessage::RendezvousInfo(code) => {
600 if let Some(pairing) = &mut self.pending_pairings.rendezvous {
601 if let Some(sender) = pairing.code_tx.take() {
602 debug!("Completed rendezvous pairing via handle, code: {}", code);
603 let _ = sender.send(Ok(code));
604 }
605 } else {
606 debug!("Received RendezvousInfo but no pending rendezvous pairing found");
607 }
608 Ok(None)
609 }
610 IncomingMessage::IdentityInfo { .. } => {
611 debug!("Received unexpected IdentityInfo message");
613 Ok(None)
614 }
615 }
616 }
617
618 async fn handle_handshake_init(
620 &mut self,
621 source: IdentityFingerprint,
622 data: String,
623 ciphersuite: String,
624 psk_id: Option<PskId>,
625 notification_tx: &mpsc::Sender<UserClientNotification>,
626 request_tx: &mpsc::Sender<UserClientRequest>,
627 ) -> Result<Option<PendingReplyFuture>, ClientError> {
628 debug!("Received handshake init from source: {:?}", source);
629 notify!(notification_tx, UserClientNotification::HandshakeStart {});
630
631 let is_new_connection = self.connection_store.get(&source).await.is_none();
633
634 let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
636 (None, None, false)
638 } else {
639 self.pending_pairings.prune_stale();
641
642 match &psk_id {
643 Some(id) => {
644 if let Some(pairing) = self.pending_pairings.psk_pairings.remove(id) {
646 (Some(pairing.psk), pairing.connection_name, true)
647 } else {
648 warn!("No matching PSK pairing for psk_id: {}", id);
649 return Err(ClientError::InvalidState {
650 expected: "matching PSK pairing".to_string(),
651 current: format!("no pairing for psk_id {id}"),
652 });
653 }
654 }
655 None => {
656 if let Some(pairing) = self.pending_pairings.take_rendezvous() {
658 (None, pairing.connection_name, false)
659 } else {
660 return Err(ClientError::InvalidState {
661 expected: "pending rendezvous pairing".to_string(),
662 current: "no pending rendezvous pairing".to_string(),
663 });
664 }
665 }
666 }
667 };
668
669 let (transport, fingerprint_str) = self
670 .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
671 .await?;
672
673 notify!(
674 notification_tx,
675 UserClientNotification::HandshakeComplete {}
676 );
677
678 if is_new_connection && !is_psk_connection {
679 self.pending_pairings.prepare_buffering(source);
682
683 let (tx, rx) = oneshot::channel();
684
685 if request_tx.capacity() == 0 {
686 warn!("Request channel full, waiting for consumer to drain");
687 }
688 request_tx
689 .send(UserClientRequest::VerifyFingerprint {
690 fingerprint: fingerprint_str,
691 identity: source,
692 reply: tx,
693 })
694 .await
695 .ok();
696
697 let fut: PendingReplyFuture = Box::pin(async move {
698 let result = rx.await;
699 PendingReply::FingerprintVerification {
700 source,
701 transport,
702 connection_name: matched_pairing_name,
703 reply: result,
704 }
705 });
706
707 Ok(Some(fut))
708 } else if !is_new_connection {
709 let existing = self.connection_store.get(&source).await;
712 let now = crate::compat::now_seconds();
713 self.transports.insert(source, transport.clone());
714 self.connection_store
715 .save(ConnectionInfo {
716 fingerprint: source,
717 name: existing.as_ref().and_then(|s| s.name.clone()),
718 cached_at: existing.as_ref().map_or(now, |s| s.cached_at),
719 last_connected_at: now,
720 transport_state: Some(transport),
721 })
722 .await?;
723
724 self.audit_log
725 .write(AuditEvent::SessionRefreshed {
726 remote_identity: &source,
727 })
728 .await;
729
730 notify!(
731 notification_tx,
732 UserClientNotification::SessionRefreshed {
733 fingerprint: source,
734 }
735 );
736
737 Ok(None)
738 } else {
739 self.accept_new_connection(
741 source,
742 transport,
743 matched_pairing_name.as_deref(),
744 AuditConnectionType::Psk,
745 )
746 .await?;
747
748 notify!(
750 notification_tx,
751 UserClientNotification::HandshakeFingerprint {
752 fingerprint: fingerprint_str,
753 identity: source,
754 }
755 );
756
757 Ok(None)
758 }
759 }
760
761 async fn accept_new_connection(
763 &mut self,
764 fingerprint: IdentityFingerprint,
765 transport: MultiDeviceTransport,
766 connection_name: Option<&str>,
767 connection_type: AuditConnectionType,
768 ) -> Result<(), ClientError> {
769 let now = crate::compat::now_seconds();
770 self.transports.insert(fingerprint, transport.clone());
771 self.connection_store
772 .save(ConnectionInfo {
773 fingerprint,
774 name: connection_name.map(|s| s.to_owned()),
775 cached_at: now,
776 last_connected_at: now,
777 transport_state: Some(transport),
778 })
779 .await?;
780
781 self.audit_log
782 .write(AuditEvent::ConnectionEstablished {
783 remote_identity: &fingerprint,
784 remote_name: connection_name,
785 connection_type,
786 })
787 .await;
788
789 Ok(())
790 }
791
792 async fn handle_credential_request(
794 &mut self,
795 source: IdentityFingerprint,
796 encrypted: String,
797 notification_tx: &mpsc::Sender<UserClientNotification>,
798 request_tx: &mpsc::Sender<UserClientRequest>,
799 ) -> Result<Option<PendingReplyFuture>, ClientError> {
800 if !self.transports.contains_key(&source) {
801 debug!("Loading transport state for source: {:?}", source);
802 let connection = self.connection_store.get(&source).await.ok_or_else(|| {
803 ClientError::ConnectionCache(format!("Missing cached connection {source:?}"))
804 })?;
805 let transport = connection.transport_state.ok_or_else(|| {
806 ClientError::ConnectionCache(format!(
807 "Missing transport state for cached connection {source:?}"
808 ))
809 })?;
810 self.transports.insert(source, transport);
811 }
812
813 let transport = self
815 .transports
816 .get_mut(&source)
817 .ok_or(ClientError::SecureChannelNotEstablished)?;
818
819 let encrypted_bytes = STANDARD
821 .decode(&encrypted)
822 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
823
824 let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
825 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
826
827 let decrypted = transport
828 .decrypt(&packet)
829 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
830
831 let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
832
833 self.audit_log
834 .write(AuditEvent::CredentialRequested {
835 query: &request.query,
836 remote_identity: &source,
837 request_id: &request.request_id,
838 })
839 .await;
840
841 let (tx, rx) = oneshot::channel();
843
844 if request_tx.capacity() == 0 {
846 warn!("Request channel full, waiting for consumer to drain");
847 }
848 if request_tx
849 .send(UserClientRequest::CredentialRequest {
850 query: request.query.clone(),
851 identity: source,
852 reply: tx,
853 })
854 .await
855 .is_err()
856 {
857 warn!("Request channel closed, cannot send credential request");
859 notify!(
860 notification_tx,
861 UserClientNotification::Error {
862 message: "Request channel closed".to_string(),
863 context: Some("handle_credential_request".to_string()),
864 }
865 );
866 return Ok(None);
867 }
868
869 let request_id = request.request_id;
871 let query = request.query;
872 let fut: PendingReplyFuture = Box::pin(async move {
873 let result = rx.await;
874 PendingReply::CredentialResponse {
875 source,
876 request_id,
877 query,
878 reply: result,
879 }
880 });
881
882 Ok(Some(fut))
883 }
884
885 async fn process_pending_reply(
887 &mut self,
888 reply: PendingReply,
889 notification_tx: &mpsc::Sender<UserClientNotification>,
890 request_tx: &mpsc::Sender<UserClientRequest>,
891 ) -> Result<Vec<PendingReplyFuture>, ClientError> {
892 match reply {
893 PendingReply::FingerprintVerification {
894 source,
895 transport,
896 connection_name,
897 reply,
898 } => {
899 self.process_fingerprint_reply(
900 source,
901 transport,
902 connection_name,
903 reply,
904 notification_tx,
905 request_tx,
906 )
907 .await
908 }
909 PendingReply::CredentialResponse {
910 source,
911 request_id,
912 query,
913 reply,
914 } => {
915 self.process_credential_reply(source, request_id, query, reply, notification_tx)
916 .await?;
917 Ok(Vec::new())
918 }
919 }
920 }
921
922 async fn handle_command(
924 &mut self,
925 cmd: UserClientCommand,
926 notification_tx: &mpsc::Sender<UserClientNotification>,
927 ) {
928 match cmd {
929 UserClientCommand::GetPskToken { name, reply } => {
930 let result = self.generate_psk_token(name).await;
931 let _ = reply.send(result);
932 }
933 UserClientCommand::GetRendezvousToken { name, reply } => {
934 if let Err(e) = self.proxy_client.request_rendezvous().await {
935 let _ = reply.send(Err(e));
936 return;
937 }
938
939 self.pending_pairings.prune_stale();
941
942 self.pending_pairings.rendezvous = Some(RendezvousPairing {
945 connection_name: name,
946 created_at: Instant::now(),
947 code_tx: Some(reply),
948 });
949
950 notify!(
952 notification_tx,
953 UserClientNotification::HandshakeProgress {
954 message: "Requesting rendezvous code from proxy...".to_string(),
955 }
956 );
957 }
958 }
959 }
960
961 async fn generate_psk_token(&mut self, name: Option<String>) -> Result<String, ClientError> {
963 let psk = Psk::generate();
964 let psk_id = psk.id();
965 let token = PskToken::new(psk.clone(), self.own_fingerprint).to_string();
966
967 self.pending_pairings.prune_stale();
968 self.pending_pairings.psk_pairings.insert(
969 psk_id,
970 PskPairing {
971 connection_name: name,
972 created_at: Instant::now(),
973 psk,
974 },
975 );
976 debug!("Created PSK pairing, token generated");
977
978 Ok(token)
979 }
980
981 async fn process_fingerprint_reply(
983 &mut self,
984 source: IdentityFingerprint,
985 transport: MultiDeviceTransport,
986 connection_name: Option<String>,
987 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
988 notification_tx: &mpsc::Sender<UserClientNotification>,
989 request_tx: &mpsc::Sender<UserClientRequest>,
990 ) -> Result<Vec<PendingReplyFuture>, ClientError> {
991 match reply {
992 Ok(FingerprintVerificationReply {
993 approved: true,
994 name,
995 }) => {
996 let conn_name = name.or(connection_name);
998 self.accept_new_connection(
999 source,
1000 transport,
1001 conn_name.as_deref(),
1002 AuditConnectionType::Rendezvous,
1003 )
1004 .await?;
1005
1006 notify!(
1007 notification_tx,
1008 UserClientNotification::FingerprintVerified {}
1009 );
1010
1011 let mut futures = Vec::new();
1013 for msg in self.pending_pairings.take_buffered_messages(&source) {
1014 match self.handle_incoming(msg, notification_tx, request_tx).await {
1015 Ok(Some(fut)) => futures.push(fut),
1016 Ok(None) => {}
1017 Err(e) => {
1018 warn!("Error processing buffered message: {}", e);
1019 }
1020 }
1021 }
1022
1023 Ok(futures)
1024 }
1025 Ok(FingerprintVerificationReply {
1026 approved: false, ..
1027 }) => {
1028 self.reject_fingerprint(
1029 &source,
1030 "User rejected fingerprint verification",
1031 notification_tx,
1032 )
1033 .await;
1034 Ok(Vec::new())
1035 }
1036 Err(_) => {
1037 warn!("Fingerprint verification reply channel dropped, treating as rejection");
1038 self.reject_fingerprint(
1039 &source,
1040 "Verification cancelled (reply dropped)",
1041 notification_tx,
1042 )
1043 .await;
1044 Ok(Vec::new())
1045 }
1046 }
1047 }
1048
1049 async fn reject_fingerprint(
1051 &mut self,
1052 source: &IdentityFingerprint,
1053 reason: &str,
1054 notification_tx: &mpsc::Sender<UserClientNotification>,
1055 ) {
1056 self.pending_pairings.take_buffered_messages(source);
1057
1058 self.audit_log
1059 .write(AuditEvent::ConnectionRejected {
1060 remote_identity: source,
1061 })
1062 .await;
1063
1064 notify!(
1065 notification_tx,
1066 UserClientNotification::FingerprintRejected {
1067 reason: reason.to_string(),
1068 }
1069 );
1070 }
1071
1072 #[allow(clippy::too_many_arguments)]
1074 async fn process_credential_reply(
1075 &mut self,
1076 source: IdentityFingerprint,
1077 request_id: String,
1078 query: crate::types::CredentialQuery,
1079 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
1080 notification_tx: &mpsc::Sender<UserClientNotification>,
1081 ) -> Result<(), ClientError> {
1082 let reply = match reply {
1083 Ok(r) => r,
1084 Err(_) => {
1085 warn!("Credential reply channel dropped, treating as denial");
1087 CredentialRequestReply {
1088 approved: false,
1089 credential: None,
1090 credential_id: None,
1091 }
1092 }
1093 };
1094
1095 let transport = self
1096 .transports
1097 .get_mut(&source)
1098 .ok_or(ClientError::SecureChannelNotEstablished)?;
1099
1100 let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1102 let fields = reply
1103 .credential
1104 .as_ref()
1105 .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1106 has_username: c.username.is_some(),
1107 has_password: c.password.is_some(),
1108 has_totp: c.totp.is_some(),
1109 has_uri: c.uri.is_some(),
1110 has_notes: c.notes.is_some(),
1111 });
1112
1113 let response_payload = CredentialResponsePayload {
1115 credential: if reply.approved {
1116 reply.credential
1117 } else {
1118 None
1119 },
1120 error: if !reply.approved {
1121 Some("Request denied".to_string())
1122 } else {
1123 None
1124 },
1125 request_id: Some(request_id.clone()),
1126 };
1127
1128 let response_json = serde_json::to_string(&response_payload)?;
1130 let encrypted = transport
1131 .encrypt(response_json.as_bytes())
1132 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
1133
1134 let msg = ProtocolMessage::CredentialResponse {
1135 encrypted: STANDARD.encode(encrypted.encode()),
1136 };
1137
1138 let msg_json = serde_json::to_string(&msg)?;
1139
1140 self.proxy_client
1141 .send_to(source, msg_json.into_bytes())
1142 .await?;
1143
1144 if reply.approved {
1146 self.audit_log
1147 .write(AuditEvent::CredentialApproved {
1148 query: &query,
1149 domain: domain.as_deref(),
1150 remote_identity: &source,
1151 request_id: &request_id,
1152 credential_id: reply.credential_id.as_deref(),
1153 fields,
1154 })
1155 .await;
1156
1157 notify!(
1158 notification_tx,
1159 UserClientNotification::CredentialApproved {
1160 domain,
1161 credential_id: reply.credential_id,
1162 }
1163 );
1164 } else {
1165 self.audit_log
1166 .write(AuditEvent::CredentialDenied {
1167 query: &query,
1168 domain: domain.as_deref(),
1169 remote_identity: &source,
1170 request_id: &request_id,
1171 credential_id: reply.credential_id.as_deref(),
1172 })
1173 .await;
1174
1175 notify!(
1176 notification_tx,
1177 UserClientNotification::CredentialDenied {
1178 domain,
1179 credential_id: reply.credential_id,
1180 }
1181 );
1182 }
1183
1184 Ok(())
1185 }
1186
1187 async fn complete_handshake(
1189 &self,
1190 remote_fingerprint: IdentityFingerprint,
1191 handshake_data: &str,
1192 ciphersuite_str: &str,
1193 psk: Option<&Psk>,
1194 ) -> Result<(MultiDeviceTransport, String), ClientError> {
1195 let ciphersuite = match ciphersuite_str {
1197 s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1198 _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1199 };
1200
1201 let init_bytes = STANDARD
1203 .decode(handshake_data)
1204 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
1205
1206 let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1207 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1208
1209 let mut handshake = if let Some(psk) = psk {
1211 ResponderHandshake::with_psk(psk.clone())
1212 } else {
1213 ResponderHandshake::new()
1214 };
1215
1216 handshake.receive_start(&init_packet)?;
1218 let response_packet = handshake.send_finish()?;
1219 let (transport, fingerprint) = handshake.finalize()?;
1220
1221 let msg = ProtocolMessage::HandshakeResponse {
1223 data: STANDARD.encode(response_packet.encode()?),
1224 ciphersuite: format!("{ciphersuite:?}"),
1225 };
1226
1227 let msg_json = serde_json::to_string(&msg)?;
1228
1229 self.proxy_client
1230 .send_to(remote_fingerprint, msg_json.into_bytes())
1231 .await?;
1232
1233 debug!("Sent handshake response to {:?}", remote_fingerprint);
1234
1235 Ok((transport, fingerprint.to_string()))
1236 }
1237}