ant_quic/
p2p_endpoint.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! P2P endpoint for ant-quic
9//!
10//! This module provides the main API for P2P communication with NAT traversal,
11//! secure connections, and event-driven architecture.
12//!
13//! # Features
14//!
15//! - Configuration via [`P2pConfig`](crate::unified_config::P2pConfig)
16//! - Event subscription via broadcast channels
17//! - TLS-based peer authentication via ML-DSA-65 (v0.2+)
18//! - NAT traversal with automatic fallback
19//! - Connection metrics and statistics
20//!
21//! # Example
22//!
23//! ```rust,ignore
24//! use ant_quic::{P2pEndpoint, P2pConfig};
25//!
26//! #[tokio::main]
27//! async fn main() -> anyhow::Result<()> {
28//!     // All nodes are symmetric - they can both connect and accept connections
29//!     let config = P2pConfig::builder()
30//!         .bind_addr("0.0.0.0:9000".parse()?)
31//!         .known_peer("quic.saorsalabs.com:9000".parse()?)
32//!         .build()?;
33//!
34//!     let endpoint = P2pEndpoint::new(config).await?;
35//!     println!("Peer ID: {:?}", endpoint.peer_id());
36//!
37//!     // Subscribe to events
38//!     let mut events = endpoint.subscribe();
39//!     tokio::spawn(async move {
40//!         while let Ok(event) = events.recv().await {
41//!             println!("Event: {:?}", event);
42//!         }
43//!     });
44//!
45//!     // Connect to known peers
46//!     endpoint.connect_known_peers().await?;
47//!
48//!     Ok(())
49//! }
50//! ```
51
52use std::collections::HashMap;
53use std::net::SocketAddr;
54use std::sync::Arc;
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::time::{Duration, Instant};
57
58use tokio::sync::{RwLock, broadcast};
59use tracing::{debug, error, info, warn};
60
61// v0.2: auth module removed - TLS handles peer authentication via ML-DSA-65
62use crate::bounded_pending_buffer::BoundedPendingBuffer;
63use crate::crypto::raw_public_keys::key_utils::{
64    MlDsaPublicKey, MlDsaSecretKey, derive_peer_id_from_public_key, generate_ml_dsa_keypair,
65};
66use crate::nat_traversal_api::{
67    NatTraversalEndpoint, NatTraversalError, NatTraversalEvent, NatTraversalStatistics, PeerId,
68};
69
70// Re-export TraversalPhase from nat_traversal_api for convenience
71pub use crate::nat_traversal_api::TraversalPhase;
72use crate::unified_config::P2pConfig;
73
74/// Event channel capacity
75const EVENT_CHANNEL_CAPACITY: usize = 256;
76
77/// P2P endpoint - the primary API for ant-quic
78///
79/// This struct provides the main interface for P2P communication with
80/// NAT traversal, connection management, and secure messaging.
81pub struct P2pEndpoint {
82    /// Internal NAT traversal endpoint
83    inner: Arc<NatTraversalEndpoint>,
84
85    // v0.2: auth_manager removed - TLS handles peer authentication via ML-DSA-65
86    /// Connected peers with their addresses
87    connected_peers: Arc<RwLock<HashMap<PeerId, PeerConnection>>>,
88
89    /// Endpoint statistics
90    stats: Arc<RwLock<EndpointStats>>,
91
92    /// Configuration
93    config: P2pConfig,
94
95    /// Event broadcaster
96    event_tx: broadcast::Sender<P2pEvent>,
97
98    /// Our peer ID
99    peer_id: PeerId,
100
101    /// Our ML-DSA-65 public key bytes (for identity sharing) - 1952 bytes
102    public_key: Vec<u8>,
103
104    /// Shutdown flag
105    shutdown: Arc<AtomicBool>,
106
107    /// Bounded pending data buffer for message ordering
108    pending_data: Arc<RwLock<BoundedPendingBuffer>>,
109}
110
111impl std::fmt::Debug for P2pEndpoint {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("P2pEndpoint")
114            .field("peer_id", &self.peer_id)
115            .field("config", &self.config)
116            .finish_non_exhaustive()
117    }
118}
119
120/// Connection information for a peer
121#[derive(Debug, Clone)]
122pub struct PeerConnection {
123    /// Remote peer's ID
124    pub peer_id: PeerId,
125
126    /// Remote address
127    pub remote_addr: SocketAddr,
128
129    /// Whether peer is authenticated
130    pub authenticated: bool,
131
132    /// Connection established time
133    pub connected_at: Instant,
134
135    /// Last activity time
136    pub last_activity: Instant,
137}
138
139/// Connection metrics for P2P peers
140#[derive(Debug, Clone, Default)]
141pub struct ConnectionMetrics {
142    /// Bytes sent to this peer
143    pub bytes_sent: u64,
144
145    /// Bytes received from this peer
146    pub bytes_received: u64,
147
148    /// Round-trip time
149    pub rtt: Option<Duration>,
150
151    /// Packet loss rate (0.0 to 1.0)
152    pub packet_loss: f64,
153
154    /// Last activity timestamp
155    pub last_activity: Option<Instant>,
156}
157
158/// P2P endpoint statistics
159#[derive(Debug, Clone)]
160pub struct EndpointStats {
161    /// Number of active connections
162    pub active_connections: usize,
163
164    /// Total successful connections
165    pub successful_connections: u64,
166
167    /// Total failed connections
168    pub failed_connections: u64,
169
170    /// NAT traversal attempts
171    pub nat_traversal_attempts: u64,
172
173    /// Successful NAT traversals
174    pub nat_traversal_successes: u64,
175
176    /// Direct connections (no NAT traversal needed)
177    pub direct_connections: u64,
178
179    /// Relayed connections
180    pub relayed_connections: u64,
181
182    /// Total bootstrap nodes configured
183    pub total_bootstrap_nodes: usize,
184
185    /// Connected bootstrap nodes
186    pub connected_bootstrap_nodes: usize,
187
188    /// Endpoint start time
189    pub start_time: Instant,
190
191    /// Average coordination time for NAT traversal
192    pub average_coordination_time: Duration,
193}
194
195impl Default for EndpointStats {
196    fn default() -> Self {
197        Self {
198            active_connections: 0,
199            successful_connections: 0,
200            failed_connections: 0,
201            nat_traversal_attempts: 0,
202            nat_traversal_successes: 0,
203            direct_connections: 0,
204            relayed_connections: 0,
205            total_bootstrap_nodes: 0,
206            connected_bootstrap_nodes: 0,
207            start_time: Instant::now(),
208            average_coordination_time: Duration::ZERO,
209        }
210    }
211}
212
213/// P2P event for connection and network state changes
214#[derive(Debug, Clone)]
215pub enum P2pEvent {
216    /// New peer connected
217    PeerConnected {
218        /// Peer's ID
219        peer_id: PeerId,
220        /// Remote address
221        addr: SocketAddr,
222    },
223
224    /// Peer disconnected
225    PeerDisconnected {
226        /// Peer's ID
227        peer_id: PeerId,
228        /// Reason for disconnection
229        reason: DisconnectReason,
230    },
231
232    /// NAT traversal progress
233    NatTraversalProgress {
234        /// Target peer ID
235        peer_id: PeerId,
236        /// Current phase
237        phase: TraversalPhase,
238    },
239
240    /// External address discovered
241    ExternalAddressDiscovered {
242        /// Discovered external address
243        addr: SocketAddr,
244    },
245
246    /// Bootstrap connection status
247    BootstrapStatus {
248        /// Number of connected bootstrap nodes
249        connected: usize,
250        /// Total number of bootstrap nodes
251        total: usize,
252    },
253
254    /// Peer authenticated
255    PeerAuthenticated {
256        /// Authenticated peer ID
257        peer_id: PeerId,
258    },
259
260    /// Data received from peer
261    DataReceived {
262        /// Source peer ID
263        peer_id: PeerId,
264        /// Number of bytes received
265        bytes: usize,
266    },
267}
268
269/// Reason for peer disconnection
270#[derive(Debug, Clone)]
271pub enum DisconnectReason {
272    /// Normal disconnect
273    Normal,
274    /// Connection timeout
275    Timeout,
276    /// Protocol error
277    ProtocolError(String),
278    /// Authentication failure
279    AuthenticationFailed,
280    /// Connection lost
281    ConnectionLost,
282    /// Remote closed
283    RemoteClosed,
284}
285
286// TraversalPhase is re-exported from nat_traversal_api
287
288/// Error type for P2pEndpoint operations
289#[derive(Debug, thiserror::Error)]
290pub enum EndpointError {
291    /// Configuration error
292    #[error("Configuration error: {0}")]
293    Config(String),
294
295    /// Connection error
296    #[error("Connection error: {0}")]
297    Connection(String),
298
299    /// NAT traversal error
300    #[error("NAT traversal error: {0}")]
301    NatTraversal(#[from] NatTraversalError),
302
303    /// Authentication error
304    #[error("Authentication error: {0}")]
305    Authentication(String),
306
307    /// Timeout error
308    #[error("Operation timed out")]
309    Timeout,
310
311    /// Peer not found
312    #[error("Peer not found: {0:?}")]
313    PeerNotFound(PeerId),
314
315    /// Already connected
316    #[error("Already connected to peer: {0:?}")]
317    AlreadyConnected(PeerId),
318
319    /// Shutdown in progress
320    #[error("Endpoint is shutting down")]
321    ShuttingDown,
322}
323
324impl P2pEndpoint {
325    /// Create a new P2P endpoint with the given configuration
326    pub async fn new(config: P2pConfig) -> Result<Self, EndpointError> {
327        // Use provided keypair or generate a new one (ML-DSA-65)
328        let (public_key, secret_key) = match config.keypair.clone() {
329            Some(keypair) => keypair,
330            None => generate_ml_dsa_keypair().map_err(|e| {
331                EndpointError::Config(format!("Failed to generate ML-DSA-65 keypair: {e:?}"))
332            })?,
333        };
334        let peer_id = derive_peer_id_from_public_key(&public_key);
335
336        info!("Creating P2P endpoint with peer ID: {:?}", peer_id);
337
338        // v0.2: auth_manager removed - TLS handles peer authentication via ML-DSA-65
339        // Store public key bytes directly for identity sharing
340        let public_key_bytes: Vec<u8> = public_key.as_bytes().to_vec();
341
342        // Create event channel
343        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
344        let event_tx_clone = event_tx.clone();
345
346        // Create stats
347        let stats = Arc::new(RwLock::new(EndpointStats {
348            total_bootstrap_nodes: config.known_peers.len(),
349            start_time: Instant::now(),
350            ..Default::default()
351        }));
352        let stats_clone = Arc::clone(&stats);
353
354        // Create event callback that bridges to broadcast channel
355        let event_callback = Box::new(move |event: NatTraversalEvent| {
356            let event_tx = event_tx_clone.clone();
357            let stats = stats_clone.clone();
358
359            tokio::spawn(async move {
360                // Update stats based on event
361                let mut stats_guard = stats.write().await;
362                match &event {
363                    NatTraversalEvent::CoordinationRequested { .. } => {
364                        stats_guard.nat_traversal_attempts += 1;
365                    }
366                    NatTraversalEvent::ConnectionEstablished {
367                        peer_id,
368                        remote_address,
369                    } => {
370                        stats_guard.nat_traversal_successes += 1;
371                        stats_guard.active_connections += 1;
372                        stats_guard.successful_connections += 1;
373
374                        // Broadcast event
375                        let _ = event_tx.send(P2pEvent::PeerConnected {
376                            peer_id: *peer_id,
377                            addr: *remote_address,
378                        });
379                    }
380                    NatTraversalEvent::TraversalFailed { peer_id, .. } => {
381                        stats_guard.failed_connections += 1;
382                        let _ = event_tx.send(P2pEvent::NatTraversalProgress {
383                            peer_id: *peer_id,
384                            phase: TraversalPhase::Failed,
385                        });
386                    }
387                    NatTraversalEvent::PhaseTransition {
388                        peer_id, to_phase, ..
389                    } => {
390                        let _ = event_tx.send(P2pEvent::NatTraversalProgress {
391                            peer_id: *peer_id,
392                            phase: *to_phase,
393                        });
394                    }
395                    NatTraversalEvent::ExternalAddressDiscovered { address, .. } => {
396                        info!("External address discovered: {}", address);
397                        let _ =
398                            event_tx.send(P2pEvent::ExternalAddressDiscovered { addr: *address });
399                    }
400                    _ => {}
401                }
402                drop(stats_guard);
403            });
404        });
405
406        // Create NAT traversal endpoint with the same identity key used for auth
407        // This ensures P2pEndpoint and NatTraversalEndpoint use the same keypair
408        let nat_config = config.to_nat_config_with_key(public_key.clone(), secret_key);
409        let inner = NatTraversalEndpoint::new(nat_config, Some(event_callback))
410            .await
411            .map_err(|e| EndpointError::Config(e.to_string()))?;
412
413        Ok(Self {
414            inner: Arc::new(inner),
415            // v0.2: auth_manager removed
416            connected_peers: Arc::new(RwLock::new(HashMap::new())),
417            stats,
418            config,
419            event_tx,
420            peer_id,
421            public_key: public_key_bytes,
422            shutdown: Arc::new(AtomicBool::new(false)),
423            pending_data: Arc::new(RwLock::new(BoundedPendingBuffer::default())),
424        })
425    }
426
427    /// Get the local peer ID
428    pub fn peer_id(&self) -> PeerId {
429        self.peer_id
430    }
431
432    /// Get the underlying QUIC connection for a peer.
433    ///
434    /// This is used by the LinkTransport abstraction layer to wrap connections.
435    pub fn get_quic_connection(
436        &self,
437        peer_id: &PeerId,
438    ) -> Result<Option<crate::high_level::Connection>, EndpointError> {
439        self.inner
440            .get_connection(peer_id)
441            .map_err(EndpointError::NatTraversal)
442    }
443
444    /// Get the local bind address
445    pub fn local_addr(&self) -> Option<SocketAddr> {
446        self.inner
447            .get_endpoint()
448            .and_then(|ep| ep.local_addr().ok())
449    }
450
451    /// Get observed external address (if discovered)
452    pub fn external_addr(&self) -> Option<SocketAddr> {
453        self.inner.get_observed_external_address().ok().flatten()
454    }
455
456    /// Get the ML-DSA-65 public key bytes (1952 bytes)
457    pub fn public_key_bytes(&self) -> &[u8] {
458        &self.public_key
459    }
460
461    // === Connection Management ===
462
463    /// Connect to a peer by address (direct connection)
464    pub async fn connect(&self, addr: SocketAddr) -> Result<PeerConnection, EndpointError> {
465        if self.shutdown.load(Ordering::SeqCst) {
466            return Err(EndpointError::ShuttingDown);
467        }
468
469        info!("Connecting directly to {}", addr);
470
471        let endpoint = self
472            .inner
473            .get_endpoint()
474            .ok_or_else(|| EndpointError::Config("QUIC endpoint not available".to_string()))?;
475
476        let connecting = endpoint
477            .connect(addr, "peer")
478            .map_err(|e| EndpointError::Connection(e.to_string()))?;
479
480        let connection = connecting
481            .await
482            .map_err(|e| EndpointError::Connection(e.to_string()))?;
483
484        // Prefer peer ID derived from the authenticated public key.
485        let peer_id = self
486            .inner
487            .extract_peer_id_from_connection(&connection)
488            .await
489            .unwrap_or_else(|| self.derive_peer_id_from_address(addr));
490
491        // Store connection
492        self.inner
493            .add_connection(peer_id, connection.clone())
494            .map_err(EndpointError::NatTraversal)?;
495
496        // Spawn handler
497        self.inner
498            .spawn_connection_handler(peer_id, connection)
499            .map_err(EndpointError::NatTraversal)?;
500
501        // Create peer connection record
502        // v0.2: Peer is authenticated via TLS (ML-DSA-65) during handshake
503        let peer_conn = PeerConnection {
504            peer_id,
505            remote_addr: addr,
506            authenticated: true, // TLS handles authentication
507            connected_at: Instant::now(),
508            last_activity: Instant::now(),
509        };
510
511        // Store peer
512        self.connected_peers
513            .write()
514            .await
515            .insert(peer_id, peer_conn.clone());
516
517        // Update stats
518        {
519            let mut stats = self.stats.write().await;
520            stats.active_connections += 1;
521            stats.successful_connections += 1;
522            stats.direct_connections += 1;
523        }
524
525        // Broadcast event
526        let _ = self
527            .event_tx
528            .send(P2pEvent::PeerConnected { peer_id, addr });
529
530        Ok(peer_conn)
531    }
532
533    /// Connect to a peer by ID using NAT traversal
534    pub async fn connect_to_peer(
535        &self,
536        peer_id: PeerId,
537        coordinator: Option<SocketAddr>,
538    ) -> Result<PeerConnection, EndpointError> {
539        if self.shutdown.load(Ordering::SeqCst) {
540            return Err(EndpointError::ShuttingDown);
541        }
542
543        let coord_addr = coordinator
544            .or_else(|| self.config.known_peers.first().copied())
545            .ok_or_else(|| EndpointError::Config("No coordinator available".to_string()))?;
546
547        info!(
548            "Initiating NAT traversal to peer {:?} via coordinator {}",
549            peer_id, coord_addr
550        );
551
552        // Broadcast progress
553        let _ = self.event_tx.send(P2pEvent::NatTraversalProgress {
554            peer_id,
555            phase: TraversalPhase::Discovery,
556        });
557
558        // Initiate NAT traversal
559        self.inner
560            .initiate_nat_traversal(peer_id, coord_addr)
561            .map_err(EndpointError::NatTraversal)?;
562
563        // Poll for completion
564        let start = Instant::now();
565        let timeout = self
566            .config
567            .timeouts
568            .nat_traversal
569            .connection_establishment_timeout;
570
571        while start.elapsed() < timeout {
572            if self.shutdown.load(Ordering::SeqCst) {
573                return Err(EndpointError::ShuttingDown);
574            }
575
576            let events = self
577                .inner
578                .poll(Instant::now())
579                .map_err(EndpointError::NatTraversal)?;
580
581            for event in events {
582                match event {
583                    NatTraversalEvent::ConnectionEstablished {
584                        peer_id: evt_peer,
585                        remote_address,
586                    } if evt_peer == peer_id => {
587                        // v0.2: Peer is authenticated via TLS (ML-DSA-65) during handshake
588                        let peer_conn = PeerConnection {
589                            peer_id,
590                            remote_addr: remote_address,
591                            authenticated: true, // TLS handles authentication
592                            connected_at: Instant::now(),
593                            last_activity: Instant::now(),
594                        };
595
596                        self.connected_peers
597                            .write()
598                            .await
599                            .insert(peer_id, peer_conn.clone());
600
601                        return Ok(peer_conn);
602                    }
603                    NatTraversalEvent::TraversalFailed {
604                        peer_id: evt_peer,
605                        error,
606                        ..
607                    } if evt_peer == peer_id => {
608                        return Err(EndpointError::NatTraversal(error));
609                    }
610                    _ => {}
611                }
612            }
613
614            tokio::time::sleep(Duration::from_millis(50)).await;
615        }
616
617        Err(EndpointError::Timeout)
618    }
619
620    /// Accept incoming connections
621    pub async fn accept(&self) -> Option<PeerConnection> {
622        if self.shutdown.load(Ordering::SeqCst) {
623            return None;
624        }
625
626        match self.inner.accept_connection().await {
627            Ok((peer_id, connection)) => {
628                let remote_addr = connection.remote_address();
629                let mut resolved_peer_id = peer_id;
630
631                if let Some(actual_peer_id) = self
632                    .inner
633                    .extract_peer_id_from_connection(&connection)
634                    .await
635                {
636                    if actual_peer_id != peer_id {
637                        let _ = self.inner.remove_connection(&peer_id);
638                        let _ = self
639                            .inner
640                            .add_connection(actual_peer_id, connection.clone());
641                        resolved_peer_id = actual_peer_id;
642                    }
643                }
644
645                if let Err(e) = self
646                    .inner
647                    .spawn_connection_handler(resolved_peer_id, connection)
648                {
649                    error!("Failed to spawn connection handler: {}", e);
650                    return None;
651                }
652
653                // v0.2: Peer is authenticated via TLS (ML-DSA-65) during handshake
654                let peer_conn = PeerConnection {
655                    peer_id: resolved_peer_id,
656                    remote_addr,
657                    authenticated: true, // TLS handles authentication
658                    connected_at: Instant::now(),
659                    last_activity: Instant::now(),
660                };
661
662                self.connected_peers
663                    .write()
664                    .await
665                    .insert(resolved_peer_id, peer_conn.clone());
666
667                {
668                    let mut stats = self.stats.write().await;
669                    stats.active_connections += 1;
670                    stats.successful_connections += 1;
671                }
672
673                let _ = self.event_tx.send(P2pEvent::PeerConnected {
674                    peer_id: resolved_peer_id,
675                    addr: remote_addr,
676                });
677
678                Some(peer_conn)
679            }
680            Err(e) => {
681                debug!("Accept failed: {}", e);
682                None
683            }
684        }
685    }
686
687    /// Disconnect from a peer
688    pub async fn disconnect(&self, peer_id: &PeerId) -> Result<(), EndpointError> {
689        if let Some(peer_conn) = self.connected_peers.write().await.remove(peer_id) {
690            let _ = self.inner.remove_connection(peer_id);
691
692            {
693                let mut stats = self.stats.write().await;
694                stats.active_connections = stats.active_connections.saturating_sub(1);
695            }
696
697            let _ = self.event_tx.send(P2pEvent::PeerDisconnected {
698                peer_id: *peer_id,
699                reason: DisconnectReason::Normal,
700            });
701
702            info!(
703                "Disconnected from peer {:?} at {}",
704                peer_id, peer_conn.remote_addr
705            );
706            Ok(())
707        } else {
708            Err(EndpointError::PeerNotFound(*peer_id))
709        }
710    }
711
712    // === Messaging ===
713
714    /// Send data to a peer
715    pub async fn send(&self, peer_id: &PeerId, data: &[u8]) -> Result<(), EndpointError> {
716        if self.shutdown.load(Ordering::SeqCst) {
717            return Err(EndpointError::ShuttingDown);
718        }
719
720        let connection = self
721            .inner
722            .get_connection(peer_id)
723            .map_err(EndpointError::NatTraversal)?
724            .ok_or(EndpointError::PeerNotFound(*peer_id))?;
725
726        let mut send_stream = connection
727            .open_uni()
728            .await
729            .map_err(|e| EndpointError::Connection(e.to_string()))?;
730
731        send_stream
732            .write_all(data)
733            .await
734            .map_err(|e| EndpointError::Connection(e.to_string()))?;
735
736        send_stream
737            .finish()
738            .map_err(|e| EndpointError::Connection(e.to_string()))?;
739
740        // Update last activity
741        if let Some(peer_conn) = self.connected_peers.write().await.get_mut(peer_id) {
742            peer_conn.last_activity = Instant::now();
743        }
744
745        debug!("Sent {} bytes to peer {:?}", data.len(), peer_id);
746        Ok(())
747    }
748
749    /// Receive data from any peer (with timeout)
750    ///
751    /// This function first checks the pending data buffer for data that was
752    /// buffered during authentication, then polls streams from connected peers.
753    /// The timeout is properly distributed across all peers to avoid O(n*timeout) delays.
754    pub async fn recv(&self, timeout: Duration) -> Result<(PeerId, Vec<u8>), EndpointError> {
755        if self.shutdown.load(Ordering::SeqCst) {
756            return Err(EndpointError::ShuttingDown);
757        }
758
759        // First, check pending data buffer (data buffered during authentication)
760        {
761            let mut pending = self.pending_data.write().await;
762            // Run periodic cleanup of expired entries
763            pending.cleanup_expired();
764
765            if let Some((peer_id, data)) = pending.pop_any() {
766                if let Some(peer_conn) = self.connected_peers.write().await.get_mut(&peer_id) {
767                    peer_conn.last_activity = Instant::now();
768                }
769                let _ = self.event_tx.send(P2pEvent::DataReceived {
770                    peer_id,
771                    bytes: data.len(),
772                });
773                return Ok((peer_id, data));
774            }
775        }
776
777        let peers = self.connected_peers.read().await.clone();
778
779        if peers.is_empty() {
780            return Err(EndpointError::Connection("No connected peers".to_string()));
781        }
782
783        let start = Instant::now();
784        let peer_count = peers.len().max(1);
785
786        while start.elapsed() < timeout {
787            // Calculate per-peer timeout based on remaining time
788            let remaining = timeout.saturating_sub(start.elapsed());
789            if remaining.is_zero() {
790                break;
791            }
792
793            // Distribute remaining time across peers with minimum 5ms per peer
794            let per_peer_timeout = remaining
795                .checked_div(peer_count as u32)
796                .unwrap_or(Duration::from_millis(5))
797                .max(Duration::from_millis(5));
798
799            for (peer_id, _) in peers.iter() {
800                // Check if we've exceeded total timeout
801                if start.elapsed() >= timeout {
802                    break;
803                }
804
805                if let Ok(Some(connection)) = self.inner.get_connection(peer_id) {
806                    // Try unidirectional stream with calculated per-peer timeout
807                    if let Ok(Ok(mut recv_stream)) =
808                        tokio::time::timeout(per_peer_timeout, connection.accept_uni()).await
809                    {
810                        if let Ok(data) = recv_stream.read_to_end(1024 * 1024).await {
811                            if !data.is_empty() {
812                                if let Some(peer_conn) =
813                                    self.connected_peers.write().await.get_mut(peer_id)
814                                {
815                                    peer_conn.last_activity = Instant::now();
816                                }
817
818                                let _ = self.event_tx.send(P2pEvent::DataReceived {
819                                    peer_id: *peer_id,
820                                    bytes: data.len(),
821                                });
822                                return Ok((*peer_id, data));
823                            }
824                        }
825                    }
826                }
827            }
828
829            // Short sleep between iterations, but only if we have time left
830            if start.elapsed() < timeout {
831                tokio::time::sleep(Duration::from_millis(5)).await;
832            }
833        }
834
835        Err(EndpointError::Timeout)
836    }
837
838    // === Events ===
839
840    /// Subscribe to endpoint events
841    pub fn subscribe(&self) -> broadcast::Receiver<P2pEvent> {
842        self.event_tx.subscribe()
843    }
844
845    // === Statistics ===
846
847    /// Get endpoint statistics
848    pub async fn stats(&self) -> EndpointStats {
849        self.stats.read().await.clone()
850    }
851
852    /// Get metrics for a specific connection
853    pub async fn connection_metrics(&self, peer_id: &PeerId) -> Option<ConnectionMetrics> {
854        let connection = self.inner.get_connection(peer_id).ok()??;
855        let stats = connection.stats();
856        let rtt = connection.rtt();
857
858        let last_activity = self
859            .connected_peers
860            .read()
861            .await
862            .get(peer_id)
863            .map(|p| p.last_activity);
864
865        Some(ConnectionMetrics {
866            bytes_sent: stats.udp_tx.bytes,
867            bytes_received: stats.udp_rx.bytes,
868            rtt: Some(rtt),
869            packet_loss: stats.path.lost_packets as f64
870                / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
871            last_activity,
872        })
873    }
874
875    /// Get NAT traversal statistics
876    pub fn nat_stats(&self) -> Result<NatTraversalStatistics, EndpointError> {
877        self.inner
878            .get_nat_stats()
879            .map_err(|e| EndpointError::Connection(e.to_string()))
880    }
881
882    // === Known Peers ===
883
884    /// Connect to configured known peers
885    pub async fn connect_known_peers(&self) -> Result<usize, EndpointError> {
886        let mut connected = 0;
887        let known_peers = self.config.known_peers.clone();
888
889        for addr in &known_peers {
890            match self.connect(*addr).await {
891                Ok(_) => {
892                    connected += 1;
893                    info!("Connected to known peer {}", addr);
894                }
895                Err(e) => {
896                    warn!("Failed to connect to known peer {}: {}", addr, e);
897                }
898            }
899        }
900
901        {
902            let mut stats = self.stats.write().await;
903            stats.connected_bootstrap_nodes = connected;
904        }
905
906        let _ = self.event_tx.send(P2pEvent::BootstrapStatus {
907            connected,
908            total: known_peers.len(),
909        });
910
911        Ok(connected)
912    }
913
914    /// Add a bootstrap node dynamically
915    pub async fn add_bootstrap(&self, addr: SocketAddr) {
916        let _ = self.inner.add_bootstrap_node(addr);
917        let mut stats = self.stats.write().await;
918        stats.total_bootstrap_nodes += 1;
919    }
920
921    /// Get list of connected peers
922    pub async fn connected_peers(&self) -> Vec<PeerConnection> {
923        self.connected_peers
924            .read()
925            .await
926            .values()
927            .cloned()
928            .collect()
929    }
930
931    /// Check if a peer is connected
932    pub async fn is_connected(&self, peer_id: &PeerId) -> bool {
933        self.connected_peers.read().await.contains_key(peer_id)
934    }
935
936    /// Check if a peer is authenticated
937    pub async fn is_authenticated(&self, peer_id: &PeerId) -> bool {
938        self.connected_peers
939            .read()
940            .await
941            .get(peer_id)
942            .map(|p| p.authenticated)
943            .unwrap_or(false)
944    }
945
946    // === Lifecycle ===
947
948    /// Shutdown the endpoint gracefully
949    pub async fn shutdown(&self) {
950        info!("Shutting down P2P endpoint");
951        self.shutdown.store(true, Ordering::SeqCst);
952
953        // Disconnect all peers
954        let peers: Vec<PeerId> = self.connected_peers.read().await.keys().copied().collect();
955        for peer_id in peers {
956            let _ = self.disconnect(&peer_id).await;
957        }
958
959        let _ = self.inner.shutdown().await;
960    }
961
962    /// Check if endpoint is running
963    pub fn is_running(&self) -> bool {
964        !self.shutdown.load(Ordering::SeqCst)
965    }
966
967    // === Internal helpers ===
968
969    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
970        use std::collections::hash_map::DefaultHasher;
971        use std::hash::{Hash, Hasher};
972
973        let mut hasher = DefaultHasher::new();
974        addr.hash(&mut hasher);
975        let hash = hasher.finish();
976
977        let mut peer_id_bytes = [0u8; 32];
978        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
979        peer_id_bytes[8..10].copy_from_slice(&addr.port().to_le_bytes());
980
981        PeerId(peer_id_bytes)
982    }
983
984    // v0.2: authenticate_peer removed - TLS handles peer authentication via ML-DSA-65
985}
986
987impl Clone for P2pEndpoint {
988    fn clone(&self) -> Self {
989        Self {
990            inner: Arc::clone(&self.inner),
991            // v0.2: auth_manager removed - TLS handles peer authentication
992            connected_peers: Arc::clone(&self.connected_peers),
993            stats: Arc::clone(&self.stats),
994            config: self.config.clone(),
995            event_tx: self.event_tx.clone(),
996            peer_id: self.peer_id,
997            public_key: self.public_key.clone(),
998            shutdown: Arc::clone(&self.shutdown),
999            pending_data: Arc::clone(&self.pending_data),
1000        }
1001    }
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006    use super::*;
1007
1008    #[test]
1009    fn test_endpoint_stats_default() {
1010        let stats = EndpointStats::default();
1011        assert_eq!(stats.active_connections, 0);
1012        assert_eq!(stats.successful_connections, 0);
1013        assert_eq!(stats.nat_traversal_attempts, 0);
1014    }
1015
1016    #[test]
1017    fn test_connection_metrics_default() {
1018        let metrics = ConnectionMetrics::default();
1019        assert_eq!(metrics.bytes_sent, 0);
1020        assert_eq!(metrics.bytes_received, 0);
1021        assert!(metrics.rtt.is_none());
1022        assert_eq!(metrics.packet_loss, 0.0);
1023    }
1024
1025    #[test]
1026    fn test_peer_connection_debug() {
1027        let conn = PeerConnection {
1028            peer_id: PeerId([0u8; 32]),
1029            remote_addr: "127.0.0.1:8080".parse().expect("valid addr"),
1030            authenticated: false,
1031            connected_at: Instant::now(),
1032            last_activity: Instant::now(),
1033        };
1034        let debug_str = format!("{:?}", conn);
1035        assert!(debug_str.contains("PeerConnection"));
1036    }
1037
1038    #[test]
1039    fn test_disconnect_reason_debug() {
1040        let reason = DisconnectReason::Normal;
1041        assert!(format!("{:?}", reason).contains("Normal"));
1042
1043        let reason = DisconnectReason::ProtocolError("test".to_string());
1044        assert!(format!("{:?}", reason).contains("test"));
1045    }
1046
1047    #[test]
1048    fn test_traversal_phase_debug() {
1049        let phase = TraversalPhase::Discovery;
1050        assert!(format!("{:?}", phase).contains("Discovery"));
1051    }
1052
1053    #[test]
1054    fn test_endpoint_error_display() {
1055        let err = EndpointError::Timeout;
1056        assert!(err.to_string().contains("timed out"));
1057
1058        let err = EndpointError::PeerNotFound(PeerId([0u8; 32]));
1059        assert!(err.to_string().contains("not found"));
1060    }
1061
1062    #[cfg(feature = "runtime-tokio")]
1063    #[tokio::test]
1064    async fn test_endpoint_creation() {
1065        // v0.13.0+: No role - all nodes are symmetric P2P nodes
1066        let config = P2pConfig::builder().build().expect("valid config");
1067
1068        let result = P2pEndpoint::new(config).await;
1069        // May fail in test environment without network, but shouldn't panic
1070        if let Ok(endpoint) = result {
1071            assert!(endpoint.is_running());
1072            assert!(endpoint.local_addr().is_some() || endpoint.local_addr().is_none());
1073        }
1074    }
1075}