ant_quic/
quic_node.rs

1//! QUIC-based P2P node with NAT traversal
2//!
3//! This module provides a QUIC-based implementation of the P2P node
4//! that integrates with the NAT traversal protocol.
5
6use std::{
7    collections::HashMap,
8    net::SocketAddr,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use tracing::{debug, info, error};
14
15use crate::{
16    nat_traversal_api::{
17        NatTraversalEndpoint, NatTraversalConfig, NatTraversalEvent,
18        EndpointRole, PeerId, NatTraversalError, NatTraversalStatistics,
19    },
20    auth::{AuthManager, AuthConfig, AuthMessage, AuthProtocol},
21    crypto::raw_public_keys::key_utils::{
22        generate_ed25519_keypair, derive_peer_id_from_public_key,
23    },
24};
25
/// QUIC-based P2P node with NAT traversal.
///
/// The endpoint, peer table, statistics and authentication manager are each
/// held behind an `Arc`, so the derived `Clone` yields another handle onto
/// the same shared state rather than an independent node.
#[derive(Clone)]
pub struct QuicP2PNode {
    /// NAT traversal endpoint
    nat_endpoint: Arc<NatTraversalEndpoint>,
    /// Active peer connections (maps peer ID to their socket address)
    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
    /// Node statistics (shared with the NAT traversal event callback and the
    /// periodic statistics task)
    stats: Arc<tokio::sync::Mutex<NodeStats>>,
    /// Node configuration this node was created with
    config: QuicNodeConfig,
    /// Authentication manager
    auth_manager: Arc<AuthManager>,
    /// Our peer ID
    peer_id: PeerId,
}
42
/// Configuration for QUIC P2P node.
///
/// See the `Default` implementation for the baseline values.
#[derive(Debug, Clone)]
pub struct QuicNodeConfig {
    /// Role of this node (forwarded to the NAT traversal configuration)
    pub role: EndpointRole,
    /// Bootstrap nodes (forwarded to the NAT traversal configuration)
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Enable coordinator services
    // NOTE(review): not read by any method visible in this file — confirm where it is consumed.
    pub enable_coordinator: bool,
    /// Max concurrent connections
    // NOTE(review): no method visible in this file enforces this limit — confirm.
    pub max_connections: usize,
    /// Connection timeout (overall deadline for `connect_to_peer`'s polling loop)
    pub connection_timeout: Duration,
    /// Statistics interval (period of the task started by `start_stats_task`)
    pub stats_interval: Duration,
    /// Authentication configuration
    pub auth_config: AuthConfig,
    /// Bind address for the node (forwarded to the NAT traversal configuration)
    pub bind_addr: Option<SocketAddr>,
}
63
64impl Default for QuicNodeConfig {
65    fn default() -> Self {
66        Self {
67            role: EndpointRole::Client,
68            bootstrap_nodes: Vec::new(),
69            enable_coordinator: false,
70            max_connections: 100,
71            connection_timeout: Duration::from_secs(30),
72            stats_interval: Duration::from_secs(30),
73            auth_config: AuthConfig::default(),
74            bind_addr: None,
75        }
76    }
77}
78
/// Connection metrics for a specific peer, as produced by
/// `QuicP2PNode::get_connection_metrics`.
#[derive(Debug, Clone)]
pub struct ConnectionMetrics {
    /// Bytes sent to this peer
    pub bytes_sent: u64,
    /// Bytes received from this peer
    pub bytes_received: u64,
    /// Round-trip time (`None` when no measurement is available)
    pub rtt: Option<Duration>,
    /// Packet loss rate (0.0 to 1.0)
    pub packet_loss: f64,
}
91
/// Node statistics.
///
/// Counters are updated by the connection methods of `QuicP2PNode` and by the
/// NAT traversal event callback installed in `QuicP2PNode::new`.
#[derive(Debug, Clone)]
pub struct NodeStats {
    /// Number of active connections
    pub active_connections: usize,
    /// Total successful connections
    pub successful_connections: u64,
    /// Total failed connections
    pub failed_connections: u64,
    /// NAT traversal attempts
    pub nat_traversal_attempts: u64,
    /// Successful NAT traversals
    pub nat_traversal_successes: u64,
    /// Node start time (set when the statistics value is created)
    pub start_time: Instant,
}
108
109impl Default for NodeStats {
110    fn default() -> Self {
111        Self {
112            active_connections: 0,
113            successful_connections: 0,
114            failed_connections: 0,
115            nat_traversal_attempts: 0,
116            nat_traversal_successes: 0,
117            start_time: Instant::now(),
118        }
119    }
120}
121
122impl QuicP2PNode {
123    /// Create a new QUIC P2P node
124    pub async fn new(config: QuicNodeConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
125        // Generate Ed25519 keypair for authentication
126        let (secret_key, public_key) = generate_ed25519_keypair();
127        let peer_id = derive_peer_id_from_public_key(&public_key);
128        
129        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
130        
131        // Create authentication manager
132        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
133        
134        // Create NAT traversal configuration
135        let nat_config = NatTraversalConfig {
136            role: config.role,
137            bootstrap_nodes: config.bootstrap_nodes.clone(),
138            max_candidates: 50,
139            coordination_timeout: Duration::from_secs(10),
140            enable_symmetric_nat: true,
141            // Bootstrap nodes should not enable relay fallback
142            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
143            max_concurrent_attempts: 5,
144            bind_addr: config.bind_addr,
145        };
146
147        // Create event callback for NAT traversal events
148        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
149            start_time: Instant::now(),
150            ..Default::default()
151        }));
152        let stats_for_callback = Arc::clone(&stats_clone);
153        
154        let event_callback = Box::new(move |event: NatTraversalEvent| {
155            let stats = stats_for_callback.clone();
156            tokio::spawn(async move {
157                let mut stats = stats.lock().await;
158                match event {
159                    NatTraversalEvent::CoordinationRequested { .. } => {
160                        stats.nat_traversal_attempts += 1;
161                    }
162                    NatTraversalEvent::ConnectionEstablished { .. } => {
163                        stats.nat_traversal_successes += 1;
164                    }
165                    _ => {}
166                }
167            });
168        });
169
170        // Create NAT traversal endpoint
171        let nat_endpoint = Arc::new(
172            NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?
173        );
174
175        Ok(Self {
176            nat_endpoint,
177            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
178            stats: stats_clone,
179            config,
180            auth_manager,
181            peer_id,
182        })
183    }
184
    /// Get the node configuration.
    ///
    /// Returns a borrow of the `QuicNodeConfig` stored in this node.
    pub fn get_config(&self) -> &QuicNodeConfig {
        &self.config
    }
