1use std::collections::HashMap;
53use std::net::SocketAddr;
54use std::sync::Arc;
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::time::{Duration, Instant};
57
58use tokio::sync::{RwLock, broadcast};
59use tracing::{debug, error, info, warn};
60
61use crate::bounded_pending_buffer::BoundedPendingBuffer;
63use crate::crypto::raw_public_keys::key_utils::{
64 MlDsaPublicKey, MlDsaSecretKey, derive_peer_id_from_public_key, generate_ml_dsa_keypair,
65};
66use crate::nat_traversal_api::{
67 NatTraversalEndpoint, NatTraversalError, NatTraversalEvent, NatTraversalStatistics, PeerId,
68};
69
70pub use crate::nat_traversal_api::TraversalPhase;
72use crate::unified_config::P2pConfig;
73
/// Capacity passed to `broadcast::channel` when the [`P2pEvent`] channel is
/// created in [`P2pEndpoint::new`].
const EVENT_CHANNEL_CAPACITY: usize = 256;
76
/// High-level P2P endpoint built on top of a [`NatTraversalEndpoint`].
///
/// All mutable state is held behind `Arc`s, so clones of an endpoint share the
/// same connections, statistics, pending buffer and shutdown flag.
pub struct P2pEndpoint {
    /// Underlying NAT traversal endpoint that owns the QUIC connections.
    inner: Arc<NatTraversalEndpoint>,

    /// Peers currently tracked as connected, keyed by peer ID.
    connected_peers: Arc<RwLock<HashMap<PeerId, PeerConnection>>>,

    /// Aggregate endpoint counters.
    stats: Arc<RwLock<EndpointStats>>,

    /// Configuration the endpoint was created with.
    config: P2pConfig,

    /// Sender half of the broadcast channel used to publish [`P2pEvent`]s.
    event_tx: broadcast::Sender<P2pEvent>,

    /// Local peer ID, derived from the public key in `new`.
    peer_id: PeerId,

    /// Raw bytes of the local public key.
    public_key: Vec<u8>,

    /// Set to `true` once `shutdown` has been called.
    shutdown: Arc<AtomicBool>,

    /// Buffered payloads that `recv` drains before polling live connections.
    pending_data: Arc<RwLock<BoundedPendingBuffer>>,
}
110
111impl std::fmt::Debug for P2pEndpoint {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("P2pEndpoint")
114 .field("peer_id", &self.peer_id)
115 .field("config", &self.config)
116 .finish_non_exhaustive()
117 }
118}
119
/// Snapshot of a peer connection as tracked by [`P2pEndpoint`].
#[derive(Debug, Clone)]
pub struct PeerConnection {
    /// Identifier of the remote peer.
    pub peer_id: PeerId,

    /// Remote socket address of the connection.
    pub remote_addr: SocketAddr,

    /// Whether the peer is considered authenticated.
    pub authenticated: bool,

    /// When the connection record was created.
    pub connected_at: Instant,

    /// Last time data was sent to or received from this peer through the endpoint.
    pub last_activity: Instant,
}
138
/// Per-connection metrics returned by `P2pEndpoint::connection_metrics`.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// UDP bytes transmitted, as reported by the connection statistics.
    pub bytes_sent: u64,

    /// UDP bytes received, as reported by the connection statistics.
    pub bytes_received: u64,

    /// Round-trip time reported by the connection, if known.
    pub rtt: Option<Duration>,

    /// Fraction of packets lost, computed as `lost / (sent + lost)`.
    pub packet_loss: f64,

    /// Last recorded activity for the peer, if it is tracked as connected.
    pub last_activity: Option<Instant>,
}
157
/// Aggregate counters maintained by [`P2pEndpoint`].
#[derive(Debug, Clone)]
pub struct EndpointStats {
    /// Connections currently counted as active (decremented on disconnect).
    pub active_connections: usize,

    /// Total number of connections that were established successfully.
    pub successful_connections: u64,

    /// Number of NAT traversal failures reported by the traversal layer.
    pub failed_connections: u64,

    /// Number of coordination requests observed.
    pub nat_traversal_attempts: u64,

    /// Number of connections established via NAT traversal events.
    pub nat_traversal_successes: u64,

    /// Number of connections opened directly with `connect`.
    pub direct_connections: u64,

