ant_quic/
p2p_endpoint.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! P2P endpoint for ant-quic
9//!
10//! This module provides the main API for P2P communication with NAT traversal,
11//! secure connections, and event-driven architecture.
12//!
13//! # Features
14//!
15//! - Configuration via [`P2pConfig`](crate::unified_config::P2pConfig)
16//! - Event subscription via broadcast channels
17//! - Built-in authentication support
18//! - NAT traversal with automatic fallback
19//! - Connection metrics and statistics
20//!
21//! # Example
22//!
23//! ```rust,ignore
24//! use ant_quic::{P2pEndpoint, P2pConfig};
25//!
26//! #[tokio::main]
27//! async fn main() -> anyhow::Result<()> {
28//!     // All nodes are symmetric - they can both connect and accept connections
29//!     let config = P2pConfig::builder()
30//!         .bind_addr("0.0.0.0:9000".parse()?)
31//!         .known_peer("quic.saorsalabs.com:9000".parse()?)
32//!         .build()?;
33//!
34//!     let endpoint = P2pEndpoint::new(config).await?;
35//!     println!("Peer ID: {:?}", endpoint.peer_id());
36//!
37//!     // Subscribe to events
38//!     let mut events = endpoint.subscribe();
39//!     tokio::spawn(async move {
40//!         while let Ok(event) = events.recv().await {
41//!             println!("Event: {:?}", event);
42//!         }
43//!     });
44//!
45//!     // Connect to known peers
46//!     endpoint.connect_known_peers().await?;
47//!
48//!     Ok(())
49//! }
50//! ```
51
52use std::collections::{HashMap, VecDeque};
53use std::net::SocketAddr;
54use std::sync::Arc;
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::time::{Duration, Instant};
57
58use tokio::sync::{RwLock, broadcast};
59use tracing::{debug, error, info, warn};
60
61use crate::auth::{AuthManager, AuthMessage, AuthProtocol};
62use crate::crypto::raw_public_keys::key_utils::{
63    derive_peer_id_from_public_key, generate_ed25519_keypair,
64};
65use crate::nat_traversal_api::{
66    NatTraversalEndpoint, NatTraversalError, NatTraversalEvent, NatTraversalStatistics, PeerId,
67};
68
69// Re-export TraversalPhase from nat_traversal_api for convenience
70pub use crate::nat_traversal_api::TraversalPhase;
71use crate::unified_config::P2pConfig;
72
/// Capacity of the broadcast channel that carries [`P2pEvent`]s.
///
/// Passed to `broadcast::channel` when the endpoint is constructed in
/// `P2pEndpoint::new`.
const EVENT_CHANNEL_CAPACITY: usize = 256;
75
/// P2P endpoint - the primary API for ant-quic
///
/// This struct provides the main interface for P2P communication with
/// NAT traversal, connection management, and secure messaging.
///
/// Mutable state (`connected_peers`, `stats`, `pending_data`) lives behind
/// `Arc<RwLock<..>>` so that `&self` methods can update it.
pub struct P2pEndpoint {
    /// Internal NAT traversal endpoint; also the store for the underlying
    /// connections (see `add_connection` / `get_connection` /
    /// `remove_connection` calls in this file).
    inner: Arc<NatTraversalEndpoint>,

    /// Authentication manager, created from the endpoint's secret key and
    /// `config.auth`.
    auth_manager: Arc<AuthManager>,

    /// Connected peers with their addresses, keyed by peer ID
    connected_peers: Arc<RwLock<HashMap<PeerId, PeerConnection>>>,

    /// Endpoint statistics; shared with the NAT event callback installed in `new`
    stats: Arc<RwLock<EndpointStats>>,

    /// Configuration
    config: P2pConfig,

    /// Event broadcaster; receivers are handed out by `subscribe`
    event_tx: broadcast::Sender<P2pEvent>,

    /// Our peer ID, derived from the generated public key
    peer_id: PeerId,

    /// Shutdown flag; set by `shutdown` and checked at the start of most operations
    shutdown: Arc<AtomicBool>,

    /// Pending data buffer for data received from non-target peers during authentication
    /// This prevents data loss when authenticate_peer receives data from other peers.
    /// `recv` drains this buffer before polling any connection.
    pending_data: Arc<RwLock<HashMap<PeerId, VecDeque<Vec<u8>>>>>,
}
109
110impl std::fmt::Debug for P2pEndpoint {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("P2pEndpoint")
113            .field("peer_id", &self.peer_id)
114            .field("config", &self.config)
115            .finish_non_exhaustive()
116    }
117}
118
/// Connection information for a peer
///
/// Values returned from `connect`, `connect_to_peer` and `accept` are clones
/// of the record stored in the endpoint's peer table, taken at creation time.
#[derive(Debug, Clone)]
pub struct PeerConnection {
    /// Remote peer's ID
    pub peer_id: PeerId,