189
190    /// Connect directly to a bootstrap node
191    pub async fn connect_to_bootstrap(
192        &self,
193        bootstrap_addr: SocketAddr,
194    ) -> Result<PeerId, NatTraversalError> {
195        info!("Connecting to bootstrap node at {}", bootstrap_addr);
196        
197        // Get the quinn endpoint from NAT traversal endpoint
198        let endpoint = self.nat_endpoint.get_quinn_endpoint()
199            .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not available".to_string()))?;
200        
201        // Connect using the QUIC endpoint directly
202        match endpoint.connect(bootstrap_addr, "bootstrap-node") {
203            Ok(connecting) => {
204                match connecting.await {
205                    Ok(connection) => {
206                        // Extract peer ID from the connection
207                        // For now, we'll generate a temporary peer ID based on the address
208                        // In a real implementation, we'd exchange peer IDs during the handshake
209                        let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
210                        
211                        // Store the connection
212                        self.connected_peers.write().await.insert(peer_id, bootstrap_addr);
213                        
214                        // Update stats
215                        {
216                            let mut stats = self.stats.lock().await;
217                            stats.active_connections += 1;
218                            stats.successful_connections += 1;
219                        }
220                        
221                        // Fire connection established event
222                        if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
223                            callback(NatTraversalEvent::ConnectionEstablished {
224                                peer_id,
225                                remote_address: bootstrap_addr,
226                            });
227                        }
228                        
229                        info!("Successfully connected to bootstrap node {} with peer ID {:?}", bootstrap_addr, peer_id);
230                        Ok(peer_id)
231                    }
232                    Err(e) => {
233                        error!("Failed to establish connection to bootstrap node {}: {}", bootstrap_addr, e);
234                        {
235                            let mut stats = self.stats.lock().await;
236                            stats.failed_connections += 1;
237                        }
238                        Err(NatTraversalError::NetworkError(format!("Connection failed: {}", e)))
239                    }
240                }
241            }
242            Err(e) => {
243                error!("Failed to initiate connection to bootstrap node {}: {}", bootstrap_addr, e);
244                {
245                    let mut stats = self.stats.lock().await;
246                    stats.failed_connections += 1;
247                }
248                Err(NatTraversalError::NetworkError(format!("Connect error: {}", e)))
249            }
250        }
251    }
252    
253    /// Derive a peer ID from a socket address (temporary solution)
254    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
255        use std::hash::{Hash, Hasher};
256        use std::collections::hash_map::DefaultHasher;
257        
258        let mut hasher = DefaultHasher::new();
259        addr.hash(&mut hasher);
260        let hash = hasher.finish();
261        
262        let mut peer_id_bytes = [0u8; 32];
263        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
264        peer_id_bytes[8..16].copy_from_slice(&addr.port().to_le_bytes());
265        
266        PeerId(peer_id_bytes)
267    }
268
269    /// Connect to a peer using NAT traversal
270    pub async fn connect_to_peer(
271        &self,
272        peer_id: PeerId,
273        coordinator: SocketAddr,
274    ) -> Result<SocketAddr, NatTraversalError> {
275        info!("Initiating connection to peer {:?} via coordinator {}", peer_id, coordinator);
276        
277        // Update stats
278        {
279            let mut stats = self.stats.lock().await;
280            stats.nat_traversal_attempts += 1;
281        }
282
283        // Initiate NAT traversal
284        self.nat_endpoint.initiate_nat_traversal(peer_id, coordinator)?;
285
286        // Poll for completion (in production, this would be event-driven)
287        let start = Instant::now();
288        let timeout = self.config.connection_timeout;
289        
290        while start.elapsed() < timeout {
291            let events = self.nat_endpoint.poll(Instant::now())?;
292            
293            for event in events {
294                match event {
295                    NatTraversalEvent::ConnectionEstablished { peer_id: evt_peer, remote_address } => {
296                        if evt_peer == peer_id {
297                            // Store peer connection
298                            {
299                                let mut peers = self.connected_peers.write().await;
300                                peers.insert(peer_id, remote_address);
301                            }
302                            
303                            // Update stats
304                            {
305                                let mut stats = self.stats.lock().await;
306                                stats.successful_connections += 1;
307                                stats.active_connections += 1;
308                                stats.nat_traversal_successes += 1;
309                            }
310                            
311                            info!("Successfully connected to peer {:?} at {}", peer_id, remote_address);
312                            
313                            // Perform authentication if required
314                            if self.config.auth_config.require_authentication {
315                                match self.authenticate_as_initiator(&peer_id).await {
316                                    Ok(_) => {
317                                        info!("Authentication successful with peer {:?}", peer_id);
318                                    }
319                                    Err(e) => {
320                                        error!("Authentication failed with peer {:?}: {}", peer_id, e);
321                                        // Remove from connected peers
322                                        self.connected_peers.write().await.remove(&peer_id);
323                                        // Update stats
324                                        let mut stats = self.stats.lock().await;
325                                        stats.active_connections = stats.active_connections.saturating_sub(1);
326                                        stats.failed_connections += 1;
327                                        return Err(NatTraversalError::ConfigError(format!("Authentication failed: {}", e)));
328                                    }
329                                }
330                            }
331                            
332                            return Ok(remote_address);
333                        }
334                    }
335                    NatTraversalEvent::TraversalFailed { peer_id: evt_peer, error, fallback_available: _ } => {
336                        if evt_peer == peer_id {
337                            // Update stats
338                            {
339                                let mut stats = self.stats.lock().await;
340                                stats.failed_connections += 1;
341                            }
342                            
343                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
344                            return Err(error);
345                        }
346                    }
347                    _ => {
348                        debug!("Received event: {:?}", event);
349                    }
350                }
351            }
352            
353            // Brief sleep to avoid busy waiting
354            tokio::time::sleep(Duration::from_millis(100)).await;
355        }
356        
357        // Timeout
358        {
359            let mut stats = self.stats.lock().await;
360            stats.failed_connections += 1;
361        }
362        
363        Err(NatTraversalError::Timeout)
364    }
365
366    /// Accept incoming connections
367    pub async fn accept(&self) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
368        info!("Waiting for incoming connection...");
369        
370        // Accept connection through the NAT traversal endpoint
371        match self.nat_endpoint.accept_connection().await {
372            Ok((peer_id, connection)) => {
373                let remote_addr = connection.remote_address();
374                
375                // Store the connection
376                {
377                    let mut peers = self.connected_peers.write().await;
378                    peers.insert(peer_id, remote_addr);
379                }
380                
381                // Update stats
382                {
383                    let mut stats = self.stats.lock().await;
384                    stats.successful_connections += 1;
385                    stats.active_connections += 1;
386                }
387                
388                info!("Accepted connection from peer {:?} at {}", peer_id, remote_addr);
389                
390                // Handle authentication if required
391                if self.config.auth_config.require_authentication {
392                    // Start a task to handle incoming authentication
393                    let self_clone = self.clone();
394                    let auth_peer_id = peer_id;
395                    tokio::spawn(async move {
396                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
397                            error!("Failed to handle authentication for peer {:?}: {}", auth_peer_id, e);
398                            // Remove the peer if auth fails
399                            self_clone.connected_peers.write().await.remove(&auth_peer_id);
400                            let mut stats = self_clone.stats.lock().await;
401                            stats.active_connections = stats.active_connections.saturating_sub(1);
402                        }
403                    });
404                }
405                
406                Ok((remote_addr, peer_id))
407            }
408            Err(e) => {
409                // Update stats
410                {
411                    let mut stats = self.stats.lock().await;
412                    stats.failed_connections += 1;
413                }
414                
415                error!("Failed to accept connection: {}", e);
416                Err(Box::new(e))
417            }
418        }
419    }
420
421    /// Send data to a peer
422    pub async fn send_to_peer(
423        &self,
424        peer_id: &PeerId,
425        data: &[u8],
426    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
427        let peers = self.connected_peers.read().await;
428        
429        if let Some(remote_addr) = peers.get(peer_id) {
430            debug!("Sending {} bytes to peer {:?} at {}", data.len(), peer_id, remote_addr);
431            
432            // Get the Quinn connection for this peer from the NAT traversal endpoint
433            match self.nat_endpoint.get_connection(peer_id) {
434                Ok(Some(connection)) => {
435                    // Open a unidirectional stream for data transmission
436                    let mut send_stream = connection.open_uni().await
437                        .map_err(|e| format!("Failed to open unidirectional stream: {}", e))?;
438                    
439                    // Send the data
440                    send_stream.write_all(data).await
441                        .map_err(|e| format!("Failed to write data: {}", e))?;
442                    
443                    // Finish the stream
444                    send_stream.finish().map_err(|e| format!("Failed to finish stream: {}", e))?;
445                    
446                    debug!("Successfully sent {} bytes to peer {:?}", data.len(), peer_id);
447                    Ok(())
448                }
449                Ok(None) => {
450                    error!("No active connection found for peer {:?}", peer_id);
451                    Err("No active connection".into())
452                }
453                Err(e) => {
454                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
455                    Err(Box::new(e))
456                }
457            }
458        } else {
459            error!("Peer {:?} not connected", peer_id);
460            Err("Peer not connected".into())
461        }
462    }
463
464    /// Receive data from peers
465    pub async fn receive(&self) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
466        debug!("Waiting to receive data from any connected peer...");
467        
468        // Get all connected peers
469        let peers = {
470            let peers_guard = self.connected_peers.read().await;
471            peers_guard.clone()
472        };
473        
474        if peers.is_empty() {
475            return Err("No connected peers".into());
476        }
477        
478        // Try to receive data from any connected peer
479        // In a real implementation, this would use a more sophisticated approach
480        // like select! over multiple connection streams
481        for (peer_id, _remote_addr) in peers.iter() {
482            match self.nat_endpoint.get_connection(peer_id) {
483                Ok(Some(connection)) => {
484                    // Try to accept incoming unidirectional streams
485                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni()).await {
486                        Ok(Ok(mut recv_stream)) => {
487                            debug!("Receiving data from unidirectional stream from peer {:?}", peer_id);
488                            
489                            // Read all data from the stream
490                            match recv_stream.read_to_end(1024 * 1024).await { // 1MB limit
491                                Ok(buffer) => {
492                                    if !buffer.is_empty() {
493                                        debug!("Received {} bytes from peer {:?}", buffer.len(), peer_id);
494                                        return Ok((*peer_id, buffer));
495                                    }
496                                }
497                                Err(e) => {
498                                    debug!("Failed to read from stream for peer {:?}: {}", peer_id, e);
499                                }
500                            }
501                        }
502                        Ok(Err(e)) => {
503                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
504                        }
505                        Err(_) => {
506                            // Timeout - try bidirectional streams
507                        }
508                    }
509                    
510                    // Also try to accept bidirectional streams
511                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi()).await {
512                        Ok(Ok((_send_stream, mut recv_stream))) => {
513                            debug!("Receiving data from bidirectional stream from peer {:?}", peer_id);
514                            
515                            // Read all data from the receive side
516                            match recv_stream.read_to_end(1024 * 1024).await { // 1MB limit
517                                Ok(buffer) => {
518                                    if !buffer.is_empty() {
519                                        debug!("Received {} bytes from peer {:?} via bidirectional stream", buffer.len(), peer_id);
520                                        return Ok((*peer_id, buffer));
521                                    }
522                                }
523                                Err(e) => {
524                                    debug!("Failed to read from bidirectional stream for peer {:?}: {}", peer_id, e);
525                                }
526                            }
527                        }
528                        Ok(Err(e)) => {
529                            debug!("Failed to accept bidirectional stream from peer {:?}: {}", peer_id, e);
530                        }
531                        Err(_) => {
532                            // Timeout - continue to next peer
533                        }
534                    }
535                }
536                Ok(None) => {
537                    debug!("No active connection for peer {:?}", peer_id);
538                }
539                Err(e) => {
540                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
541                }
542            }
543        }
544        
545        // If we get here, no data was received from any peer
546        Err("No data available from any connected peer".into())
547    }
548
    /// Get current statistics.
    ///
    /// Locks the shared statistics and returns a clone, so the returned value
    /// is a snapshot that does not change as the node keeps running.
    pub async fn get_stats(&self) -> NodeStats {
        self.stats.lock().await.clone()
    }