    /// Number of relayed connections.
    // NOTE(review): nothing in this file updates this counter — confirm where it is maintained.
    pub relayed_connections: u64,

    /// Number of known bootstrap nodes (seeded from the configured known peers).
    pub total_bootstrap_nodes: usize,

    /// Number of bootstrap nodes reached by the last `connect_known_peers` call.
    pub connected_bootstrap_nodes: usize,

    /// Instant at which these statistics were created.
    pub start_time: Instant,

    /// Average coordination time; starts at zero.
    // NOTE(review): not updated anywhere in this file — confirm where it is maintained.
    pub average_coordination_time: Duration,
}
194
195impl Default for EndpointStats {
196 fn default() -> Self {
197 Self {
198 active_connections: 0,
199 successful_connections: 0,
200 failed_connections: 0,
201 nat_traversal_attempts: 0,
202 nat_traversal_successes: 0,
203 direct_connections: 0,
204 relayed_connections: 0,
205 total_bootstrap_nodes: 0,
206 connected_bootstrap_nodes: 0,
207 start_time: Instant::now(),
208 average_coordination_time: Duration::ZERO,
209 }
210 }
211}
212
/// Events published on the endpoint's broadcast channel (see `P2pEndpoint::subscribe`).
#[derive(Debug, Clone)]
pub enum P2pEvent {
    /// A connection to a peer was established.
    PeerConnected {
        /// Identifier of the connected peer.
        peer_id: PeerId,
        /// Remote address of the connection.
        addr: SocketAddr,
    },

    /// A peer was disconnected.
    PeerDisconnected {
        /// Identifier of the disconnected peer.
        peer_id: PeerId,
        /// Why the connection ended.
        reason: DisconnectReason,
    },

    /// NAT traversal towards a peer entered a new phase (including failure).
    NatTraversalProgress {
        /// Peer the traversal targets.
        peer_id: PeerId,
        /// Phase that was entered.
        phase: TraversalPhase,
    },

    /// The traversal layer reported an external address for this endpoint.
    ExternalAddressDiscovered {
        /// The discovered address.
        addr: SocketAddr,
    },

    /// Result of attempting to connect to the configured known peers.
    BootstrapStatus {
        /// Number of known peers that were reached.
        connected: usize,
        /// Number of known peers that were tried.
        total: usize,
    },

    /// A peer was authenticated.
    PeerAuthenticated {
        /// Identifier of the authenticated peer.
        peer_id: PeerId,
    },

    /// A payload was received from a peer.
    DataReceived {
        /// Peer that sent the payload.
        peer_id: PeerId,
        /// Payload size in bytes.
        bytes: usize,
    },
}
268
/// Reason attached to a `PeerDisconnected` event.
///
/// Implements `PartialEq`/`Eq` so that event consumers and tests can compare
/// reasons directly instead of pattern-matching every variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectReason {
    /// The connection was closed deliberately by the local side.
    Normal,
    /// The connection timed out.
    Timeout,
    /// A protocol violation occurred; the string describes it.
    ProtocolError(String),
    /// The peer failed authentication.
    AuthenticationFailed,
    /// The connection was lost unexpectedly.
    ConnectionLost,
    /// The remote side closed the connection.
    RemoteClosed,
}
285
/// Errors returned by [`P2pEndpoint`] operations.
#[derive(Debug, thiserror::Error)]
pub enum EndpointError {
    /// Invalid or unusable configuration (also used when endpoint setup fails).
    #[error("Configuration error: {0}")]
    Config(String),

    /// A connection-level operation failed; the string carries the cause.
    #[error("Connection error: {0}")]
    Connection(String),