    /// Remote address
    pub remote_addr: SocketAddr,

    /// Whether peer is authenticated.
    /// Always `false` when the record is first created in this file;
    /// presumably updated by `authenticate_peer` (not shown here) — TODO confirm.
    pub authenticated: bool,

    /// Connection established time
    pub connected_at: Instant,

    /// Last activity time; refreshed by `send` and `recv`
    pub last_activity: Instant,
}
137
/// Connection metrics for P2P peers
///
/// Produced by `P2pEndpoint::connection_metrics` from the underlying
/// connection's statistics.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Bytes sent to this peer (taken from the connection's `udp_tx.bytes`)
    pub bytes_sent: u64,

    /// Bytes received from this peer (taken from the connection's `udp_rx.bytes`)
    pub bytes_received: u64,

    /// Round-trip time; `None` only in a default-constructed value
    pub rtt: Option<Duration>,

    /// Packet loss rate (0.0 to 1.0)
    pub packet_loss: f64,

    /// Last activity timestamp, if the peer is still in the peer table
    pub last_activity: Option<Instant>,
}
156
/// Aggregate counters describing a P2P endpoint's activity.
#[derive(Debug, Clone)]
pub struct EndpointStats {
    /// Connections currently considered active
    pub active_connections: usize,

    /// Running total of connections that were established
    pub successful_connections: u64,

    /// Running total of connection attempts that failed
    pub failed_connections: u64,

    /// How many NAT traversals were attempted
    pub nat_traversal_attempts: u64,

    /// How many NAT traversals succeeded
    pub nat_traversal_successes: u64,

    /// Connections made directly, without NAT traversal
    pub direct_connections: u64,

    /// Connections that go through a relay
    pub relayed_connections: u64,

    /// Number of bootstrap nodes that are configured
    pub total_bootstrap_nodes: usize,

    /// Number of bootstrap nodes currently connected
    pub connected_bootstrap_nodes: usize,

    /// Moment the endpoint was started
    pub start_time: Instant,

    /// Mean coordination time across NAT traversals
    pub average_coordination_time: Duration,
}

impl Default for EndpointStats {
    /// All counters start at zero, the coordination time at
    /// `Duration::ZERO`, and `start_time` is stamped with the current instant.
    fn default() -> Self {
        let started = Instant::now();
        Self {
            start_time: started,
            average_coordination_time: Duration::ZERO,
            total_bootstrap_nodes: 0,
            connected_bootstrap_nodes: 0,
            active_connections: 0,
            successful_connections: 0,
            failed_connections: 0,
            nat_traversal_attempts: 0,
            nat_traversal_successes: 0,
            direct_connections: 0,
            relayed_connections: 0,
        }
    }
}
211
/// P2P event for connection and network state changes
///
/// Events are published on the endpoint's broadcast channel; obtain a
/// receiver with `P2pEndpoint::subscribe`.
#[derive(Debug, Clone)]
pub enum P2pEvent {
    /// New peer connected
    PeerConnected {
        /// Peer's ID
        peer_id: PeerId,
        /// Remote address
        addr: SocketAddr,
    },

    /// Peer disconnected
    PeerDisconnected {
        /// Peer's ID
        peer_id: PeerId,
        /// Reason for disconnection
        reason: DisconnectReason,
    },

    /// NAT traversal progress
    NatTraversalProgress {
        /// Target peer ID
        peer_id: PeerId,
        /// Current phase
        phase: TraversalPhase,
    },

    /// External address discovered
    ExternalAddressDiscovered {
        /// Discovered external address
        addr: SocketAddr,
    },

    /// Bootstrap connection status
    BootstrapStatus {
        /// Number of connected bootstrap nodes
        connected: usize,
        /// Total number of bootstrap nodes
        total: usize,
    },

    /// Peer authenticated
    PeerAuthenticated {
        /// Authenticated peer ID
        peer_id: PeerId,
    },

    /// Data received from peer (emitted by `recv` for every payload it returns)
    DataReceived {
        /// Source peer ID
        peer_id: PeerId,
        /// Number of bytes received
        bytes: usize,
    },
}
267
/// Reason for peer disconnection
///
/// Carried by [`P2pEvent::PeerDisconnected`]. The type is comparable
/// (`PartialEq`/`Eq`) so subscribers can match on or assert a specific reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectReason {
    /// Normal disconnect
    Normal,
    /// Connection timeout
    Timeout,
    /// Protocol error, with a human-readable description
    ProtocolError(String),
    /// Authentication failure
    AuthenticationFailed,
    /// Connection lost
    ConnectionLost,
    /// Remote closed
    RemoteClosed,
}
284
285// TraversalPhase is re-exported from nat_traversal_api
286
/// Error type for P2pEndpoint operations
///
/// `Display` messages come from the `#[error(..)]` attributes below.
#[derive(Debug, thiserror::Error)]
pub enum EndpointError {
    /// Configuration error (e.g. endpoint creation failed, QUIC endpoint
    /// unavailable, or no coordinator available)
    #[error("Configuration error: {0}")]
    Config(String),