553    
    /// Get access to the NAT traversal endpoint.
    ///
    /// Borrows the endpoint out of its `Arc`. As written this always returns
    /// `Ok`; the `Result` wrapper never carries an error here.
    pub fn get_nat_endpoint(&self) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
        Ok(&*self.nat_endpoint)
    }
558
559    /// Start periodic statistics reporting
560    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
561        let stats = Arc::clone(&self.stats);
562        let interval_duration = self.config.stats_interval;
563        
564        tokio::spawn(async move {
565            let mut interval = tokio::time::interval(interval_duration);
566            
567            loop {
568                interval.tick().await;
569                
570                let stats_snapshot = stats.lock().await.clone();
571                
572                info!(
573                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
574                    stats_snapshot.active_connections,
575                    stats_snapshot.successful_connections,
576                    stats_snapshot.nat_traversal_successes,
577                    stats_snapshot.nat_traversal_attempts
578                );
579            }
580        })
581    }
582
583
    /// Get NAT traversal statistics.
    ///
    /// Thin wrapper that forwards to the endpoint's `get_nat_stats` and
    /// returns its result unchanged. The body contains no `.await`; the
    /// function is `async` only by signature.
    pub async fn get_nat_stats(&self) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
        self.nat_endpoint.get_nat_stats()
    }
