1use std::collections::{HashMap, VecDeque};
53use std::net::SocketAddr;
54use std::sync::Arc;
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::time::{Duration, Instant};
57
58use tokio::sync::{RwLock, broadcast};
59use tracing::{debug, error, info, warn};
60
61use crate::auth::{AuthManager, AuthMessage, AuthProtocol};
62use crate::crypto::raw_public_keys::key_utils::{
63 derive_peer_id_from_public_key, generate_ed25519_keypair,
64};
65use crate::nat_traversal_api::{
66 NatTraversalEndpoint, NatTraversalError, NatTraversalEvent, NatTraversalStatistics, PeerId,
67};
68
69pub use crate::nat_traversal_api::TraversalPhase;
71use crate::unified_config::P2pConfig;
72
/// Capacity passed to the broadcast channel that carries [`P2pEvent`]s to
/// subscribers (see `P2pEndpoint::new`).
const EVENT_CHANNEL_CAPACITY: usize = 256;
75
76pub struct P2pEndpoint {
81 inner: Arc<NatTraversalEndpoint>,
83
84 auth_manager: Arc<AuthManager>,
86
87 connected_peers: Arc<RwLock<HashMap<PeerId, PeerConnection>>>,
89
90 stats: Arc<RwLock<EndpointStats>>,
92
93 config: P2pConfig,
95
96 event_tx: broadcast::Sender<P2pEvent>,
98
99 peer_id: PeerId,
101
102 shutdown: Arc<AtomicBool>,
104
105 pending_data: Arc<RwLock<HashMap<PeerId, VecDeque<Vec<u8>>>>>,
108}
109
110impl std::fmt::Debug for P2pEndpoint {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("P2pEndpoint")
113 .field("peer_id", &self.peer_id)
114 .field("config", &self.config)
115 .finish_non_exhaustive()
116 }
117}
118
119#[derive(Debug, Clone)]
121pub struct PeerConnection {
122 pub peer_id: PeerId,
124
125 pub remote_addr: SocketAddr,
127
128 pub authenticated: bool,
130
131 pub connected_at: Instant,
133
134 pub last_activity: Instant,
136}
137
/// Per-connection transport metrics, as reported by
/// `P2pEndpoint::connection_metrics`.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Bytes transmitted over UDP on this connection.
    pub bytes_sent: u64,

    /// Bytes received over UDP on this connection.
    pub bytes_received: u64,

    /// Round-trip time of the connection, when known.
    pub rtt: Option<Duration>,

    /// Packet loss ratio (`0.0` when nothing was lost).
    pub packet_loss: f64,

    /// Last recorded activity for the peer, if the peer is still tracked.
    pub last_activity: Option<Instant>,
}
156
/// Aggregated counters for a `P2pEndpoint`.
#[derive(Debug, Clone)]
pub struct EndpointStats {
    /// Number of connections currently considered active.
    pub active_connections: usize,

    /// Total connections established successfully.
    pub successful_connections: u64,

    /// Total connection attempts that failed.
    pub failed_connections: u64,

    /// NAT traversal coordination requests observed.
    pub nat_traversal_attempts: u64,

    /// NAT traversal attempts that produced a connection.
    pub nat_traversal_successes: u64,

    /// Connections established by direct dialing.
    pub direct_connections: u64,

    /// Connections established through a relay.
    pub relayed_connections: u64,

    /// Number of known bootstrap nodes.
    pub total_bootstrap_nodes: usize,

    /// Number of bootstrap nodes reached by the last bootstrap pass.
    pub connected_bootstrap_nodes: usize,

    /// When the statistics were created.
    pub start_time: Instant,