    /// An error reported by the NAT traversal layer.
    #[error("NAT traversal error: {0}")]
    NatTraversal(#[from] NatTraversalError),

    /// Authentication of a peer failed.
    #[error("Authentication error: {0}")]
    Authentication(String),

    /// The operation did not complete within its time limit.
    #[error("Operation timed out")]
    Timeout,

    /// No connection is known for the given peer.
    #[error("Peer not found: {0:?}")]
    PeerNotFound(PeerId),

    /// A connection to the given peer already exists.
    #[error("Already connected to peer: {0:?}")]
    AlreadyConnected(PeerId),

    /// The endpoint has been asked to shut down.
    #[error("Endpoint is shutting down")]
    ShuttingDown,
}
323
324impl P2pEndpoint {
325 pub async fn new(config: P2pConfig) -> Result<Self, EndpointError> {
327 let (public_key, secret_key) = match config.keypair.clone() {
329 Some(keypair) => keypair,
330 None => generate_ml_dsa_keypair().map_err(|e| {
331 EndpointError::Config(format!("Failed to generate ML-DSA-65 keypair: {e:?}"))
332 })?,
333 };
334 let peer_id = derive_peer_id_from_public_key(&public_key);
335
336 info!("Creating P2P endpoint with peer ID: {:?}", peer_id);
337
338 let public_key_bytes: Vec<u8> = public_key.as_bytes().to_vec();
341
342 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
344 let event_tx_clone = event_tx.clone();
345
346 let stats = Arc::new(RwLock::new(EndpointStats {
348 total_bootstrap_nodes: config.known_peers.len(),
349 start_time: Instant::now(),
350 ..Default::default()
351 }));
352 let stats_clone = Arc::clone(&stats);
353
354 let event_callback = Box::new(move |event: NatTraversalEvent| {
356 let event_tx = event_tx_clone.clone();
357 let stats = stats_clone.clone();
358
359 tokio::spawn(async move {
360 let mut stats_guard = stats.write().await;
362 match &event {
363 NatTraversalEvent::CoordinationRequested { .. } => {
364 stats_guard.nat_traversal_attempts += 1;
365 }
366 NatTraversalEvent::ConnectionEstablished {
367 peer_id,
368 remote_address,
369 } => {
370 stats_guard.nat_traversal_successes += 1;
371 stats_guard.active_connections += 1;
372 stats_guard.successful_connections += 1;
373
374 let _ = event_tx.send(P2pEvent::PeerConnected {
376 peer_id: *peer_id,
377 addr: *remote_address,
378 });
379 }
380 NatTraversalEvent::TraversalFailed { peer_id, .. } => {
381 stats_guard.failed_connections += 1;
382 let _ = event_tx.send(P2pEvent::NatTraversalProgress {
383 peer_id: *peer_id,
384 phase: TraversalPhase::Failed,
385 });
386 }
387 NatTraversalEvent::PhaseTransition {
388 peer_id, to_phase, ..
389 } => {
390 let _ = event_tx.send(P2pEvent::NatTraversalProgress {
391 peer_id: *peer_id,
392 phase: *to_phase,
393 });
394 }
395 NatTraversalEvent::ExternalAddressDiscovered { address, .. } => {
396 info!("External address discovered: {}", address);
397 let _ =
398 event_tx.send(P2pEvent::ExternalAddressDiscovered { addr: *address });
399 }
400 _ => {}
401 }
402 drop(stats_guard);
403 });
404 });
405
406 let nat_config = config.to_nat_config_with_key(public_key.clone(), secret_key);
409 let inner = NatTraversalEndpoint::new(nat_config, Some(event_callback))
410 .await
411 .map_err(|e| EndpointError::Config(e.to_string()))?;
412
413 Ok(Self {
414 inner: Arc::new(inner),
415 connected_peers: Arc::new(RwLock::new(HashMap::new())),
417 stats,
418 config,
419 event_tx,
420 peer_id,
421 public_key: public_key_bytes,
422 shutdown: Arc::new(AtomicBool::new(false)),
423 pending_data: Arc::new(RwLock::new(BoundedPendingBuffer::default())),
424 })
425 }
426
    /// Returns the local peer ID.
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
431
432 pub fn get_quic_connection(
436 &self,
437 peer_id: &PeerId,
438 ) -> Result<Option<crate::high_level::Connection>, EndpointError> {
439 self.inner
440 .get_connection(peer_id)
441 .map_err(EndpointError::NatTraversal)
442 }
443
444 pub fn local_addr(&self) -> Option<SocketAddr> {
446 self.inner
447 .get_endpoint()
448 .and_then(|ep| ep.local_addr().ok())
449 }
450
451 pub fn external_addr(&self) -> Option<SocketAddr> {
453 self.inner.get_observed_external_address().ok().flatten()
454 }
455
456 pub fn public_key_bytes(&self) -> &[u8] {
458 &self.public_key
459 }
460
461 pub async fn connect(&self, addr: SocketAddr) -> Result<PeerConnection, EndpointError> {
465 if self.shutdown.load(Ordering::SeqCst) {
466 return Err(EndpointError::ShuttingDown);
467 }
468
469 info!("Connecting directly to {}", addr);
470
471 let endpoint = self
472 .inner
473 .get_endpoint()
474 .ok_or_else(|| EndpointError::Config("QUIC endpoint not available".to_string()))?;
475
476 let connecting = endpoint
477 .connect(addr, "peer")
478 .map_err(|e| EndpointError::Connection(e.to_string()))?;
479
480 let connection = connecting
481 .await
482 .map_err(|e| EndpointError::Connection(e.to_string()))?;
483
484 let peer_id = self
486 .inner
487 .extract_peer_id_from_connection(&connection)
488 .await
489 .unwrap_or_else(|| self.derive_peer_id_from_address(addr));
490
491 self.inner
493 .add_connection(peer_id, connection.clone())
494 .map_err(EndpointError::NatTraversal)?;
495
496 self.inner
498 .spawn_connection_handler(peer_id, connection)
499 .map_err(EndpointError::NatTraversal)?;
500
501 let peer_conn = PeerConnection {
504 peer_id,
505 remote_addr: addr,
506 authenticated: true, connected_at: Instant::now(),
508 last_activity: Instant::now(),
509 };
510
511 self.connected_peers
513 .write()
514 .await
515 .insert(peer_id, peer_conn.clone());
516
517 {
519 let mut stats = self.stats.write().await;
520 stats.active_connections += 1;
521 stats.successful_connections += 1;
522 stats.direct_connections += 1;
523 }
524
525 let _ = self
527 .event_tx
528 .send(P2pEvent::PeerConnected { peer_id, addr });
529
530 Ok(peer_conn)
531 }
532
533 pub async fn connect_to_peer(
535 &self,
536 peer_id: PeerId,
537 coordinator: Option<SocketAddr>,
538 ) -> Result<PeerConnection, EndpointError> {
539 if self.shutdown.load(Ordering::SeqCst) {
540 return Err(EndpointError::ShuttingDown);
541 }
542
543 let coord_addr = coordinator
544 .or_else(|| self.config.known_peers.first().copied())
545 .ok_or_else(|| EndpointError::Config("No coordinator available".to_string()))?;
546
547 info!(
548 "Initiating NAT traversal to peer {:?} via coordinator {}",
549 peer_id, coord_addr
550 );
551
552 let _ = self.event_tx.send(P2pEvent::NatTraversalProgress {
554 peer_id,
555 phase: TraversalPhase::Discovery,
556 });
557
558 self.inner
560 .initiate_nat_traversal(peer_id, coord_addr)
561 .map_err(EndpointError::NatTraversal)?;
562
563 let start = Instant::now();
565 let timeout = self
566 .config
567 .timeouts
568 .nat_traversal
569 .connection_establishment_timeout;
570
571 while start.elapsed() < timeout {
572 if self.shutdown.load(Ordering::SeqCst) {
573 return Err(EndpointError::ShuttingDown);
574 }
575
576 let events = self
577 .inner
578 .poll(Instant::now())
579 .map_err(EndpointError::NatTraversal)?;
580
581 for event in events {
582 match event {
583 NatTraversalEvent::ConnectionEstablished {
584 peer_id: evt_peer,
585 remote_address,
586 } if evt_peer == peer_id => {
587 let peer_conn = PeerConnection {
589 peer_id,
590 remote_addr: remote_address,
591 authenticated: true, connected_at: Instant::now(),
593 last_activity: Instant::now(),
594 };
595
596 self.connected_peers
597 .write()
598 .await
599 .insert(peer_id, peer_conn.clone());
600
601 return Ok(peer_conn);
602 }
603 NatTraversalEvent::TraversalFailed {
604 peer_id: evt_peer,
605 error,
606 ..
607 } if evt_peer == peer_id => {
608 return Err(EndpointError::NatTraversal(error));
609 }
610 _ => {}
611 }
612 }
613
614 tokio::time::sleep(Duration::from_millis(50)).await;
615 }
616
617 Err(EndpointError::Timeout)
618 }
619
620 pub async fn accept(&self) -> Option<PeerConnection> {
622 if self.shutdown.load(Ordering::SeqCst) {
623 return None;
624 }
625
626 match self.inner.accept_connection().await {
627 Ok((peer_id, connection)) => {
628 let remote_addr = connection.remote_address();
629 let mut resolved_peer_id = peer_id;
630
631 if let Some(actual_peer_id) = self
632 .inner
633 .extract_peer_id_from_connection(&connection)
634 .await
635 {
636 if actual_peer_id != peer_id {
637 let _ = self.inner.remove_connection(&peer_id);
638 let _ = self
639 .inner
640 .add_connection(actual_peer_id, connection.clone());
641 resolved_peer_id = actual_peer_id;
642 }
643 }
644
645 if let Err(e) = self
646 .inner
647 .spawn_connection_handler(resolved_peer_id, connection)
648 {
649 error!("Failed to spawn connection handler: {}", e);
650 return None;
651 }
652
653 let peer_conn = PeerConnection {
655 peer_id: resolved_peer_id,
656 remote_addr,
657 authenticated: true, connected_at: Instant::now(),
659 last_activity: Instant::now(),
660 };
661
662 self.connected_peers
663 .write()
664 .await
665 .insert(resolved_peer_id, peer_conn.clone());
666
667 {
668 let mut stats = self.stats.write().await;
669 stats.active_connections += 1;
670 stats.successful_connections += 1;
671 }
672
673 let _ = self.event_tx.send(P2pEvent::PeerConnected {
674 peer_id: resolved_peer_id,
675 addr: remote_addr,
676 });
677
678 Some(peer_conn)
679 }
680 Err(e) => {
681 debug!("Accept failed: {}", e);
682 None
683 }
684 }
685 }
686
687 pub async fn disconnect(&self, peer_id: &PeerId) -> Result<(), EndpointError> {
689 if let Some(peer_conn) = self.connected_peers.write().await.remove(peer_id) {
690 let _ = self.inner.remove_connection(peer_id);
691
692 {
693 let mut stats = self.stats.write().await;
694 stats.active_connections = stats.active_connections.saturating_sub(1);
695 }
696
697 let _ = self.event_tx.send(P2pEvent::PeerDisconnected {
698 peer_id: *peer_id,
699 reason: DisconnectReason::Normal,
700 });
701
702 info!(
703 "Disconnected from peer {:?} at {}",
704 peer_id, peer_conn.remote_addr
705 );
706 Ok(())
707 } else {
708 Err(EndpointError::PeerNotFound(*peer_id))
709 }
710 }
711
712 pub async fn send(&self, peer_id: &PeerId, data: &[u8]) -> Result<(), EndpointError> {
716 if self.shutdown.load(Ordering::SeqCst) {
717 return Err(EndpointError::ShuttingDown);
718 }
719
720 let connection = self
721 .inner
722 .get_connection(peer_id)
723 .map_err(EndpointError::NatTraversal)?
724 .ok_or(EndpointError::PeerNotFound(*peer_id))?;
725
726 let mut send_stream = connection
727 .open_uni()
728 .await
729 .map_err(|e| EndpointError::Connection(e.to_string()))?;
730
731 send_stream
732 .write_all(data)
733 .await
734 .map_err(|e| EndpointError::Connection(e.to_string()))?;
735
736 send_stream
737 .finish()
738 .map_err(|e| EndpointError::Connection(e.to_string()))?;
739
740 if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id) {
742 peer_conn.last_activity = Instant::now();
743 }
744
745 debug!("Sent {} bytes to peer {:?}", data.len(), peer_id);
746 Ok(())
747 }
748
749 pub async fn recv(&self, timeout: Duration) -> Result<(PeerId, Vec<u8>), EndpointError> {
755 if self.shutdown.load(Ordering::SeqCst) {
756 return Err(EndpointError::ShuttingDown);
757 }
758
759 {
761 let mut pending = self.pending_data.write().await;
762 pending.cleanup_expired();
764
765 if let Some((peer_id, data)) = pending.pop_any() {
766 if let Some(peer_conn) = self.connected_peers.write().await.get_mut(&peer_id) {
767 peer_conn.last_activity = Instant::now();
768 }
769 let _ = self.event_tx.send(P2pEvent::DataReceived {
770 peer_id,
771 bytes: data.len(),
772 });
773 return Ok((peer_id, data));
774 }
775 }
776
777 let peers = self.connected_peers.read().await.clone();
778
779 if peers.is_empty() {
780 return Err(EndpointError::Connection("No connected peers".to_string()));
781 }
782
783 let start = Instant::now();
784 let peer_count = peers.len().max(1);
785
786 while start.elapsed() < timeout {
787 let remaining = timeout.saturating_sub(start.elapsed());
789 if remaining.is_zero() {
790 break;
791 }
792
793 let per_peer_timeout = remaining
795 .checked_div(peer_count as u32)
796 .unwrap_or(Duration::from_millis(5))
797 .max(Duration::from_millis(5));
798
799 for (peer_id, _) in peers.iter() {
800 if start.elapsed() >= timeout {
802 break;
803 }
804
805 if let Ok(Some(connection)) = self.inner.get_connection(peer_id) {
806 if let Ok(Ok(mut recv_stream)) =
808 tokio::time::timeout(per_peer_timeout, connection.accept_uni()).await
809 {
810 if let Ok(data) = recv_stream.read_to_end(1024 * 1024).await {
811 if !data.is_empty() {
812 if let Some(peer_conn) =
813 self.connected_peers.write().await.get_mut(peer_id)
814 {
815 peer_conn.last_activity = Instant::now();
816 }
817
818 let _ = self.event_tx.send(P2pEvent::DataReceived {
819 peer_id: *peer_id,
820 bytes: data.len(),
821 });
822 return Ok((*peer_id, data));
823 }
824 }
825 }
826 }
827 }
828
829 if start.elapsed() < timeout {
831 tokio::time::sleep(Duration::from_millis(5)).await;
832 }
833 }
834
835 Err(EndpointError::Timeout)
836 }
837
    /// Returns a new receiver subscribed to this endpoint's [`P2pEvent`] broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<P2pEvent> {
        self.event_tx.subscribe()
    }