588
589    /// Get connection metrics for a specific peer
590    pub async fn get_connection_metrics(&self, peer_id: &PeerId) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
591        match self.nat_endpoint.get_connection(peer_id) {
592            Ok(Some(connection)) => {
593                // Get basic RTT from the connection
594                let rtt = connection.rtt();
595                
596                // Get congestion window and other stats
597                let stats = connection.stats();
598                
599                Ok(ConnectionMetrics {
600                    bytes_sent: stats.udp_tx.bytes,
601                    bytes_received: stats.udp_rx.bytes,
602                    rtt: Some(rtt),
603                    packet_loss: stats.path.lost_packets as f64 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
604                })
605            }
606            Ok(None) => Err("Connection not found".into()),
607            Err(e) => Err(format!("Failed to get connection: {}", e).into()),
608        }
609    }
610    
    /// Get this node's peer ID.
    ///
    /// Returns a copy of the ID stored when the node was created.
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
615    
    /// Get this node's public key bytes.
    ///
    /// Forwards to the authentication manager, which owns the key material.
    pub fn public_key_bytes(&self) -> [u8; 32] {
        self.auth_manager.public_key_bytes()
    }
620    
    /// Send an authentication message to a peer.
    ///
    /// Serializes `message` with `AuthManager::serialize_message` and sends
    /// the resulting bytes through `send_to_peer`.
    ///
    /// # Errors
    ///
    /// Returns the serialization error or any error from `send_to_peer`.
    async fn send_auth_message(
        &self,
        peer_id: &PeerId,
        message: AuthMessage,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let data = AuthManager::serialize_message(&message)?;
        self.send_to_peer(peer_id, &data).await
    }