    /// Connection error; wraps the stringified underlying transport error
    #[error("Connection error: {0}")]
    Connection(String),

    /// NAT traversal error; converts from `NatTraversalError` via `From`
    #[error("NAT traversal error: {0}")]
    NatTraversal(#[from] NatTraversalError),

    /// Authentication error
    #[error("Authentication error: {0}")]
    Authentication(String),

    /// Timeout error
    #[error("Operation timed out")]
    Timeout,

    /// Peer not found
    #[error("Peer not found: {0:?}")]
    PeerNotFound(PeerId),

    /// Already connected
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Already connected to peer: {0:?}")]
    AlreadyConnected(PeerId),

    /// Shutdown in progress; returned once the shutdown flag has been set
    #[error("Endpoint is shutting down")]
    ShuttingDown,
}
322
323impl P2pEndpoint {
324    /// Create a new P2P endpoint with the given configuration
325    pub async fn new(config: P2pConfig) -> Result<Self, EndpointError> {
326        // Generate Ed25519 keypair for authentication
327        let (secret_key, public_key) = generate_ed25519_keypair();
328        let peer_id = derive_peer_id_from_public_key(&public_key);
329
330        info!("Creating P2P endpoint with peer ID: {:?}", peer_id);
331
332        // Create authentication manager (clone secret_key since we need it for NAT config too)
333        let auth_manager = Arc::new(AuthManager::new(secret_key.clone(), config.auth.clone()));
334
335        // Create event channel
336        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
337        let event_tx_clone = event_tx.clone();
338
339        // Create stats
340        let stats = Arc::new(RwLock::new(EndpointStats {
341            total_bootstrap_nodes: config.known_peers.len(),
342            start_time: Instant::now(),
343            ..Default::default()
344        }));
345        let stats_clone = Arc::clone(&stats);
346
347        // Create event callback that bridges to broadcast channel
348        let event_callback = Box::new(move |event: NatTraversalEvent| {
349            let event_tx = event_tx_clone.clone();
350            let stats = stats_clone.clone();
351
352            tokio::spawn(async move {
353                // Update stats based on event
354                let mut stats_guard = stats.write().await;
355                match &event {
356                    NatTraversalEvent::CoordinationRequested { .. } => {
357                        stats_guard.nat_traversal_attempts += 1;
358                    }
359                    NatTraversalEvent::ConnectionEstablished {
360                        peer_id,
361                        remote_address,
362                    } => {
363                        stats_guard.nat_traversal_successes += 1;
364                        stats_guard.active_connections += 1;
365                        stats_guard.successful_connections += 1;
366
367                        // Broadcast event
368                        let _ = event_tx.send(P2pEvent::PeerConnected {
369                            peer_id: *peer_id,
370                            addr: *remote_address,
371                        });
372                    }
373                    NatTraversalEvent::TraversalFailed { peer_id, .. } => {
374                        stats_guard.failed_connections += 1;
375                        let _ = event_tx.send(P2pEvent::NatTraversalProgress {
376                            peer_id: *peer_id,
377                            phase: TraversalPhase::Failed,
378                        });
379                    }
380                    NatTraversalEvent::PhaseTransition {
381                        peer_id, to_phase, ..
382                    } => {
383                        let _ = event_tx.send(P2pEvent::NatTraversalProgress {
384                            peer_id: *peer_id,
385                            phase: *to_phase,
386                        });
387                    }
388                    NatTraversalEvent::ExternalAddressDiscovered { address, .. } => {
389                        info!("External address discovered: {}", address);
390                        let _ =
391                            event_tx.send(P2pEvent::ExternalAddressDiscovered { addr: *address });
392                    }
393                    _ => {}
394                }
395                drop(stats_guard);
396            });
397        });
398
399        // Create NAT traversal endpoint with the same identity key used for auth
400        // This ensures P2pEndpoint and NatTraversalEndpoint use the same keypair
401        let nat_config = config.to_nat_config_with_key(secret_key);
402        let inner = NatTraversalEndpoint::new(nat_config, Some(event_callback))
403            .await
404            .map_err(|e| EndpointError::Config(e.to_string()))?;
405
406        Ok(Self {
407            inner: Arc::new(inner),
408            auth_manager,
409            connected_peers: Arc::new(RwLock::new(HashMap::new())),
410            stats,
411            config,
412            event_tx,
413            peer_id,
414            shutdown: Arc::new(AtomicBool::new(false)),
415            pending_data: Arc::new(RwLock::new(HashMap::new())),
416        })
417    }
418
419    /// Get the local peer ID
420    pub fn peer_id(&self) -> PeerId {
421        self.peer_id
422    }
423
424    /// Get the local bind address
425    pub fn local_addr(&self) -> Option<SocketAddr> {
426        self.inner
427            .get_endpoint()
428            .and_then(|ep| ep.local_addr().ok())
429    }
430
431    /// Get observed external address (if discovered)
432    pub fn external_addr(&self) -> Option<SocketAddr> {
433        self.inner.get_observed_external_address().ok().flatten()
434    }
435
436    /// Get the public key bytes
437    pub fn public_key_bytes(&self) -> [u8; 32] {
438        self.auth_manager.public_key_bytes()
439    }
440
441    // === Connection Management ===
442
443    /// Connect to a peer by address (direct connection)
444    pub async fn connect(&self, addr: SocketAddr) -> Result<PeerConnection, EndpointError> {
445        if self.shutdown.load(Ordering::SeqCst) {
446            return Err(EndpointError::ShuttingDown);
447        }
448
449        info!("Connecting directly to {}", addr);
450
451        let endpoint = self
452            .inner
453            .get_endpoint()
454            .ok_or_else(|| EndpointError::Config("QUIC endpoint not available".to_string()))?;
455
456        let connecting = endpoint
457            .connect(addr, "peer")
458            .map_err(|e| EndpointError::Connection(e.to_string()))?;
459
460        let connection = connecting
461            .await
462            .map_err(|e| EndpointError::Connection(e.to_string()))?;
463
464        // Prefer peer ID derived from the authenticated public key.
465        let peer_id = self
466            .inner
467            .extract_peer_id_from_connection(&connection)
468            .await
469            .unwrap_or_else(|| self.derive_peer_id_from_address(addr));
470
471        // Store connection
472        self.inner
473            .add_connection(peer_id, connection.clone())
474            .map_err(EndpointError::NatTraversal)?;
475
476        // Spawn handler
477        self.inner
478            .spawn_connection_handler(peer_id, connection)
479            .map_err(EndpointError::NatTraversal)?;
480
481        // Create peer connection record
482        let peer_conn = PeerConnection {
483            peer_id,
484            remote_addr: addr,
485            authenticated: false,
486            connected_at: Instant::now(),
487            last_activity: Instant::now(),
488        };
489
490        // Store peer
491        self.connected_peers
492            .write()
493            .await
494            .insert(peer_id, peer_conn.clone());
495
496        // Handle authentication if required
497        if self.config.auth.require_authentication {
498            if let Err(err) = self.authenticate_peer(&peer_id).await {
499                let _ = self.inner.remove_connection(&peer_id);
500                let _ = self.connected_peers.write().await.remove(&peer_id);
501                return Err(err);
502            }
503        }
504
505        // Update stats
506        {
507            let mut stats = self.stats.write().await;
508            stats.active_connections += 1;
509            stats.successful_connections += 1;
510            stats.direct_connections += 1;
511        }
512
513        // Broadcast event
514        let _ = self
515            .event_tx
516            .send(P2pEvent::PeerConnected { peer_id, addr });
517
518        Ok(peer_conn)
519    }
520
521    /// Connect to a peer by ID using NAT traversal
522    pub async fn connect_to_peer(
523        &self,
524        peer_id: PeerId,
525        coordinator: Option<SocketAddr>,
526    ) -> Result<PeerConnection, EndpointError> {
527        if self.shutdown.load(Ordering::SeqCst) {
528            return Err(EndpointError::ShuttingDown);
529        }
530
531        let coord_addr = coordinator
532            .or_else(|| self.config.known_peers.first().copied())
533            .ok_or_else(|| EndpointError::Config("No coordinator available".to_string()))?;
534
535        info!(
536            "Initiating NAT traversal to peer {:?} via coordinator {}",
537            peer_id, coord_addr
538        );
539
540        // Broadcast progress
541        let _ = self.event_tx.send(P2pEvent::NatTraversalProgress {
542            peer_id,
543            phase: TraversalPhase::Discovery,
544        });
545
546        // Initiate NAT traversal
547        self.inner
548            .initiate_nat_traversal(peer_id, coord_addr)
549            .map_err(EndpointError::NatTraversal)?;
550
551        // Poll for completion
552        let start = Instant::now();
553        let timeout = self
554            .config
555            .timeouts
556            .nat_traversal
557            .connection_establishment_timeout;
558
559        while start.elapsed() < timeout {
560            if self.shutdown.load(Ordering::SeqCst) {
561                return Err(EndpointError::ShuttingDown);
562            }
563
564            let events = self
565                .inner
566                .poll(Instant::now())
567                .map_err(EndpointError::NatTraversal)?;
568
569            for event in events {
570                match event {
571                    NatTraversalEvent::ConnectionEstablished {
572                        peer_id: evt_peer,
573                        remote_address,
574                    } if evt_peer == peer_id => {
575                        let peer_conn = PeerConnection {
576                            peer_id,
577                            remote_addr: remote_address,
578                            authenticated: false,
579                            connected_at: Instant::now(),
580                            last_activity: Instant::now(),
581                        };
582
583                        self.connected_peers
584                            .write()
585                            .await
586                            .insert(peer_id, peer_conn.clone());
587
588                        // Handle authentication if required
589                        if self.config.auth.require_authentication {
590                            self.authenticate_peer(&peer_id).await?;
591                        }
592
593                        return Ok(peer_conn);
594                    }
595                    NatTraversalEvent::TraversalFailed {
596                        peer_id: evt_peer,
597                        error,
598                        ..
599                    } if evt_peer == peer_id => {
600                        return Err(EndpointError::NatTraversal(error));
601                    }
602                    _ => {}
603                }
604            }
605
606            tokio::time::sleep(Duration::from_millis(50)).await;
607        }
608
609        Err(EndpointError::Timeout)
610    }
611
612    /// Accept incoming connections
613    pub async fn accept(&self) -> Option<PeerConnection> {
614        if self.shutdown.load(Ordering::SeqCst) {
615            return None;
616        }
617
618        match self.inner.accept_connection().await {
619            Ok((peer_id, connection)) => {
620                let remote_addr = connection.remote_address();
621                let mut resolved_peer_id = peer_id;
622
623                if let Some(actual_peer_id) = self
624                    .inner
625                    .extract_peer_id_from_connection(&connection)
626                    .await
627                {
628                    if actual_peer_id != peer_id {
629                        let _ = self.inner.remove_connection(&peer_id);
630                        let _ = self
631                            .inner
632                            .add_connection(actual_peer_id, connection.clone());
633                        resolved_peer_id = actual_peer_id;
634                    }
635                }
636
637                if let Err(e) = self
638                    .inner
639                    .spawn_connection_handler(resolved_peer_id, connection)
640                {
641                    error!("Failed to spawn connection handler: {}", e);
642                    return None;
643                }
644
645                let peer_conn = PeerConnection {
646                    peer_id: resolved_peer_id,
647                    remote_addr,
648                    authenticated: false,
649                    connected_at: Instant::now(),
650                    last_activity: Instant::now(),
651                };
652
653                self.connected_peers
654                    .write()
655                    .await
656                    .insert(resolved_peer_id, peer_conn.clone());
657
658                if self.config.auth.require_authentication {
659                    if let Err(err) = self.authenticate_peer(&resolved_peer_id).await {
660                        let _ = self.inner.remove_connection(&resolved_peer_id);
661                        let _ = self.connected_peers.write().await.remove(&resolved_peer_id);
662                        warn!(
663                            "Authentication failed for peer {:?}: {}",
664                            resolved_peer_id, err
665                        );
666                        return None;
667                    }
668                }
669
670                {
671                    let mut stats = self.stats.write().await;
672                    stats.active_connections += 1;
673                    stats.successful_connections += 1;
674                }
675
676                let _ = self.event_tx.send(P2pEvent::PeerConnected {
677                    peer_id: resolved_peer_id,
678                    addr: remote_addr,
679                });
680
681                Some(peer_conn)
682            }
683            Err(e) => {
684                debug!("Accept failed: {}", e);
685                None
686            }
687        }
688    }
689
690    /// Disconnect from a peer
691    pub async fn disconnect(&self, peer_id: &PeerId) -> Result<(), EndpointError> {
692        if let Some(peer_conn) = self.connected_peers.write().await.remove(peer_id) {
693            let _ = self.inner.remove_connection(peer_id);
694
695            {
696                let mut stats = self.stats.write().await;
697                stats.active_connections = stats.active_connections.saturating_sub(1);
698            }
699
700            let _ = self.event_tx.send(P2pEvent::PeerDisconnected {
701                peer_id: *peer_id,
702                reason: DisconnectReason::Normal,
703            });
704
705            info!(
706                "Disconnected from peer {:?} at {}",
707                peer_id, peer_conn.remote_addr
708            );
709            Ok(())
710        } else {
711            Err(EndpointError::PeerNotFound(*peer_id))
712        }
713    }
714
715    // === Messaging ===
716
717    /// Send data to a peer
718    pub async fn send(&self, peer_id: &PeerId, data: &[u8]) -> Result<(), EndpointError> {
719        if self.shutdown.load(Ordering::SeqCst) {
720            return Err(EndpointError::ShuttingDown);
721        }
722
723        let connection = self
724            .inner
725            .get_connection(peer_id)
726            .map_err(EndpointError::NatTraversal)?
727            .ok_or(EndpointError::PeerNotFound(*peer_id))?;
728
729        let mut send_stream = connection
730            .open_uni()
731            .await
732            .map_err(|e| EndpointError::Connection(e.to_string()))?;
733
734        send_stream
735            .write_all(data)
736            .await
737            .map_err(|e| EndpointError::Connection(e.to_string()))?;
738
739        send_stream
740            .finish()
741            .map_err(|e| EndpointError::Connection(e.to_string()))?;
742
743        // Update last activity
744        if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id) {
745            peer_conn.last_activity = Instant::now();
746        }
747
748        debug!("Sent {} bytes to peer {:?}", data.len(), peer_id);
749        Ok(())
750    }
751
752    /// Receive data from any peer (with timeout)
753    ///
754    /// This function first checks the pending data buffer for data that was
755    /// buffered during authentication, then polls streams from connected peers.
756    /// The timeout is properly distributed across all peers to avoid O(n*timeout) delays.
757    pub async fn recv(&self, timeout: Duration) -> Result<(PeerId, Vec<u8>), EndpointError> {
758        if self.shutdown.load(Ordering::SeqCst) {
759            return Err(EndpointError::ShuttingDown);
760        }
761
762        // First, check pending data buffer (data buffered during authentication)
763        {
764            let mut pending = self.pending_data.write().await;
765            for (peer_id, queue) in pending.iter_mut() {
766                if let Some(data) = queue.pop_front() {
767                    if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id) {
768                        peer_conn.last_activity = Instant::now();
769                    }
770                    let _ = self.event_tx.send(P2pEvent::DataReceived {
771                        peer_id: *peer_id,
772                        bytes: data.len(),
773                    });
774                    return Ok((*peer_id, data));
775                }
776            }
777            // Clean up empty queues
778            pending.retain(|_, queue| !queue.is_empty());
779        }
780
781        let peers = self.connected_peers.read().await.clone();
782
783        if peers.is_empty() {
784            return Err(EndpointError::Connection("No connected peers".to_string()));
785        }
786
787        let start = Instant::now();
788        let peer_count = peers.len().max(1);
789
790        while start.elapsed() < timeout {
791            // Calculate per-peer timeout based on remaining time
792            let remaining = timeout.saturating_sub(start.elapsed());
793            if remaining.is_zero() {
794                break;
795            }
796
797            // Distribute remaining time across peers with minimum 5ms per peer
798            let per_peer_timeout = remaining
799                .checked_div(peer_count as u32)
800                .unwrap_or(Duration::from_millis(5))
801                .max(Duration::from_millis(5));
802
803            for (peer_id, _) in peers.iter() {
804                // Check if we've exceeded total timeout
805                if start.elapsed() >= timeout {
806                    break;
807                }
808
809                if let Ok(Some(connection)) = self.inner.get_connection(peer_id) {
810                    // Try unidirectional stream with calculated per-peer timeout
811                    if let Ok(Ok(mut recv_stream)) =
812                        tokio::time::timeout(per_peer_timeout, connection.accept_uni()).await
813                    {
814                        if let Ok(data) = recv_stream.read_to_end(1024 * 1024).await {
815                            if !data.is_empty() {
816                                if let Some(peer_conn) =
817                                    self.connected_peers.write().await.get_mut(peer_id)
818                                {
819                                    peer_conn.last_activity = Instant::now();
820                                }
821
822                                let _ = self.event_tx.send(P2pEvent::DataReceived {
823                                    peer_id: *peer_id,
824                                    bytes: data.len(),
825                                });
826                                return Ok((*peer_id, data));
827                            }
828                        }
829                    }
830                }
831            }
832
833            // Short sleep between iterations, but only if we have time left
834            if start.elapsed() < timeout {
835                tokio::time::sleep(Duration::from_millis(5)).await;
836            }
837        }
838
839        Err(EndpointError::Timeout)
840    }
841
842    // === Events ===
843
844    /// Subscribe to endpoint events
845    pub fn subscribe(&self) -> broadcast::Receiver<P2pEvent> {
846        self.event_tx.subscribe()
847    }
848
849    // === Statistics ===
850
851    /// Get endpoint statistics
852    pub async fn stats(&self) -> EndpointStats {
853        self.stats.read().await.clone()
854    }
855
856    /// Get metrics for a specific connection
857    pub async fn connection_metrics(&self, peer_id: &PeerId) -> Option<ConnectionMetrics> {
858        let connection = self.inner.get_connection(peer_id).ok()??;
859        let stats = connection.stats();
860        let rtt = connection.rtt();
861
862        let last_activity = self
863            .connected_peers
864            .read()
865            .await
866            .get(peer_id)
867            .map(|p| p.last_activity);
868
869        Some(ConnectionMetrics {
870            bytes_sent: stats.udp_tx.bytes,
871            bytes_received: stats.udp_rx.bytes,
872            rtt: Some(rtt),
873            packet_loss: stats.path.lost_packets as f64
874                / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
875            last_activity,
876        })
877    }
878
879    /// Get NAT traversal statistics
880    pub fn nat_stats(&self) -> Result<NatTraversalStatistics, EndpointError> {
881        self.inner
882            .get_nat_stats()
883            .map_err(|e| EndpointError::Connection(e.to_string()))
884    }
885
886    // === Known Peers ===
887
888    /// Connect to configured known peers
889    pub async fn connect_known_peers(&self) -> Result<usize, EndpointError> {
890        let mut connected = 0;
891        let known_peers = self.config.known_peers.clone();
892
893        for addr in &known_peers {
894            match self.connect(*addr).await {
895                Ok(_) => {
896                    connected += 1;
897                    info!("Connected to known peer {}", addr);
898                }
899                Err(e) => {
900                    warn!("Failed to connect to known peer {}: {}", addr, e);
901                }
902            }
903        }
904
905        {
906            let mut stats = self.stats.write().await;
907            stats.connected_bootstrap_nodes = connected;
908        }
909
910        let _ = self.event_tx.send(P2pEvent::BootstrapStatus {
911            connected,
912            total: known_peers.len(),
913        });
914
915        Ok(connected)
916    }
917
918    /// Add a bootstrap node dynamically
919    pub async fn add_bootstrap(&self, addr: SocketAddr) {
920        let _ = self.inner.add_bootstrap_node(addr);
921        let mut stats = self.stats.write().await;
922        stats.total_bootstrap_nodes += 1;
923    }
924
925    /// Get list of connected peers
926    pub async fn connected_peers(&self) -> Vec<PeerConnection> {
927        self.connected_peers
928            .read()
929            .await
930            .values()
931            .cloned()
932            .collect()
933    }
934
935    /// Check if a peer is connected
936    pub async fn is_connected(&self, peer_id: &PeerId) -> bool {
937        self.connected_peers.read().await.contains_key(peer_id)
938    }
939
940    /// Check if a peer is authenticated
941    pub async fn is_authenticated(&self, peer_id: &PeerId) -> bool {
942        self.connected_peers
943            .read()
944            .await
945            .get(peer_id)
946            .map(|p| p.authenticated)
947            .unwrap_or(false)
948    }
949
950    // === Lifecycle ===
951
952    /// Shutdown the endpoint gracefully
953    pub async fn shutdown(&self) {
954        info!("Shutting down P2P endpoint");
955        self.shutdown.store(true, Ordering::SeqCst);
956
957        // Disconnect all peers
958        let peers: Vec<PeerId> = self.connected_peers.read().await.keys().copied().collect();
959        for peer_id in peers {
960            let _ = self.disconnect(&peer_id).await;
961        }
962
963        let _ = self.inner.shutdown().await;
964    }
965
966    /// Check if endpoint is running
967    pub fn is_running(&self) -> bool {
968        !self.shutdown.load(Ordering::SeqCst)
969    }
970
971    // === Internal helpers ===
972
973    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
974        use std::collections::hash_map::DefaultHasher;
975        use std::hash::{Hash, Hasher};
976
977        let mut hasher = DefaultHasher::new();
978        addr.hash(&mut hasher);
979        let hash = hasher.finish();
980
981        let mut peer_id_bytes = [0u8; 32];
982        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
983        peer_id_bytes[8..10].copy_from_slice(&addr.port().to_le_bytes());
984
985        PeerId(peer_id_bytes)
986    }
987
988    async fn authenticate_peer(&self, peer_id: &PeerId) -> Result<(), EndpointError> {
989        info!("Authenticating peer {:?}", peer_id);
990
991        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
992        let auth_request = auth_protocol.initiate_auth().await;
993        let data = AuthManager::serialize_message(&auth_request)
994            .map_err(|e| EndpointError::Authentication(e.to_string()))?;
995
996        self.send(peer_id, &data).await?;
997
998        let timeout = self.config.auth.auth_timeout;
999        let start = Instant::now();
1000
1001        while start.elapsed() < timeout {
1002            let remaining = timeout.saturating_sub(start.elapsed());
1003
1004            match self.recv(remaining).await {
1005                Ok((recv_peer_id, data)) if recv_peer_id == *peer_id => {
1006                    let message = AuthManager::deserialize_message(&data)
1007                        .map_err(|e| EndpointError::Authentication(e.to_string()))?;
1008
1009                    if let AuthMessage::AuthFailure { reason } = &message {
1010                        return Err(EndpointError::Authentication(reason.clone()));
1011                    }
1012
1013                    let response = auth_protocol
1014                        .handle_message(*peer_id, message.clone())
1015                        .await
1016                        .map_err(|e| EndpointError::Authentication(e.to_string()))?;
1017
1018                    if let Some(response) = response {
1019                        let response_data = AuthManager::serialize_message(&response)
1020                            .map_err(|e| EndpointError::Authentication(e.to_string()))?;
1021                        self.send(peer_id, &response_data).await?;
1022
1023                        if matches!(response, AuthMessage::AuthSuccess { .. }) {
1024                            if let Some(peer_conn) =
1025                                self.connected_peers.write().await.get_mut(peer_id)
1026                            {
1027                                peer_conn.authenticated = true;
1028                            }
1029                            let _ = self
1030                                .event_tx
1031                                .send(P2pEvent::PeerAuthenticated { peer_id: *peer_id });
1032                            return Ok(());
1033                        }
1034                    }
1035
1036                    if matches!(message, AuthMessage::AuthSuccess { .. }) {
1037                        if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id)
1038                        {
1039                            peer_conn.authenticated = true;
1040                        }
1041                        let _ = self
1042                            .event_tx
1043                            .send(P2pEvent::PeerAuthenticated { peer_id: *peer_id });
1044                        return Ok(());
1045                    }
1046                }
1047                Ok((other_peer_id, data)) => {
1048                    // Buffer data from non-target peers to prevent data loss
1049                    // This data will be retrieved on the next recv() call
1050                    debug!(
1051                        "Buffering {} bytes from peer {:?} during authentication",
1052                        data.len(),
1053                        other_peer_id
1054                    );
1055                    let mut pending = self.pending_data.write().await;
1056                    pending
1057                        .entry(other_peer_id)
1058                        .or_insert_with(VecDeque::new)
1059                        .push_back(data);
1060                    continue;
1061                }
1062                Err(EndpointError::Timeout) => break,
1063                Err(e) => {
1064                    return Err(EndpointError::Authentication(format!(
1065                        "Authentication failed: {e}"
1066                    )));
1067                }
1068            }
1069        }
1070
1071        Err(EndpointError::Authentication(
1072            "Authentication timeout".to_string(),
1073        ))
1074    }
1075}
1076
1077impl Clone for P2pEndpoint {
1078    fn clone(&self) -> Self {
1079        Self {
1080            inner: Arc::clone(&self.inner),
1081            auth_manager: Arc::clone(&self.auth_manager),
1082            connected_peers: Arc::clone(&self.connected_peers),
1083            stats: Arc::clone(&self.stats),
1084            config: self.config.clone(),
1085            event_tx: self.event_tx.clone(),
1086            peer_id: self.peer_id,
1087            shutdown: Arc::clone(&self.shutdown),
1088            pending_data: Arc::clone(&self.pending_data),
1089        }
1090    }
1091}
1092
#[cfg(test)]
mod tests {
    use super::*;

