1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(target_arch = "wasm32")]
9use web_time::Instant;
10
11use ap_noise::{Ciphersuite, MultiDeviceTransport, Psk, ResponderHandshake};
12use ap_proxy_client::IncomingMessage;
13use ap_proxy_protocol::{IdentityFingerprint, IdentityKeyPair, RendezvousCode};
14use base64::{Engine, engine::general_purpose::STANDARD};
15use futures_util::StreamExt;
16use futures_util::stream::FuturesUnordered;
17use tokio::sync::oneshot;
18
19use crate::proxy::ProxyClient;
20use crate::types::{CredentialData, PskId, PskToken};
21use tokio::sync::mpsc;
22use tracing::{debug, warn};
23
/// Delay used for the first reconnection retry; it is doubled for every further failed attempt.
const RECONNECT_BASE_DELAY: Duration = Duration::from_secs(2);
/// Upper bound applied to the exponential reconnect delay (jitter is added on top of it).
const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(15 * 60);
/// Age at which a pending, non-reusable pairing is discarded by `PendingPairings::prune_stale`.
const PENDING_PAIRING_MAX_AGE: Duration = Duration::from_secs(10 * 60);
/// Maximum number of messages buffered per source while its fingerprint awaits verification.
const AWAITING_VERIFICATION_BUFFER_LIMIT: usize = 100;
32
33struct PskPairing {
35 connection_name: Option<String>,
36 created_at: Instant,
37 psk: Psk,
38 reusable: bool,
40}
41
42struct RendezvousPairing {
44 connection_name: Option<String>,
45 created_at: Instant,
46 code_tx: Option<oneshot::Sender<Result<RendezvousCode, ClientError>>>,
48}
49
50struct PendingPairings {
56 psk_pairings: HashMap<PskId, PskPairing>,
58 rendezvous: Option<RendezvousPairing>,
60 buffered_messages: HashMap<IdentityFingerprint, Vec<IncomingMessage>>,
62}
63
64impl PendingPairings {
65 fn new() -> Self {
66 Self {
67 psk_pairings: HashMap::new(),
68 rendezvous: None,
69 buffered_messages: HashMap::new(),
70 }
71 }
72
73 fn prune_stale(&mut self) {
76 self.psk_pairings
77 .retain(|_, p| p.reusable || p.created_at.elapsed() < PENDING_PAIRING_MAX_AGE);
78 if self
79 .rendezvous
80 .as_ref()
81 .is_some_and(|r| r.created_at.elapsed() >= PENDING_PAIRING_MAX_AGE)
82 {
83 self.rendezvous = None;
84 }
85 }
86
87 fn take_rendezvous(&mut self) -> Option<RendezvousPairing> {
89 self.rendezvous.take()
90 }
91
92 fn take_psk(&mut self, id: &PskId) -> Option<(Psk, Option<String>)> {
96 let pairing = self.psk_pairings.get(id)?;
97 let psk = pairing.psk.clone();
98 let name = pairing.connection_name.clone();
99 if !pairing.reusable {
100 self.psk_pairings.remove(id);
101 }
102 Some((psk, name))
103 }
104
105 fn prepare_buffering(&mut self, source: IdentityFingerprint) {
107 self.buffered_messages.insert(source, Vec::new());
108 }
109
110 fn try_buffer_message(&mut self, msg: IncomingMessage) -> Option<IncomingMessage> {
114 let source = match &msg {
115 IncomingMessage::Send { source, .. } => source,
116 _ => return Some(msg),
117 };
118 if let Some(buffer) = self.buffered_messages.get_mut(source) {
119 if buffer.len() < AWAITING_VERIFICATION_BUFFER_LIMIT {
120 debug!(
121 "Buffering message from {:?} pending fingerprint verification",
122 source
123 );
124 buffer.push(msg);
125 } else {
126 warn!("Buffer limit reached for {:?}, dropping message", source);
127 }
128 None
129 } else {
130 Some(msg)
131 }
132 }
133
134 fn take_buffered_messages(&mut self, source: &IdentityFingerprint) -> Vec<IncomingMessage> {
137 self.buffered_messages.remove(source).unwrap_or_default()
138 }
139}
140
141use super::notify;
142use crate::{
143 error::ClientError,
144 traits::{
145 AuditConnectionType, AuditEvent, AuditLog, ConnectionInfo, ConnectionStore,
146 CredentialFieldSet, IdentityProvider, NoOpAuditLog, PskEntry, PskStore,
147 },
148 types::{CredentialRequestPayload, CredentialResponsePayload, ProtocolMessage},
149};
150
151#[derive(Debug, Clone)]
157pub enum UserClientNotification {
158 Listening {},
160 HandshakeStart {},
162 HandshakeProgress {
164 message: String,
166 },
167 HandshakeComplete {},
169 HandshakeFingerprint {
171 fingerprint: String,
173 identity: IdentityFingerprint,
175 },
176 FingerprintVerified {},
178 FingerprintRejected {
180 reason: String,
182 },
183 CredentialApproved {
185 domain: Option<String>,
187 credential_id: Option<String>,
189 },
190 CredentialDenied {
192 domain: Option<String>,
194 credential_id: Option<String>,
196 },
197 SessionRefreshed {
199 fingerprint: IdentityFingerprint,
201 },
202 ClientDisconnected {},
204 Reconnecting {
206 attempt: u32,
208 },
209 Reconnected {},
211 Error {
213 message: String,
215 context: Option<String>,
217 },
218}
219
/// Answer to a [`UserClientRequest::VerifyFingerprint`] request.
#[derive(Debug)]
pub struct FingerprintVerificationReply {
    /// `true` to accept the connection, `false` to reject it.
    pub approved: bool,
    /// Optional connection name; when present it takes precedence over the name
    /// recorded with the pending pairing.
    pub name: Option<String>,
}
228
229#[derive(Debug)]
231pub struct CredentialRequestReply {
232 pub approved: bool,
234 pub credential: Option<CredentialData>,
236 pub credential_id: Option<String>,
238}
239
240#[derive(Debug)]
242pub enum UserClientRequest {
243 VerifyFingerprint {
245 fingerprint: String,
247 identity: IdentityFingerprint,
249 reply: oneshot::Sender<FingerprintVerificationReply>,
251 },
252 CredentialRequest {
254 query: crate::types::CredentialQuery,
256 identity: IdentityFingerprint,
258 reply: oneshot::Sender<CredentialRequestReply>,
260 },
261}
262
263enum PendingReply {
269 FingerprintVerification {
270 source: IdentityFingerprint,
271 transport: MultiDeviceTransport,
272 connection_name: Option<String>,
273 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
274 },
275 CredentialResponse {
276 source: IdentityFingerprint,
277 request_id: String,
278 query: crate::types::CredentialQuery,
279 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
280 },
281}
282
283type PendingReplyFuture = Pin<Box<dyn Future<Output = PendingReply> + Send>>;
285
286enum UserClientCommand {
292 GetPskToken {
294 name: Option<String>,
295 reusable: bool,
296 reply: oneshot::Sender<Result<String, ClientError>>,
297 },
298 GetRendezvousToken {
300 name: Option<String>,
301 reply: oneshot::Sender<Result<RendezvousCode, ClientError>>,
302 },
303}
304
305pub struct UserClientHandle {
315 pub client: UserClient,
316 pub notifications: mpsc::Receiver<UserClientNotification>,
317 pub requests: mpsc::Receiver<UserClientRequest>,
318}
319
320#[derive(Clone)]
321pub struct UserClient {
322 command_tx: mpsc::Sender<UserClientCommand>,
323}
324
325impl UserClient {
326 pub async fn connect(
337 identity_provider: Box<dyn IdentityProvider>,
338 connection_store: Box<dyn ConnectionStore>,
339 mut proxy_client: Box<dyn ProxyClient>,
340 audit_log: Option<Box<dyn AuditLog>>,
341 psk_store: Option<Box<dyn PskStore>>,
342 ) -> Result<UserClientHandle, ClientError> {
343 let identity_keypair = identity_provider.identity().await;
345 let own_fingerprint = identity_keypair.identity().fingerprint();
346
347 let incoming_rx = proxy_client.connect(identity_keypair.clone()).await?;
349
350 let (notification_tx, notification_rx) = mpsc::channel(32);
352 let (request_tx, request_rx) = mpsc::channel(32);
353
354 let (command_tx, command_rx) = mpsc::channel(32);
356
357 let mut pending_pairings = PendingPairings::new();
359
360 if let Some(ref store) = psk_store {
362 for entry in store.list().await {
363 pending_pairings.psk_pairings.insert(
364 entry.psk_id.clone(),
365 PskPairing {
366 connection_name: entry.name.clone(),
367 created_at: Instant::now(),
368 psk: entry.psk.clone(),
369 reusable: true,
370 },
371 );
372 debug!("Loaded reusable PSK from store: psk_id={}", entry.psk_id);
373 }
374 }
375
376 let inner = UserClientInner {
377 connection_store,
378 proxy_client,
379 identity_keypair,
380 own_fingerprint,
381 transports: HashMap::new(),
382 pending_pairings,
383 audit_log: audit_log.unwrap_or_else(|| Box::new(NoOpAuditLog)),
384 psk_store,
385 };
386
387 #[cfg(target_arch = "wasm32")]
389 wasm_bindgen_futures::spawn_local(inner.run_event_loop(
390 incoming_rx,
391 command_rx,
392 notification_tx,
393 request_tx,
394 ));
395 #[cfg(not(target_arch = "wasm32"))]
396 tokio::spawn(inner.run_event_loop(incoming_rx, command_rx, notification_tx, request_tx));
397
398 Ok(UserClientHandle {
399 client: Self { command_tx },
400 notifications: notification_rx,
401 requests: request_rx,
402 })
403 }
404
405 pub async fn get_psk_token(
414 &self,
415 name: Option<String>,
416 reusable: bool,
417 ) -> Result<String, ClientError> {
418 let (tx, rx) = oneshot::channel();
419 self.command_tx
420 .send(UserClientCommand::GetPskToken {
421 name,
422 reusable,
423 reply: tx,
424 })
425 .await
426 .map_err(|_| ClientError::ChannelClosed)?;
427 rx.await.map_err(|_| ClientError::ChannelClosed)?
428 }
429
430 pub async fn get_rendezvous_token(
435 &self,
436 name: Option<String>,
437 ) -> Result<RendezvousCode, ClientError> {
438 let (tx, rx) = oneshot::channel();
439 self.command_tx
440 .send(UserClientCommand::GetRendezvousToken { name, reply: tx })
441 .await
442 .map_err(|_| ClientError::ChannelClosed)?;
443 rx.await.map_err(|_| ClientError::ChannelClosed)?
444 }
445}
446
447struct UserClientInner {
453 connection_store: Box<dyn ConnectionStore>,
454 proxy_client: Box<dyn ProxyClient>,
455 identity_keypair: IdentityKeyPair,
457 own_fingerprint: IdentityFingerprint,
459 transports: HashMap<IdentityFingerprint, MultiDeviceTransport>,
461 pending_pairings: PendingPairings,
463 audit_log: Box<dyn AuditLog>,
465 psk_store: Option<Box<dyn PskStore>>,
467}
468
469impl UserClientInner {
470 async fn run_event_loop(
472 mut self,
473 mut incoming_rx: mpsc::UnboundedReceiver<IncomingMessage>,
474 mut command_rx: mpsc::Receiver<UserClientCommand>,
475 notification_tx: mpsc::Sender<UserClientNotification>,
476 request_tx: mpsc::Sender<UserClientRequest>,
477 ) {
478 notify!(notification_tx, UserClientNotification::Listening {});
480
481 let mut pending_replies: FuturesUnordered<PendingReplyFuture> = FuturesUnordered::new();
482
483 loop {
484 tokio::select! {
485 msg = incoming_rx.recv() => {
486 match msg {
487 Some(msg) => {
488 match self.handle_incoming(msg, ¬ification_tx, &request_tx).await {
489 Ok(Some(fut)) => pending_replies.push(fut),
490 Ok(None) => {}
491 Err(e) => {
492 warn!("Error handling incoming message: {}", e);
493 notify!(notification_tx, UserClientNotification::Error {
494 message: e.to_string(),
495 context: Some("handle_incoming".to_string()),
496 });
497 }
498 }
499 }
500 None => {
501 notify!(notification_tx, UserClientNotification::ClientDisconnected {});
503 match self.attempt_reconnection(¬ification_tx).await {
504 Ok(new_rx) => {
505 incoming_rx = new_rx;
506 notify!(notification_tx, UserClientNotification::Reconnected {});
507 }
508 Err(e) => {
509 warn!("Reconnection failed permanently: {}", e);
510 notify!(notification_tx, UserClientNotification::Error {
511 message: e.to_string(),
512 context: Some("reconnection".to_string()),
513 });
514 return;
515 }
516 }
517 }
518 }
519 }
520 Some(reply) = pending_replies.next() => {
521 match self.process_pending_reply(reply, ¬ification_tx, &request_tx).await {
522 Ok(futs) => {
523 for fut in futs {
524 pending_replies.push(fut);
525 }
526 }
527 Err(e) => {
528 warn!("Error processing pending reply: {}", e);
529 notify!(notification_tx, UserClientNotification::Error {
530 message: e.to_string(),
531 context: Some("process_pending_reply".to_string()),
532 });
533 }
534 }
535 }
536 cmd = command_rx.recv() => {
537 match cmd {
538 Some(cmd) => self.handle_command(cmd, ¬ification_tx).await,
539 None => {
540 debug!("All UserClient handles dropped, shutting down event loop");
542 return;
543 }
544 }
545 }
546 }
547 }
548 }
549
550 async fn attempt_reconnection(
552 &mut self,
553 notification_tx: &mpsc::Sender<UserClientNotification>,
554 ) -> Result<mpsc::UnboundedReceiver<IncomingMessage>, ClientError> {
555 use rand::{Rng, SeedableRng};
556
557 let mut rng = rand::rngs::StdRng::from_entropy();
558 let mut attempt: u32 = 0;
559
560 loop {
561 attempt = attempt.saturating_add(1);
562
563 let _ = self.proxy_client.disconnect().await;
565
566 match self
567 .proxy_client
568 .connect(self.identity_keypair.clone())
569 .await
570 {
571 Ok(new_rx) => {
572 debug!("Reconnected to proxy on attempt {}", attempt);
573 return Ok(new_rx);
574 }
575 Err(e) => {
576 debug!("Reconnection attempt {} failed: {}", attempt, e);
577 notify!(
578 notification_tx,
579 UserClientNotification::Reconnecting { attempt }
580 );
581
582 let exp_delay = RECONNECT_BASE_DELAY
584 .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
585 let delay = exp_delay.min(RECONNECT_MAX_DELAY);
586 let jitter_max = (delay.as_millis() as u64) / 4;
587 let jitter = if jitter_max > 0 {
588 rng.gen_range(0..=jitter_max)
589 } else {
590 0
591 };
592 let total_delay = delay + Duration::from_millis(jitter);
593
594 crate::compat::sleep(total_delay).await;
595 }
596 }
597 }
598 }
599
600 async fn handle_incoming(
602 &mut self,
603 msg: IncomingMessage,
604 notification_tx: &mpsc::Sender<UserClientNotification>,
605 request_tx: &mpsc::Sender<UserClientRequest>,
606 ) -> Result<Option<PendingReplyFuture>, ClientError> {
607 let Some(msg) = self.pending_pairings.try_buffer_message(msg) else {
609 return Ok(None);
610 };
611
612 match msg {
613 IncomingMessage::Send {
614 source, payload, ..
615 } => {
616 let text = String::from_utf8(payload)
617 .map_err(|e| ClientError::Serialization(format!("Invalid UTF-8: {e}")))?;
618
619 let protocol_msg: ProtocolMessage = serde_json::from_str(&text)?;
620
621 match protocol_msg {
622 ProtocolMessage::HandshakeInit {
623 data,
624 ciphersuite,
625 psk_id,
626 } => {
627 self.handle_handshake_init(
628 source,
629 data,
630 ciphersuite,
631 psk_id,
632 notification_tx,
633 request_tx,
634 )
635 .await
636 }
637 ProtocolMessage::CredentialRequest { encrypted } => {
638 self.handle_credential_request(
639 source,
640 encrypted,
641 notification_tx,
642 request_tx,
643 )
644 .await
645 }
646 _ => {
647 debug!("Received unexpected message type from {:?}", source);
648 Ok(None)
649 }
650 }
651 }
652 IncomingMessage::RendezvousInfo(code) => {
653 if let Some(pairing) = &mut self.pending_pairings.rendezvous {
654 if let Some(sender) = pairing.code_tx.take() {
655 debug!("Completed rendezvous pairing via handle, code: {}", code);
656 let _ = sender.send(Ok(code));
657 }
658 } else {
659 debug!("Received RendezvousInfo but no pending rendezvous pairing found");
660 }
661 Ok(None)
662 }
663 IncomingMessage::IdentityInfo { .. } => {
664 debug!("Received unexpected IdentityInfo message");
666 Ok(None)
667 }
668 }
669 }
670
671 async fn handle_handshake_init(
673 &mut self,
674 source: IdentityFingerprint,
675 data: String,
676 ciphersuite: String,
677 psk_id: Option<PskId>,
678 notification_tx: &mpsc::Sender<UserClientNotification>,
679 request_tx: &mpsc::Sender<UserClientRequest>,
680 ) -> Result<Option<PendingReplyFuture>, ClientError> {
681 debug!("Received handshake init from source: {:?}", source);
682 notify!(notification_tx, UserClientNotification::HandshakeStart {});
683
684 let is_new_connection = self.connection_store.get(&source).await.is_none();
686
687 let (psk_for_handshake, matched_pairing_name, is_psk_connection) = if !is_new_connection {
689 match psk_id
693 .as_ref()
694 .and_then(|id| self.pending_pairings.take_psk(id))
695 {
696 Some((psk, name)) => (Some(psk), name, true),
697 None => (None, None, false),
698 }
699 } else {
700 self.pending_pairings.prune_stale();
702
703 match &psk_id {
704 Some(id) => {
705 if let Some((psk, name)) = self.pending_pairings.take_psk(id) {
707 (Some(psk), name, true)
708 } else {
709 warn!("No matching PSK pairing for psk_id: {}", id);
710 return Err(ClientError::InvalidState {
711 expected: "matching PSK pairing".to_string(),
712 current: format!("no pairing for psk_id {id}"),
713 });
714 }
715 }
716 None => {
717 if let Some(pairing) = self.pending_pairings.take_rendezvous() {
719 (None, pairing.connection_name, false)
720 } else {
721 return Err(ClientError::InvalidState {
722 expected: "pending rendezvous pairing".to_string(),
723 current: "no pending rendezvous pairing".to_string(),
724 });
725 }
726 }
727 }
728 };
729
730 let (transport, fingerprint_str) = self
731 .complete_handshake(source, &data, &ciphersuite, psk_for_handshake.as_ref())
732 .await?;
733
734 notify!(
735 notification_tx,
736 UserClientNotification::HandshakeComplete {}
737 );
738
739 if is_new_connection && !is_psk_connection {
740 self.pending_pairings.prepare_buffering(source);
743
744 let (tx, rx) = oneshot::channel();
745
746 if request_tx.capacity() == 0 {
747 warn!("Request channel full, waiting for consumer to drain");
748 }
749 request_tx
750 .send(UserClientRequest::VerifyFingerprint {
751 fingerprint: fingerprint_str,
752 identity: source,
753 reply: tx,
754 })
755 .await
756 .ok();
757
758 let fut: PendingReplyFuture = Box::pin(async move {
759 let result = rx.await;
760 PendingReply::FingerprintVerification {
761 source,
762 transport,
763 connection_name: matched_pairing_name,
764 reply: result,
765 }
766 });
767
768 Ok(Some(fut))
769 } else if !is_new_connection {
770 let existing = self.connection_store.get(&source).await;
773 let now = crate::compat::now_seconds();
774 self.transports.insert(source, transport.clone());
775 self.connection_store
776 .save(ConnectionInfo {
777 fingerprint: source,
778 name: existing.as_ref().and_then(|s| s.name.clone()),
779 cached_at: existing.as_ref().map_or(now, |s| s.cached_at),
780 last_connected_at: now,
781 transport_state: Some(transport),
782 })
783 .await?;
784
785 self.audit_log
786 .write(AuditEvent::SessionRefreshed {
787 remote_identity: &source,
788 })
789 .await;
790
791 notify!(
792 notification_tx,
793 UserClientNotification::SessionRefreshed {
794 fingerprint: source,
795 }
796 );
797
798 Ok(None)
799 } else {
800 self.accept_new_connection(
802 source,
803 transport,
804 matched_pairing_name.as_deref(),
805 AuditConnectionType::Psk,
806 )
807 .await?;
808
809 notify!(
811 notification_tx,
812 UserClientNotification::HandshakeFingerprint {
813 fingerprint: fingerprint_str,
814 identity: source,
815 }
816 );
817
818 Ok(None)
819 }
820 }
821
822 async fn accept_new_connection(
824 &mut self,
825 fingerprint: IdentityFingerprint,
826 transport: MultiDeviceTransport,
827 connection_name: Option<&str>,
828 connection_type: AuditConnectionType,
829 ) -> Result<(), ClientError> {
830 let now = crate::compat::now_seconds();
831 self.transports.insert(fingerprint, transport.clone());
832 self.connection_store
833 .save(ConnectionInfo {
834 fingerprint,
835 name: connection_name.map(|s| s.to_owned()),
836 cached_at: now,
837 last_connected_at: now,
838 transport_state: Some(transport),
839 })
840 .await?;
841
842 self.audit_log
843 .write(AuditEvent::ConnectionEstablished {
844 remote_identity: &fingerprint,
845 remote_name: connection_name,
846 connection_type,
847 })
848 .await;
849
850 Ok(())
851 }
852
853 async fn handle_credential_request(
855 &mut self,
856 source: IdentityFingerprint,
857 encrypted: String,
858 notification_tx: &mpsc::Sender<UserClientNotification>,
859 request_tx: &mpsc::Sender<UserClientRequest>,
860 ) -> Result<Option<PendingReplyFuture>, ClientError> {
861 if !self.transports.contains_key(&source) {
862 debug!("Loading transport state for source: {:?}", source);
863 let connection = self.connection_store.get(&source).await.ok_or_else(|| {
864 ClientError::ConnectionCache(format!("Missing cached connection {source:?}"))
865 })?;
866 let transport = connection.transport_state.ok_or_else(|| {
867 ClientError::ConnectionCache(format!(
868 "Missing transport state for cached connection {source:?}"
869 ))
870 })?;
871 self.transports.insert(source, transport);
872 }
873
874 let transport = self
876 .transports
877 .get_mut(&source)
878 .ok_or(ClientError::SecureChannelNotEstablished)?;
879
880 let encrypted_bytes = STANDARD
882 .decode(&encrypted)
883 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
884
885 let packet = ap_noise::TransportPacket::decode(&encrypted_bytes)
886 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
887
888 let decrypted = transport
889 .decrypt(&packet)
890 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
891
892 let request: CredentialRequestPayload = serde_json::from_slice(&decrypted)?;
893
894 self.audit_log
895 .write(AuditEvent::CredentialRequested {
896 query: &request.query,
897 remote_identity: &source,
898 request_id: &request.request_id,
899 })
900 .await;
901
902 let (tx, rx) = oneshot::channel();
904
905 if request_tx.capacity() == 0 {
907 warn!("Request channel full, waiting for consumer to drain");
908 }
909 if request_tx
910 .send(UserClientRequest::CredentialRequest {
911 query: request.query.clone(),
912 identity: source,
913 reply: tx,
914 })
915 .await
916 .is_err()
917 {
918 warn!("Request channel closed, cannot send credential request");
920 notify!(
921 notification_tx,
922 UserClientNotification::Error {
923 message: "Request channel closed".to_string(),
924 context: Some("handle_credential_request".to_string()),
925 }
926 );
927 return Ok(None);
928 }
929
930 let request_id = request.request_id;
932 let query = request.query;
933 let fut: PendingReplyFuture = Box::pin(async move {
934 let result = rx.await;
935 PendingReply::CredentialResponse {
936 source,
937 request_id,
938 query,
939 reply: result,
940 }
941 });
942
943 Ok(Some(fut))
944 }
945
946 async fn process_pending_reply(
948 &mut self,
949 reply: PendingReply,
950 notification_tx: &mpsc::Sender<UserClientNotification>,
951 request_tx: &mpsc::Sender<UserClientRequest>,
952 ) -> Result<Vec<PendingReplyFuture>, ClientError> {
953 match reply {
954 PendingReply::FingerprintVerification {
955 source,
956 transport,
957 connection_name,
958 reply,
959 } => {
960 self.process_fingerprint_reply(
961 source,
962 transport,
963 connection_name,
964 reply,
965 notification_tx,
966 request_tx,
967 )
968 .await
969 }
970 PendingReply::CredentialResponse {
971 source,
972 request_id,
973 query,
974 reply,
975 } => {
976 self.process_credential_reply(source, request_id, query, reply, notification_tx)
977 .await?;
978 Ok(Vec::new())
979 }
980 }
981 }
982
983 async fn handle_command(
985 &mut self,
986 cmd: UserClientCommand,
987 notification_tx: &mpsc::Sender<UserClientNotification>,
988 ) {
989 match cmd {
990 UserClientCommand::GetPskToken {
991 name,
992 reusable,
993 reply,
994 } => {
995 let result = self.generate_psk_token(name, reusable).await;
996 let _ = reply.send(result);
997 }
998 UserClientCommand::GetRendezvousToken { name, reply } => {
999 if let Err(e) = self.proxy_client.request_rendezvous().await {
1000 let _ = reply.send(Err(e));
1001 return;
1002 }
1003
1004 self.pending_pairings.prune_stale();
1006
1007 self.pending_pairings.rendezvous = Some(RendezvousPairing {
1010 connection_name: name,
1011 created_at: Instant::now(),
1012 code_tx: Some(reply),
1013 });
1014
1015 notify!(
1017 notification_tx,
1018 UserClientNotification::HandshakeProgress {
1019 message: "Requesting rendezvous code from proxy...".to_string(),
1020 }
1021 );
1022 }
1023 }
1024 }
1025
1026 async fn generate_psk_token(
1028 &mut self,
1029 name: Option<String>,
1030 reusable: bool,
1031 ) -> Result<String, ClientError> {
1032 if reusable && self.psk_store.is_none() {
1033 return Err(ClientError::InvalidState {
1034 expected: "PskStore configured".to_string(),
1035 current: "no PskStore provided".to_string(),
1036 });
1037 }
1038
1039 let psk = Psk::generate();
1040 let psk_id = psk.id();
1041 let token = PskToken::new(psk.clone(), self.own_fingerprint).to_string();
1042
1043 self.pending_pairings.prune_stale();
1044 self.pending_pairings.psk_pairings.insert(
1045 psk_id.clone(),
1046 PskPairing {
1047 connection_name: name.clone(),
1048 created_at: Instant::now(),
1049 psk: psk.clone(),
1050 reusable,
1051 },
1052 );
1053
1054 if reusable {
1055 if let Some(store) = &mut self.psk_store {
1056 store
1057 .save(PskEntry {
1058 psk_id,
1059 psk,
1060 name,
1061 created_at: crate::compat::now_seconds(),
1062 })
1063 .await?;
1064 debug!("Created reusable PSK pairing, token generated and persisted");
1065 }
1066 } else {
1067 debug!("Created ephemeral PSK pairing, token generated");
1068 }
1069
1070 Ok(token)
1071 }
1072
1073 async fn process_fingerprint_reply(
1075 &mut self,
1076 source: IdentityFingerprint,
1077 transport: MultiDeviceTransport,
1078 connection_name: Option<String>,
1079 reply: Result<FingerprintVerificationReply, oneshot::error::RecvError>,
1080 notification_tx: &mpsc::Sender<UserClientNotification>,
1081 request_tx: &mpsc::Sender<UserClientRequest>,
1082 ) -> Result<Vec<PendingReplyFuture>, ClientError> {
1083 match reply {
1084 Ok(FingerprintVerificationReply {
1085 approved: true,
1086 name,
1087 }) => {
1088 let conn_name = name.or(connection_name);
1090 self.accept_new_connection(
1091 source,
1092 transport,
1093 conn_name.as_deref(),
1094 AuditConnectionType::Rendezvous,
1095 )
1096 .await?;
1097
1098 notify!(
1099 notification_tx,
1100 UserClientNotification::FingerprintVerified {}
1101 );
1102
1103 let mut futures = Vec::new();
1105 for msg in self.pending_pairings.take_buffered_messages(&source) {
1106 match self.handle_incoming(msg, notification_tx, request_tx).await {
1107 Ok(Some(fut)) => futures.push(fut),
1108 Ok(None) => {}
1109 Err(e) => {
1110 warn!("Error processing buffered message: {}", e);
1111 }
1112 }
1113 }
1114
1115 Ok(futures)
1116 }
1117 Ok(FingerprintVerificationReply {
1118 approved: false, ..
1119 }) => {
1120 self.reject_fingerprint(
1121 &source,
1122 "User rejected fingerprint verification",
1123 notification_tx,
1124 )
1125 .await;
1126 Ok(Vec::new())
1127 }
1128 Err(_) => {
1129 warn!("Fingerprint verification reply channel dropped, treating as rejection");
1130 self.reject_fingerprint(
1131 &source,
1132 "Verification cancelled (reply dropped)",
1133 notification_tx,
1134 )
1135 .await;
1136 Ok(Vec::new())
1137 }
1138 }
1139 }
1140
1141 async fn reject_fingerprint(
1143 &mut self,
1144 source: &IdentityFingerprint,
1145 reason: &str,
1146 notification_tx: &mpsc::Sender<UserClientNotification>,
1147 ) {
1148 self.pending_pairings.take_buffered_messages(source);
1149
1150 self.audit_log
1151 .write(AuditEvent::ConnectionRejected {
1152 remote_identity: source,
1153 })
1154 .await;
1155
1156 notify!(
1157 notification_tx,
1158 UserClientNotification::FingerprintRejected {
1159 reason: reason.to_string(),
1160 }
1161 );
1162 }
1163
1164 #[allow(clippy::too_many_arguments)]
1166 async fn process_credential_reply(
1167 &mut self,
1168 source: IdentityFingerprint,
1169 request_id: String,
1170 query: crate::types::CredentialQuery,
1171 reply: Result<CredentialRequestReply, oneshot::error::RecvError>,
1172 notification_tx: &mpsc::Sender<UserClientNotification>,
1173 ) -> Result<(), ClientError> {
1174 let reply = match reply {
1175 Ok(r) => r,
1176 Err(_) => {
1177 warn!("Credential reply channel dropped, treating as denial");
1179 CredentialRequestReply {
1180 approved: false,
1181 credential: None,
1182 credential_id: None,
1183 }
1184 }
1185 };
1186
1187 let transport = self
1188 .transports
1189 .get_mut(&source)
1190 .ok_or(ClientError::SecureChannelNotEstablished)?;
1191
1192 let domain = reply.credential.as_ref().and_then(|c| c.domain.clone());
1194 let fields = reply
1195 .credential
1196 .as_ref()
1197 .map_or_else(CredentialFieldSet::default, |c| CredentialFieldSet {
1198 has_username: c.username.is_some(),
1199 has_password: c.password.is_some(),
1200 has_totp: c.totp.is_some(),
1201 has_uri: c.uri.is_some(),
1202 has_notes: c.notes.is_some(),
1203 });
1204
1205 let response_payload = CredentialResponsePayload {
1207 credential: if reply.approved {
1208 reply.credential
1209 } else {
1210 None
1211 },
1212 error: if !reply.approved {
1213 Some("Request denied".to_string())
1214 } else {
1215 None
1216 },
1217 request_id: Some(request_id.clone()),
1218 };
1219
1220 let response_json = serde_json::to_string(&response_payload)?;
1222 let encrypted = transport
1223 .encrypt(response_json.as_bytes())
1224 .map_err(|e| ClientError::NoiseProtocol(e.to_string()))?;
1225
1226 let msg = ProtocolMessage::CredentialResponse {
1227 encrypted: STANDARD.encode(encrypted.encode()),
1228 };
1229
1230 let msg_json = serde_json::to_string(&msg)?;
1231
1232 self.proxy_client
1233 .send_to(source, msg_json.into_bytes())
1234 .await?;
1235
1236 if reply.approved {
1238 self.audit_log
1239 .write(AuditEvent::CredentialApproved {
1240 query: &query,
1241 domain: domain.as_deref(),
1242 remote_identity: &source,
1243 request_id: &request_id,
1244 credential_id: reply.credential_id.as_deref(),
1245 fields,
1246 })
1247 .await;
1248
1249 notify!(
1250 notification_tx,
1251 UserClientNotification::CredentialApproved {
1252 domain,
1253 credential_id: reply.credential_id,
1254 }
1255 );
1256 } else {
1257 self.audit_log
1258 .write(AuditEvent::CredentialDenied {
1259 query: &query,
1260 domain: domain.as_deref(),
1261 remote_identity: &source,
1262 request_id: &request_id,
1263 credential_id: reply.credential_id.as_deref(),
1264 })
1265 .await;
1266
1267 notify!(
1268 notification_tx,
1269 UserClientNotification::CredentialDenied {
1270 domain,
1271 credential_id: reply.credential_id,
1272 }
1273 );
1274 }
1275
1276 Ok(())
1277 }
1278
1279 async fn complete_handshake(
1281 &self,
1282 remote_fingerprint: IdentityFingerprint,
1283 handshake_data: &str,
1284 ciphersuite_str: &str,
1285 psk: Option<&Psk>,
1286 ) -> Result<(MultiDeviceTransport, String), ClientError> {
1287 let ciphersuite = match ciphersuite_str {
1289 s if s.contains("Kyber768") => Ciphersuite::PQNNpsk2_Kyber768_XChaCha20Poly1305,
1290 _ => Ciphersuite::ClassicalNNpsk2_25519_XChaCha20Poly1035,
1291 };
1292
1293 let init_bytes = STANDARD
1295 .decode(handshake_data)
1296 .map_err(|e| ClientError::Serialization(format!("Invalid base64: {e}")))?;
1297
1298 let init_packet = ap_noise::HandshakePacket::decode(&init_bytes)
1299 .map_err(|e| ClientError::NoiseProtocol(format!("Invalid packet: {e}")))?;
1300
1301 let mut handshake = if let Some(psk) = psk {
1303 ResponderHandshake::with_psk(psk.clone())
1304 } else {
1305 ResponderHandshake::new()
1306 };
1307
1308 handshake.receive_start(&init_packet)?;
1310 let response_packet = handshake.send_finish()?;
1311 let (transport, fingerprint) = handshake.finalize()?;
1312
1313 let msg = ProtocolMessage::HandshakeResponse {
1315 data: STANDARD.encode(response_packet.encode()?),
1316 ciphersuite: format!("{ciphersuite:?}"),
1317 };
1318
1319 let msg_json = serde_json::to_string(&msg)?;
1320
1321 self.proxy_client
1322 .send_to(remote_fingerprint, msg_json.into_bytes())
1323 .await?;
1324
1325 debug!("Sent handshake response to {:?}", remote_fingerprint);
1326
1327 Ok((transport, fingerprint.to_string()))
1328 }
1329}