630    
631    /// Perform authentication handshake as initiator
632    async fn authenticate_as_initiator(
633        &self,
634        peer_id: &PeerId,
635    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
636        info!("Starting authentication with peer {:?}", peer_id);
637        
638        // Send authentication request
639        let auth_request = self.auth_manager.create_auth_request();
640        self.send_auth_message(peer_id, auth_request).await?;
641        
642        // Wait for challenge
643        let timeout_duration = self.config.auth_config.auth_timeout;
644        let start = Instant::now();
645        
646        while start.elapsed() < timeout_duration {
647            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
648                Ok(Ok((recv_peer_id, data))) => {
649                    if recv_peer_id == *peer_id {
650                        match AuthManager::deserialize_message(&data) {
651                            Ok(AuthMessage::Challenge { nonce, .. }) => {
652                                // Create and send challenge response
653                                let response = self.auth_manager.create_challenge_response(nonce)?;
654                                self.send_auth_message(peer_id, response).await?;
655                            }
656                            Ok(AuthMessage::AuthSuccess { .. }) => {
657                                info!("Authentication successful with peer {:?}", peer_id);
658                                return Ok(());
659                            }
660                            Ok(AuthMessage::AuthFailure { reason }) => {
661                                return Err(format!("Authentication failed: {}", reason).into());
662                            }
663                            _ => continue,
664                        }
665                    }
666                }
667                _ => continue,
668            }
669        }
670        
671        Err("Authentication timeout".into())
672    }
673    
674    /// Handle incoming authentication messages
675    async fn handle_auth_message(
676        &self,
677        peer_id: PeerId,
678        message: AuthMessage,
679    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
680        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
681        
682        match auth_protocol.handle_message(peer_id, message).await {
683            Ok(Some(response)) => {
684                self.send_auth_message(&peer_id, response).await?;
685            }
686            Ok(None) => {
687                // No response needed
688            }
689            Err(e) => {
690                error!("Authentication error: {}", e);
691                let failure = AuthMessage::AuthFailure {
692                    reason: e.to_string(),
693                };
694                self.send_auth_message(&peer_id, failure).await?;
695                return Err(Box::new(e));
696            }
697        }
698        
699        Ok(())
700    }
701    
    /// Check if a peer is authenticated.
    ///
    /// Forwards the question to the authentication manager.
    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
        self.auth_manager.is_authenticated(peer_id).await
    }