    /// Average NAT traversal coordination time.
    pub average_coordination_time: Duration,
}
193
194impl Default for EndpointStats {
195 fn default() -> Self {
196 Self {
197 active_connections: 0,
198 successful_connections: 0,
199 failed_connections: 0,
200 nat_traversal_attempts: 0,
201 nat_traversal_successes: 0,
202 direct_connections: 0,
203 relayed_connections: 0,
204 total_bootstrap_nodes: 0,
205 connected_bootstrap_nodes: 0,
206 start_time: Instant::now(),
207 average_coordination_time: Duration::ZERO,
208 }
209 }
210}
211
212#[derive(Debug, Clone)]
214pub enum P2pEvent {
215 PeerConnected {
217 peer_id: PeerId,
219 addr: SocketAddr,
221 },
222
223 PeerDisconnected {
225 peer_id: PeerId,
227 reason: DisconnectReason,
229 },
230
231 NatTraversalProgress {
233 peer_id: PeerId,
235 phase: TraversalPhase,
237 },
238
239 ExternalAddressDiscovered {
241 addr: SocketAddr,
243 },
244
245 BootstrapStatus {
247 connected: usize,
249 total: usize,
251 },
252
253 PeerAuthenticated {
255 peer_id: PeerId,
257 },
258
259 DataReceived {
261 peer_id: PeerId,
263 bytes: usize,
265 },
266}
267
/// Reason attached to a `P2pEvent::PeerDisconnected` event.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// The connection was closed locally on request.
    Normal,
    /// The connection timed out.
    Timeout,
    /// A protocol violation occurred; the string carries the details.
    ProtocolError(String),
    /// The peer failed authentication.
    AuthenticationFailed,
    /// The connection was lost unexpectedly.
    ConnectionLost,
    /// The remote side closed the connection.
    RemoteClosed,
}
284
285#[derive(Debug, thiserror::Error)]
289pub enum EndpointError {
290 #[error("Configuration error: {0}")]
292 Config(String),
293
294 #[error("Connection error: {0}")]
296 Connection(String),
297
298 #[error("NAT traversal error: {0}")]
300 NatTraversal(#[from] NatTraversalError),
301
302 #[error("Authentication error: {0}")]
304 Authentication(String),
305
306 #[error("Operation timed out")]
308 Timeout,
309
310 #[error("Peer not found: {0:?}")]
312 PeerNotFound(PeerId),
313
314 #[error("Already connected to peer: {0:?}")]
316 AlreadyConnected(PeerId),
317
318 #[error("Endpoint is shutting down")]
320 ShuttingDown,
321}
322
323impl P2pEndpoint {
324 pub async fn new(config: P2pConfig) -> Result<Self, EndpointError> {
326 let (secret_key, public_key) = generate_ed25519_keypair();
328 let peer_id = derive_peer_id_from_public_key(&public_key);
329
330 info!("Creating P2P endpoint with peer ID: {:?}", peer_id);
331
332 let auth_manager = Arc::new(AuthManager::new(secret_key.clone(), config.auth.clone()));
334
335 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
337 let event_tx_clone = event_tx.clone();
338
339 let stats = Arc::new(RwLock::new(EndpointStats {
341 total_bootstrap_nodes: config.known_peers.len(),
342 start_time: Instant::now(),
343 ..Default::default()
344 }));
345 let stats_clone = Arc::clone(&stats);
346
347 let event_callback = Box::new(move |event: NatTraversalEvent| {
349 let event_tx = event_tx_clone.clone();
350 let stats = stats_clone.clone();
351
352 tokio::spawn(async move {
353 let mut stats_guard = stats.write().await;
355 match &event {
356 NatTraversalEvent::CoordinationRequested { .. } => {
357 stats_guard.nat_traversal_attempts += 1;
358 }
359 NatTraversalEvent::ConnectionEstablished {
360 peer_id,
361 remote_address,
362 } => {
363 stats_guard.nat_traversal_successes += 1;
364 stats_guard.active_connections += 1;
365 stats_guard.successful_connections += 1;
366
367 let _ = event_tx.send(P2pEvent::PeerConnected {
369 peer_id: *peer_id,
370 addr: *remote_address,
371 });
372 }
373 NatTraversalEvent::TraversalFailed { peer_id, .. } => {
374 stats_guard.failed_connections += 1;
375 let _ = event_tx.send(P2pEvent::NatTraversalProgress {
376 peer_id: *peer_id,
377 phase: TraversalPhase::Failed,
378 });
379 }
380 NatTraversalEvent::PhaseTransition {
381 peer_id, to_phase, ..
382 } => {
383 let _ = event_tx.send(P2pEvent::NatTraversalProgress {
384 peer_id: *peer_id,
385 phase: *to_phase,
386 });
387 }
388 NatTraversalEvent::ExternalAddressDiscovered { address, .. } => {
389 info!("External address discovered: {}", address);
390 let _ =
391 event_tx.send(P2pEvent::ExternalAddressDiscovered { addr: *address });
392 }
393 _ => {}
394 }
395 drop(stats_guard);
396 });
397 });
398
399 let nat_config = config.to_nat_config_with_key(secret_key);
402 let inner = NatTraversalEndpoint::new(nat_config, Some(event_callback))
403 .await
404 .map_err(|e| EndpointError::Config(e.to_string()))?;
405
406 Ok(Self {
407 inner: Arc::new(inner),
408 auth_manager,
409 connected_peers: Arc::new(RwLock::new(HashMap::new())),
410 stats,
411 config,
412 event_tx,
413 peer_id,
414 shutdown: Arc::new(AtomicBool::new(false)),
415 pending_data: Arc::new(RwLock::new(HashMap::new())),
416 })
417 }
418
419 pub fn peer_id(&self) -> PeerId {
421 self.peer_id
422 }
423
424 pub fn local_addr(&self) -> Option<SocketAddr> {
426 self.inner
427 .get_endpoint()
428 .and_then(|ep| ep.local_addr().ok())
429 }
430
431 pub fn external_addr(&self) -> Option<SocketAddr> {
433 self.inner.get_observed_external_address().ok().flatten()
434 }
435
436 pub fn public_key_bytes(&self) -> [u8; 32] {
438 self.auth_manager.public_key_bytes()
439 }
440
441 pub async fn connect(&self, addr: SocketAddr) -> Result<PeerConnection, EndpointError> {
445 if self.shutdown.load(Ordering::SeqCst) {
446 return Err(EndpointError::ShuttingDown);
447 }
448
449 info!("Connecting directly to {}", addr);
450
451 let endpoint = self
452 .inner
453 .get_endpoint()
454 .ok_or_else(|| EndpointError::Config("QUIC endpoint not available".to_string()))?;
455
456 let connecting = endpoint
457 .connect(addr, "peer")
458 .map_err(|e| EndpointError::Connection(e.to_string()))?;
459
460 let connection = connecting
461 .await
462 .map_err(|e| EndpointError::Connection(e.to_string()))?;
463
464 let peer_id = self
466 .inner
467 .extract_peer_id_from_connection(&connection)
468 .await
469 .unwrap_or_else(|| self.derive_peer_id_from_address(addr));
470
471 self.inner
473 .add_connection(peer_id, connection.clone())
474 .map_err(EndpointError::NatTraversal)?;
475
476 self.inner
478 .spawn_connection_handler(peer_id, connection)
479 .map_err(EndpointError::NatTraversal)?;
480
481 let peer_conn = PeerConnection {
483 peer_id,
484 remote_addr: addr,
485 authenticated: false,
486 connected_at: Instant::now(),
487 last_activity: Instant::now(),
488 };
489
490 self.connected_peers
492 .write()
493 .await
494 .insert(peer_id, peer_conn.clone());
495
496 if self.config.auth.require_authentication {
498 if let Err(err) = self.authenticate_peer(&peer_id).await {
499 let _ = self.inner.remove_connection(&peer_id);
500 let _ = self.connected_peers.write().await.remove(&peer_id);
501 return Err(err);
502 }
503 }
504
505 {
507 let mut stats = self.stats.write().await;
508 stats.active_connections += 1;
509 stats.successful_connections += 1;
510 stats.direct_connections += 1;
511 }
512
513 let _ = self
515 .event_tx
516 .send(P2pEvent::PeerConnected { peer_id, addr });
517
518 Ok(peer_conn)
519 }
520
521 pub async fn connect_to_peer(
523 &self,
524 peer_id: PeerId,
525 coordinator: Option<SocketAddr>,
526 ) -> Result<PeerConnection, EndpointError> {
527 if self.shutdown.load(Ordering::SeqCst) {
528 return Err(EndpointError::ShuttingDown);
529 }
530
531 let coord_addr = coordinator
532 .or_else(|| self.config.known_peers.first().copied())
533 .ok_or_else(|| EndpointError::Config("No coordinator available".to_string()))?;
534
535 info!(
536 "Initiating NAT traversal to peer {:?} via coordinator {}",
537 peer_id, coord_addr
538 );
539
540 let _ = self.event_tx.send(P2pEvent::NatTraversalProgress {
542 peer_id,
543 phase: TraversalPhase::Discovery,
544 });
545
546 self.inner
548 .initiate_nat_traversal(peer_id, coord_addr)
549 .map_err(EndpointError::NatTraversal)?;
550
551 let start = Instant::now();
553 let timeout = self
554 .config
555 .timeouts
556 .nat_traversal
557 .connection_establishment_timeout;
558
559 while start.elapsed() < timeout {
560 if self.shutdown.load(Ordering::SeqCst) {
561 return Err(EndpointError::ShuttingDown);
562 }
563
564 let events = self
565 .inner
566 .poll(Instant::now())
567 .map_err(EndpointError::NatTraversal)?;
568
569 for event in events {
570 match event {
571 NatTraversalEvent::ConnectionEstablished {
572 peer_id: evt_peer,
573 remote_address,
574 } if evt_peer == peer_id => {
575 let peer_conn = PeerConnection {
576 peer_id,
577 remote_addr: remote_address,
578 authenticated: false,
579 connected_at: Instant::now(),
580 last_activity: Instant::now(),
581 };
582
583 self.connected_peers
584 .write()
585 .await
586 .insert(peer_id, peer_conn.clone());
587
588 if self.config.auth.require_authentication {
590 self.authenticate_peer(&peer_id).await?;
591 }
592
593 return Ok(peer_conn);
594 }
595 NatTraversalEvent::TraversalFailed {
596 peer_id: evt_peer,
597 error,
598 ..
599 } if evt_peer == peer_id => {
600 return Err(EndpointError::NatTraversal(error));
601 }
602 _ => {}
603 }
604 }
605
606 tokio::time::sleep(Duration::from_millis(50)).await;
607 }
608
609 Err(EndpointError::Timeout)
610 }
611
612 pub async fn accept(&self) -> Option<PeerConnection> {
614 if self.shutdown.load(Ordering::SeqCst) {
615 return None;
616 }
617
618 match self.inner.accept_connection().await {
619 Ok((peer_id, connection)) => {
620 let remote_addr = connection.remote_address();
621 let mut resolved_peer_id = peer_id;
622
623 if let Some(actual_peer_id) = self
624 .inner
625 .extract_peer_id_from_connection(&connection)
626 .await
627 {
628 if actual_peer_id != peer_id {
629 let _ = self.inner.remove_connection(&peer_id);
630 let _ = self
631 .inner
632 .add_connection(actual_peer_id, connection.clone());
633 resolved_peer_id = actual_peer_id;
634 }
635 }
636
637 if let Err(e) = self
638 .inner
639 .spawn_connection_handler(resolved_peer_id, connection)
640 {
641 error!("Failed to spawn connection handler: {}", e);
642 return None;
643 }
644
645 let peer_conn = PeerConnection {
646 peer_id: resolved_peer_id,
647 remote_addr,
648 authenticated: false,
649 connected_at: Instant::now(),
650 last_activity: Instant::now(),
651 };
652
653 self.connected_peers
654 .write()
655 .await
656 .insert(resolved_peer_id, peer_conn.clone());
657
658 if self.config.auth.require_authentication {
659 if let Err(err) = self.authenticate_peer(&resolved_peer_id).await {
660 let _ = self.inner.remove_connection(&resolved_peer_id);
661 let _ = self.connected_peers.write().await.remove(&resolved_peer_id);
662 warn!(
663 "Authentication failed for peer {:?}: {}",
664 resolved_peer_id, err
665 );
666 return None;
667 }
668 }
669
670 {
671 let mut stats = self.stats.write().await;
672 stats.active_connections += 1;
673 stats.successful_connections += 1;
674 }
675
676 let _ = self.event_tx.send(P2pEvent::PeerConnected {
677 peer_id: resolved_peer_id,
678 addr: remote_addr,
679 });
680
681 Some(peer_conn)
682 }
683 Err(e) => {
684 debug!("Accept failed: {}", e);
685 None
686 }
687 }
688 }
689
690 pub async fn disconnect(&self, peer_id: &PeerId) -> Result<(), EndpointError> {
692 if let Some(peer_conn) = self.connected_peers.write().await.remove(peer_id) {
693 let _ = self.inner.remove_connection(peer_id);
694
695 {
696 let mut stats = self.stats.write().await;
697 stats.active_connections = stats.active_connections.saturating_sub(1);
698 }
699
700 let _ = self.event_tx.send(P2pEvent::PeerDisconnected {
701 peer_id: *peer_id,
702 reason: DisconnectReason::Normal,
703 });
704
705 info!(
706 "Disconnected from peer {:?} at {}",
707 peer_id, peer_conn.remote_addr
708 );
709 Ok(())
710 } else {
711 Err(EndpointError::PeerNotFound(*peer_id))
712 }
713 }
714
715 pub async fn send(&self, peer_id: &PeerId, data: &[u8]) -> Result<(), EndpointError> {
719 if self.shutdown.load(Ordering::SeqCst) {
720 return Err(EndpointError::ShuttingDown);
721 }
722
723 let connection = self
724 .inner
725 .get_connection(peer_id)
726 .map_err(EndpointError::NatTraversal)?
727 .ok_or(EndpointError::PeerNotFound(*peer_id))?;
728
729 let mut send_stream = connection
730 .open_uni()
731 .await
732 .map_err(|e| EndpointError::Connection(e.to_string()))?;
733
734 send_stream
735 .write_all(data)
736 .await
737 .map_err(|e| EndpointError::Connection(e.to_string()))?;
738
739 send_stream
740 .finish()
741 .map_err(|e| EndpointError::Connection(e.to_string()))?;
742
743 if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id) {
745 peer_conn.last_activity = Instant::now();
746 }
747
748 debug!("Sent {} bytes to peer {:?}", data.len(), peer_id);
749 Ok(())
750 }
751
752 pub async fn recv(&self, timeout: Duration) -> Result<(PeerId, Vec<u8>), EndpointError> {
758 if self.shutdown.load(Ordering::SeqCst) {
759 return Err(EndpointError::ShuttingDown);
760 }
761
762 {
764 let mut pending = self.pending_data.write().await;
765 for (peer_id, queue) in pending.iter_mut() {
766 if let Some(data) = queue.pop_front() {
767 if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id) {
768 peer_conn.last_activity = Instant::now();
769 }
770 let _ = self.event_tx.send(P2pEvent::DataReceived {
771 peer_id: *peer_id,
772 bytes: data.len(),
773 });
774 return Ok((*peer_id, data));
775 }
776 }
777 pending.retain(|_, queue| !queue.is_empty());
779 }
780
781 let peers = self.connected_peers.read().await.clone();
782
783 if peers.is_empty() {
784 return Err(EndpointError::Connection("No connected peers".to_string()));
785 }
786
787 let start = Instant::now();
788 let peer_count = peers.len().max(1);
789
790 while start.elapsed() < timeout {
791 let remaining = timeout.saturating_sub(start.elapsed());
793 if remaining.is_zero() {
794 break;
795 }
796
797 let per_peer_timeout = remaining
799 .checked_div(peer_count as u32)
800 .unwrap_or(Duration::from_millis(5))
801 .max(Duration::from_millis(5));
802
803 for (peer_id, _) in peers.iter() {
804 if start.elapsed() >= timeout {
806 break;
807 }
808
809 if let Ok(Some(connection)) = self.inner.get_connection(peer_id) {
810 if let Ok(Ok(mut recv_stream)) =
812 tokio::time::timeout(per_peer_timeout, connection.accept_uni()).await
813 {
814 if let Ok(data) = recv_stream.read_to_end(1024 * 1024).await {
815 if !data.is_empty() {
816 if let Some(peer_conn) =
817 self.connected_peers.write().await.get_mut(peer_id)
818 {
819 peer_conn.last_activity = Instant::now();
820 }
821
822 let _ = self.event_tx.send(P2pEvent::DataReceived {
823 peer_id: *peer_id,
824 bytes: data.len(),
825 });
826 return Ok((*peer_id, data));
827 }
828 }
829 }
830 }
831 }
832
833 if start.elapsed() < timeout {
835 tokio::time::sleep(Duration::from_millis(5)).await;
836 }
837 }
838
839 Err(EndpointError::Timeout)
840 }
841
842 pub fn subscribe(&self) -> broadcast::Receiver<P2pEvent> {
846 self.event_tx.subscribe()
847 }
848
849 pub async fn stats(&self) -> EndpointStats {
853 self.stats.read().await.clone()
854 }
855
856 pub async fn connection_metrics(&self, peer_id: &PeerId) -> Option<ConnectionMetrics> {
858 let connection = self.inner.get_connection(peer_id).ok()??;
859 let stats = connection.stats();
860 let rtt = connection.rtt();
861
862 let last_activity = self
863 .connected_peers
864 .read()
865 .await
866 .get(peer_id)
867 .map(|p| p.last_activity);
868
869 Some(ConnectionMetrics {
870 bytes_sent: stats.udp_tx.bytes,
871 bytes_received: stats.udp_rx.bytes,
872 rtt: Some(rtt),
873 packet_loss: stats.path.lost_packets as f64
874 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
875 last_activity,
876 })
877 }
878
879 pub fn nat_stats(&self) -> Result<NatTraversalStatistics, EndpointError> {
881 self.inner
882 .get_nat_stats()
883 .map_err(|e| EndpointError::Connection(e.to_string()))
884 }
885
886 pub async fn connect_known_peers(&self) -> Result<usize, EndpointError> {
890 let mut connected = 0;
891 let known_peers = self.config.known_peers.clone();
892
893 for addr in &known_peers {
894 match self.connect(*addr).await {
895 Ok(_) => {
896 connected += 1;
897 info!("Connected to known peer {}", addr);
898 }
899 Err(e) => {
900 warn!("Failed to connect to known peer {}: {}", addr, e);
901 }
902 }
903 }
904
905 {
906 let mut stats = self.stats.write().await;
907 stats.connected_bootstrap_nodes = connected;
908 }
909
910 let _ = self.event_tx.send(P2pEvent::BootstrapStatus {
911 connected,
912 total: known_peers.len(),
913 });
914
915 Ok(connected)
916 }
917
918 pub async fn add_bootstrap(&self, addr: SocketAddr) {
920 let _ = self.inner.add_bootstrap_node(addr);
921 let mut stats = self.stats.write().await;
922 stats.total_bootstrap_nodes += 1;
923 }
924
925 pub async fn connected_peers(&self) -> Vec<PeerConnection> {
927 self.connected_peers
928 .read()
929 .await
930 .values()
931 .cloned()
932 .collect()
933 }
934
935 pub async fn is_connected(&self, peer_id: &PeerId) -> bool {
937 self.connected_peers.read().await.contains_key(peer_id)
938 }
939
940 pub async fn is_authenticated(&self, peer_id: &PeerId) -> bool {
942 self.connected_peers
943 .read()
944 .await
945 .get(peer_id)
946 .map(|p| p.authenticated)
947 .unwrap_or(false)
948 }
949
950 pub async fn shutdown(&self) {
954 info!("Shutting down P2P endpoint");
955 self.shutdown.store(true, Ordering::SeqCst);
956
957 let peers: Vec<PeerId> = self.connected_peers.read().await.keys().copied().collect();
959 for peer_id in peers {
960 let _ = self.disconnect(&peer_id).await;
961 }
962
963 let _ = self.inner.shutdown().await;
964 }
965
966 pub fn is_running(&self) -> bool {
968 !self.shutdown.load(Ordering::SeqCst)
969 }
970
971 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
974 use std::collections::hash_map::DefaultHasher;
975 use std::hash::{Hash, Hasher};
976
977 let mut hasher = DefaultHasher::new();
978 addr.hash(&mut hasher);
979 let hash = hasher.finish();
980
981 let mut peer_id_bytes = [0u8; 32];
982 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
983 peer_id_bytes[8..10].copy_from_slice(&addr.port().to_le_bytes());
984
985 PeerId(peer_id_bytes)
986 }
987
988 async fn authenticate_peer(&self, peer_id: &PeerId) -> Result<(), EndpointError> {
989 info!("Authenticating peer {:?}", peer_id);
990
991 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
992 let auth_request = auth_protocol.initiate_auth().await;
993 let data = AuthManager::serialize_message(&auth_request)
994 .map_err(|e| EndpointError::Authentication(e.to_string()))?;
995
996 self.send(peer_id, &data).await?;
997
998 let timeout = self.config.auth.auth_timeout;
999 let start = Instant::now();
1000
1001 while start.elapsed() < timeout {
1002 let remaining = timeout.saturating_sub(start.elapsed());
1003
1004 match self.recv(remaining).await {
1005 Ok((recv_peer_id, data)) if recv_peer_id == *peer_id => {
1006 let message = AuthManager::deserialize_message(&data)
1007 .map_err(|e| EndpointError::Authentication(e.to_string()))?;
1008
1009 if let AuthMessage::AuthFailure { reason } = &message {
1010 return Err(EndpointError::Authentication(reason.clone()));
1011 }
1012
1013 let response = auth_protocol
1014 .handle_message(*peer_id, message.clone())
1015 .await
1016 .map_err(|e| EndpointError::Authentication(e.to_string()))?;
1017
1018 if let Some(response) = response {
1019 let response_data = AuthManager::serialize_message(&response)
1020 .map_err(|e| EndpointError::Authentication(e.to_string()))?;
1021 self.send(peer_id, &response_data).await?;
1022
1023 if matches!(response, AuthMessage::AuthSuccess { .. }) {
1024 if let Some(peer_conn) =
1025 self.connected_peers.write().await.get_mut(peer_id)
1026 {
1027 peer_conn.authenticated = true;
1028 }
1029 let _ = self
1030 .event_tx
1031 .send(P2pEvent::PeerAuthenticated { peer_id: *peer_id });
1032 return Ok(());
1033 }
1034 }
1035
1036 if matches!(message, AuthMessage::AuthSuccess { .. }) {
1037 if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id)
1038 {
1039 peer_conn.authenticated = true;
1040 }
1041 let _ = self
1042 .event_tx
1043 .send(P2pEvent::PeerAuthenticated { peer_id: *peer_id });
1044 return Ok(());
1045 }
1046 }
1047 Ok((other_peer_id, data)) => {
1048 debug!(
1051 "Buffering {} bytes from peer {:?} during authentication",
1052 data.len(),
1053 other_peer_id
1054 );
1055 let mut pending = self.pending_data.write().await;
1056 pending
1057 .entry(other_peer_id)
1058 .or_insert_with(VecDeque::new)
1059 .push_back(data);
1060 continue;
1061 }
1062 Err(EndpointError::Timeout) => break,
1063 Err(e) => {
1064 return Err(EndpointError::Authentication(format!(
1065 "Authentication failed: {e}"
1066 )));
1067 }
1068 }
1069 }
1070
1071 Err(EndpointError::Authentication(
1072 "Authentication timeout".to_string(),
1073 ))
1074 }
1075}
1076
1077impl Clone for P2pEndpoint {
1078 fn clone(&self) -> Self {
1079 Self {
1080 inner: Arc::clone(&self.inner),
1081 auth_manager: Arc::clone(&self.auth_manager),
1082 connected_peers: Arc::clone(&self.connected_peers),
1083 stats: Arc::clone(&self.stats),
1084 config: self.config.clone(),
1085 event_tx: self.event_tx.clone(),
1086 peer_id: self.peer_id,
1087 shutdown: Arc::clone(&self.shutdown),
1088 pending_data: Arc::clone(&self.pending_data),
1089 }
1090 }
1091}
1092
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `EndpointStats` starts with all counters at zero.
    #[test]
    fn test_endpoint_stats_default() {
        let stats = EndpointStats::default();
        assert_eq!(stats.active_connections, 0);
        assert_eq!(stats.successful_connections, 0);
        assert_eq!(stats.nat_traversal_attempts, 0);
    }

    /// A default `ConnectionMetrics` carries no traffic and no RTT.
    #[test]
    fn test_connection_metrics_default() {
        let metrics = ConnectionMetrics::default();
        assert_eq!(metrics.bytes_sent, 0);
        assert_eq!(metrics.bytes_received, 0);
        assert!(metrics.rtt.is_none());
        assert_eq!(metrics.packet_loss, 0.0);
    }

    /// `PeerConnection` has a usable `Debug` representation.
    #[test]
    fn test_peer_connection_debug() {
        let conn = PeerConnection {
            peer_id: PeerId([0u8; 32]),
            remote_addr: "127.0.0.1:8080".parse().expect("valid addr"),
            authenticated: false,
            connected_at: Instant::now(),
            last_activity: Instant::now(),
        };
        let debug_str = format!("{:?}", conn);
        assert!(debug_str.contains("PeerConnection"));
    }

    /// `DisconnectReason` prints its variant name and payload.
    #[test]
    fn test_disconnect_reason_debug() {
        let reason = DisconnectReason::Normal;
        assert!(format!("{:?}", reason).contains("Normal"));

        let reason = DisconnectReason::ProtocolError("test".to_string());
        assert!(format!("{:?}", reason).contains("test"));
    }

    /// The re-exported `TraversalPhase` is usable from this module.
    #[test]
    fn test_traversal_phase_debug() {
        let phase = TraversalPhase::Discovery;
        assert!(format!("{:?}", phase).contains("Discovery"));
    }

    /// Error messages contain the expected human-readable fragments.
    #[test]
    fn test_endpoint_error_display() {
        let err = EndpointError::Timeout;
        assert!(err.to_string().contains("timed out"));

        let err = EndpointError::PeerNotFound(PeerId([0u8; 32]));
        assert!(err.to_string().contains("not found"));
    }

    /// A freshly created endpoint is running and tracks no peers.
    ///
    /// Creation may legitimately fail in restricted environments, so a failed
    /// construction is tolerated.
    #[tokio::test]
    async fn test_endpoint_creation() {
        let config = P2pConfig::builder().build().expect("valid config");

        let result = P2pEndpoint::new(config).await;
        if let Ok(endpoint) = result {
            assert!(endpoint.is_running());

            // The local address may or may not be known; the call just must
            // not panic.
            let _ = endpoint.local_addr();

            // Nothing has connected yet.
            assert!(endpoint.connected_peers().await.is_empty());
            assert!(!endpoint.is_connected(&endpoint.peer_id()).await);
            assert!(!endpoint.is_authenticated(&endpoint.peer_id()).await);

            // A clone is another handle onto the same endpoint.
            let handle = endpoint.clone();
            assert!(handle.peer_id() == endpoint.peer_id());
            assert!(handle.is_running());
        }
    }
}
1163}