    /// Default endpoint stats start with the checked counters at zero.
    #[test]
    fn test_endpoint_stats_default() {
        let stats = EndpointStats::default();
        assert_eq!(stats.active_connections, 0);
        assert_eq!(stats.successful_connections, 0);
        assert_eq!(stats.nat_traversal_attempts, 0);
    }

    /// Default connection metrics carry no traffic, no RTT and no loss.
    #[test]
    fn test_connection_metrics_default() {
        let metrics = ConnectionMetrics::default();
        assert_eq!(metrics.bytes_sent, 0);
        assert_eq!(metrics.bytes_received, 0);
        assert!(metrics.rtt.is_none());
        assert_eq!(metrics.packet_loss, 0.0);
    }

    /// `PeerConnection`'s `Debug` output names the type.
    #[test]
    fn test_peer_connection_debug() {
        let conn = PeerConnection {
            peer_id: PeerId([0u8; 32]),
            remote_addr: "127.0.0.1:8080".parse().expect("valid addr"),
            authenticated: false,
            connected_at: Instant::now(),
            last_activity: Instant::now(),
        };
        let debug_str = format!("{conn:?}");
        assert!(debug_str.contains("PeerConnection"));
    }

    /// `DisconnectReason`'s `Debug` output includes the variant name and payload.
    #[test]
    fn test_disconnect_reason_debug() {
        let reason = DisconnectReason::Normal;
        assert!(format!("{reason:?}").contains("Normal"));

        let reason = DisconnectReason::ProtocolError("test".to_string());
        assert!(format!("{reason:?}").contains("test"));
    }

    /// `TraversalPhase`'s `Debug` output includes the variant name.
    #[test]
    fn test_traversal_phase_debug() {
        let phase = TraversalPhase::Discovery;
        assert!(format!("{phase:?}").contains("Discovery"));
    }

    /// `EndpointError`'s `Display` output contains the expected phrases.
    #[test]
    fn test_endpoint_error_display() {
        let err = EndpointError::Timeout;
        assert!(err.to_string().contains("timed out"));

        let err = EndpointError::PeerNotFound(PeerId([0u8; 32]));
        assert!(err.to_string().contains("not found"));
    }

    /// Creating an endpoint with a default config must not panic, and a
    /// successfully created endpoint reports itself as running.
    #[tokio::test]
    async fn test_endpoint_creation() {
        // v0.13.0+: No role - all nodes are symmetric P2P nodes
        let config = P2pConfig::builder().build().expect("valid config");

        // May fail in a test environment without network, but shouldn't panic.
        if let Ok(endpoint) = P2pEndpoint::new(config).await {
            assert!(endpoint.is_running());
            // Either `Some` or `None` is acceptable here; the call only has to
            // complete without panicking, so there is nothing to assert on.
            let _ = endpoint.local_addr();
        }
    }
}