706    
    /// Get list of authenticated peers.
    ///
    /// Forwards to the authentication manager and returns its list unchanged.
    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
        self.auth_manager.list_authenticated_peers().await
    }
711    
712    /// Handle incoming authentication from a peer
713    async fn handle_incoming_auth(
714        &self,
715        peer_id: PeerId,
716    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
717        info!("Handling incoming authentication from peer {:?}", peer_id);
718        
719        let timeout_duration = self.config.auth_config.auth_timeout;
720        let start = Instant::now();
721        
722        while start.elapsed() < timeout_duration {
723            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
724                Ok(Ok((recv_peer_id, data))) => {
725                    if recv_peer_id == peer_id {
726                        match AuthManager::deserialize_message(&data) {
727                            Ok(auth_msg) => {
728                                self.handle_auth_message(peer_id, auth_msg).await?;
729                                
730                                // Check if authentication is complete
731                                if self.auth_manager.is_authenticated(&peer_id).await {
732                                    info!("Peer {:?} successfully authenticated", peer_id);
733                                    return Ok(());
734                                }
735                            }
736                            Err(_) => {
737                                // Not an auth message, ignore
738                                continue;
739                            }
740                        }
741                    }
742                }
743                _ => continue,
744            }
745        }
746        
747        Err("Authentication timeout waiting for peer".into())
748    }
749}
750