844
845 pub async fn stats(&self) -> EndpointStats {
849 self.stats.read().await.clone()
850 }
851
852 pub async fn connection_metrics(&self, peer_id: &PeerId) -> Option<ConnectionMetrics> {
854 let connection = self.inner.get_connection(peer_id).ok()??;
855 let stats = connection.stats();
856 let rtt = connection.rtt();
857
858 let last_activity = self
859 .connected_peers
860 .read()
861 .await
862 .get(peer_id)
863 .map(|p| p.last_activity);
864
865 Some(ConnectionMetrics {
866 bytes_sent: stats.udp_tx.bytes,
867 bytes_received: stats.udp_rx.bytes,
868 rtt: Some(rtt),
869 packet_loss: stats.path.lost_packets as f64
870 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
871 last_activity,
872 })
873 }
874
875 pub fn nat_stats(&self) -> Result<NatTraversalStatistics, EndpointError> {
877 self.inner
878 .get_nat_stats()
879 .map_err(|e| EndpointError::Connection(e.to_string()))
880 }
881
882 pub async fn connect_known_peers(&self) -> Result<usize, EndpointError> {
886 let mut connected = 0;
887 let known_peers = self.config.known_peers.clone();
888
889 for addr in &known_peers {
890 match self.connect(*addr).await {
891 Ok(_) => {
892 connected += 1;
893 info!("Connected to known peer {}", addr);
894 }
895 Err(e) => {
896 warn!("Failed to connect to known peer {}: {}", addr, e);
897 }
898 }
899 }
900
901 {
902 let mut stats = self.stats.write().await;
903 stats.connected_bootstrap_nodes = connected;
904 }
905
906 let _ = self.event_tx.send(P2pEvent::BootstrapStatus {
907 connected,
908 total: known_peers.len(),
909 });
910
911 Ok(connected)
912 }
913
914 pub async fn add_bootstrap(&self, addr: SocketAddr) {
916 let _ = self.inner.add_bootstrap_node(addr);
917 let mut stats = self.stats.write().await;
918 stats.total_bootstrap_nodes += 1;
919 }
920
921 pub async fn connected_peers(&self) -> Vec<PeerConnection> {
923 self.connected_peers
924 .read()
925 .await
926 .values()
927 .cloned()
928 .collect()
929 }
930
931 pub async fn is_connected(&self, peer_id: &PeerId) -> bool {
933 self.connected_peers.read().await.contains_key(peer_id)
934 }
935
936 pub async fn is_authenticated(&self, peer_id: &PeerId) -> bool {
938 self.connected_peers
939 .read()
940 .await
941 .get(peer_id)
942 .map(|p| p.authenticated)
943 .unwrap_or(false)
944 }
945
946 pub async fn shutdown(&self) {
950 info!("Shutting down P2P endpoint");
951 self.shutdown.store(true, Ordering::SeqCst);
952
953 let peers: Vec<PeerId> = self.connected_peers.read().await.keys().copied().collect();
955 for peer_id in peers {
956 let _ = self.disconnect(&peer_id).await;
957 }
958
959 let _ = self.inner.shutdown().await;
960 }
961
962 pub fn is_running(&self) -> bool {
964 !self.shutdown.load(Ordering::SeqCst)
965 }
966
967 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
970 use std::collections::hash_map::DefaultHasher;
971 use std::hash::{Hash, Hasher};
972
973 let mut hasher = DefaultHasher::new();
974 addr.hash(&mut hasher);
975 let hash = hasher.finish();
976
977 let mut peer_id_bytes = [0u8; 32];
978 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
979 peer_id_bytes[8..10].copy_from_slice(&addr.port().to_le_bytes());
980
981 PeerId(peer_id_bytes)
982 }
983
984 }
986
987impl Clone for P2pEndpoint {
988 fn clone(&self) -> Self {
989 Self {
990 inner: Arc::clone(&self.inner),
991 connected_peers: Arc::clone(&self.connected_peers),
993 stats: Arc::clone(&self.stats),
994 config: self.config.clone(),
995 event_tx: self.event_tx.clone(),
996 peer_id: self.peer_id,
997 public_key: self.public_key.clone(),
998 shutdown: Arc::clone(&self.shutdown),
999 pending_data: Arc::clone(&self.pending_data),
1000 }
1001 }
1002}
1003
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh statistics start with all counters at zero.
    #[test]
    fn test_endpoint_stats_default() {
        let stats = EndpointStats::default();
        assert_eq!(stats.active_connections, 0);
        assert_eq!(stats.successful_connections, 0);
        assert_eq!(stats.nat_traversal_attempts, 0);
    }

    /// Default metrics are all zero / absent.
    #[test]
    fn test_connection_metrics_default() {
        let metrics = ConnectionMetrics::default();
        assert_eq!(metrics.bytes_sent, 0);
        assert_eq!(metrics.bytes_received, 0);
        assert!(metrics.rtt.is_none());
        assert_eq!(metrics.packet_loss, 0.0);
    }

    #[test]
    fn test_peer_connection_debug() {
        let conn = PeerConnection {
            peer_id: PeerId([0u8; 32]),
            remote_addr: "127.0.0.1:8080".parse().expect("valid addr"),
            authenticated: false,
            connected_at: Instant::now(),
            last_activity: Instant::now(),
        };
        let debug_str = format!("{:?}", conn);
        assert!(debug_str.contains("PeerConnection"));
    }

    #[test]
    fn test_disconnect_reason_debug() {
        let reason = DisconnectReason::Normal;
        assert!(format!("{:?}", reason).contains("Normal"));

        let reason = DisconnectReason::ProtocolError("test".to_string());
        assert!(format!("{:?}", reason).contains("test"));
    }

    #[test]
    fn test_traversal_phase_debug() {
        let phase = TraversalPhase::Discovery;
        assert!(format!("{:?}", phase).contains("Discovery"));
    }

    #[test]
    fn test_endpoint_error_display() {
        let err = EndpointError::Timeout;
        assert!(err.to_string().contains("timed out"));

        let err = EndpointError::PeerNotFound(PeerId([0u8; 32]));
        assert!(err.to_string().contains("not found"));
    }

    /// Endpoint creation may legitimately fail in restricted environments
    /// (e.g. no usable network), so assertions only run on success.
    #[cfg(feature = "runtime-tokio")]
    #[tokio::test]
    async fn test_endpoint_creation() {
        let config = P2pConfig::builder().build().expect("valid config");

        let result = P2pEndpoint::new(config).await;
        if let Ok(endpoint) = result {
            assert!(endpoint.is_running());
            // The previous `is_some() || is_none()` check could never fail;
            // verify observable state instead.
            assert!(!endpoint.public_key_bytes().is_empty());
            assert_eq!(endpoint.clone().peer_id(), endpoint.peer_id());
            // Must not panic whether or not an address is available.
            let _ = endpoint.local_addr();
        }
    }
}