ant_quic/
quic_node.rs

1//! QUIC-based P2P node with NAT traversal
2//!
3//! This module provides a QUIC-based implementation of the P2P node
4//! that integrates with the NAT traversal protocol.
5
6use std::{
7    collections::HashMap,
8    net::SocketAddr,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use tracing::{debug, error, info};
14
15use crate::{
16    auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
17    crypto::raw_public_keys::key_utils::{
18        derive_peer_id_from_public_key, generate_ed25519_keypair,
19    },
20    nat_traversal_api::{
21        EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
22        NatTraversalEvent, NatTraversalStatistics, PeerId,
23    },
24};
25
26/// QUIC-based P2P node with NAT traversal
27#[derive(Clone, Debug)]
28pub struct QuicP2PNode {
29    /// NAT traversal endpoint
30    nat_endpoint: Arc<NatTraversalEndpoint>,
31    /// Active peer connections (maps peer ID to their socket address)
32    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
33    /// Node statistics
34    stats: Arc<tokio::sync::Mutex<NodeStats>>,
35    /// Node configuration
36    config: QuicNodeConfig,
37    /// Authentication manager
38    auth_manager: Arc<AuthManager>,
39    /// Our peer ID
40    peer_id: PeerId,
41}
42
43/// Configuration for QUIC P2P node
44#[derive(Debug, Clone)]
45pub struct QuicNodeConfig {
46    /// Role of this node
47    pub role: EndpointRole,
48    /// Bootstrap nodes
49    pub bootstrap_nodes: Vec<SocketAddr>,
50    /// Enable coordinator services
51    pub enable_coordinator: bool,
52    /// Max concurrent connections
53    pub max_connections: usize,
54    /// Connection timeout
55    pub connection_timeout: Duration,
56    /// Statistics interval
57    pub stats_interval: Duration,
58    /// Authentication configuration
59    pub auth_config: AuthConfig,
60    /// Bind address for the node
61    pub bind_addr: Option<SocketAddr>,
62}
63
64impl Default for QuicNodeConfig {
65    fn default() -> Self {
66        Self {
67            role: EndpointRole::Client,
68            bootstrap_nodes: Vec::new(),
69            enable_coordinator: false,
70            max_connections: 100,
71            connection_timeout: Duration::from_secs(30),
72            stats_interval: Duration::from_secs(30),
73            auth_config: AuthConfig::default(),
74            bind_addr: None,
75        }
76    }
77}
78
79#[derive(Debug, Clone)]
80pub struct ConnectionMetrics {
81    /// Bytes sent to this peer
82    pub bytes_sent: u64,
83    /// Bytes received from this peer
84    pub bytes_received: u64,
85    /// Round-trip time
86    pub rtt: Option<Duration>,
87    /// Packet loss rate (0.0 to 1.0)
88    pub packet_loss: f64,
89}
90
91#[derive(Debug, Clone)]
92pub struct NodeStats {
93    /// Number of active connections
94    pub active_connections: usize,
95    /// Total successful connections
96    pub successful_connections: u64,
97    /// Total failed connections
98    pub failed_connections: u64,
99    /// NAT traversal attempts
100    pub nat_traversal_attempts: u64,
101    /// Successful NAT traversals
102    pub nat_traversal_successes: u64,
103    /// Node start time
104    pub start_time: Instant,
105}
106
107impl Default for NodeStats {
108    fn default() -> Self {
109        Self {
110            active_connections: 0,
111            successful_connections: 0,
112            failed_connections: 0,
113            nat_traversal_attempts: 0,
114            nat_traversal_successes: 0,
115            start_time: Instant::now(),
116        }
117    }
118}
119
120impl QuicP2PNode {
121    /// Create a new QUIC P2P node
122    pub async fn new(
123        config: QuicNodeConfig,
124    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
125        // Generate Ed25519 keypair for authentication
126        let (secret_key, public_key) = generate_ed25519_keypair();
127        let peer_id = derive_peer_id_from_public_key(&public_key);
128
129        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
130
131        // Create authentication manager
132        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
133
134        // Create NAT traversal configuration
135        let nat_config = NatTraversalConfig {
136            role: config.role,
137            bootstrap_nodes: config.bootstrap_nodes.clone(),
138            max_candidates: 50,
139            coordination_timeout: Duration::from_secs(10),
140            enable_symmetric_nat: true,
141            // Bootstrap nodes should not enable relay fallback
142            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
143            max_concurrent_attempts: 5,
144            bind_addr: config.bind_addr,
145            prefer_rfc_nat_traversal: true, // Default to RFC format for standards compliance
146            timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
147        };
148
149        // Create event callback for NAT traversal events
150        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
151            start_time: Instant::now(),
152            ..Default::default()
153        }));
154        let stats_for_callback = Arc::clone(&stats_clone);
155
156        let event_callback = Box::new(move |event: NatTraversalEvent| {
157            let stats = stats_for_callback.clone();
158            tokio::spawn(async move {
159                let mut stats = stats.lock().await;
160                match event {
161                    NatTraversalEvent::CoordinationRequested { .. } => {
162                        stats.nat_traversal_attempts += 1;
163                    }
164                    NatTraversalEvent::ConnectionEstablished { .. } => {
165                        stats.nat_traversal_successes += 1;
166                    }
167                    _ => {}
168                }
169            });
170        });
171
172        // Create NAT traversal endpoint
173        let nat_endpoint =
174            Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
175
176        Ok(Self {
177            nat_endpoint,
178            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
179            stats: stats_clone,
180            config,
181            auth_manager,
182            peer_id,
183        })
184    }
185
186    /// Get the node configuration
187    pub fn get_config(&self) -> &QuicNodeConfig {
188        &self.config
189    }
190
191    /// Connect directly to a bootstrap node
192    pub async fn connect_to_bootstrap(
193        &self,
194        bootstrap_addr: SocketAddr,
195    ) -> Result<PeerId, NatTraversalError> {
196        info!("Connecting to bootstrap node at {}", bootstrap_addr);
197
198        // Get the quinn endpoint from NAT traversal endpoint
199        let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
200            NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
201        })?;
202
203        // Connect using the QUIC endpoint directly
204        match endpoint.connect(bootstrap_addr, "bootstrap-node") {
205            Ok(connecting) => {
206                match connecting.await {
207                    Ok(_connection) => {
208                        // Extract peer ID from the connection
209                        // For now, we'll generate a temporary peer ID based on the address
210                        // In a real implementation, we'd exchange peer IDs during the handshake
211                        let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
212
213                        // Store the connection
214                        self.connected_peers
215                            .write()
216                            .await
217                            .insert(peer_id, bootstrap_addr);
218
219                        // Update stats
220                        {
221                            let mut stats = self.stats.lock().await;
222                            stats.active_connections += 1;
223                            stats.successful_connections += 1;
224                        }
225
226                        // Fire connection established event
227                        if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
228                            callback(NatTraversalEvent::ConnectionEstablished {
229                                peer_id,
230                                remote_address: bootstrap_addr,
231                            });
232                        }
233
234                        info!(
235                            "Successfully connected to bootstrap node {} with peer ID {:?}",
236                            bootstrap_addr, peer_id
237                        );
238                        Ok(peer_id)
239                    }
240                    Err(e) => {
241                        error!(
242                            "Failed to establish connection to bootstrap node {}: {}",
243                            bootstrap_addr, e
244                        );
245                        {
246                            let mut stats = self.stats.lock().await;
247                            stats.failed_connections += 1;
248                        }
249                        Err(NatTraversalError::NetworkError(format!(
250                            "Connection failed: {e}"
251                        )))
252                    }
253                }
254            }
255            Err(e) => {
256                error!(
257                    "Failed to initiate connection to bootstrap node {}: {}",
258                    bootstrap_addr, e
259                );
260                {
261                    let mut stats = self.stats.lock().await;
262                    stats.failed_connections += 1;
263                }
264                Err(NatTraversalError::NetworkError(format!(
265                    "Connect error: {e}"
266                )))
267            }
268        }
269    }
270
271    /// Derive a peer ID from a socket address (temporary solution)
272    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
273        use std::collections::hash_map::DefaultHasher;
274        use std::hash::{Hash, Hasher};
275
276        let mut hasher = DefaultHasher::new();
277        addr.hash(&mut hasher);
278        let hash = hasher.finish();
279
280        let mut peer_id_bytes = [0u8; 32];
281        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
282        let port_bytes = addr.port().to_le_bytes();
283        peer_id_bytes[8..10].copy_from_slice(&port_bytes);
284
285        PeerId(peer_id_bytes)
286    }
287
288    /// Connect to a peer using NAT traversal
289    pub async fn connect_to_peer(
290        &self,
291        peer_id: PeerId,
292        coordinator: SocketAddr,
293    ) -> Result<SocketAddr, NatTraversalError> {
294        info!(
295            "Initiating connection to peer {:?} via coordinator {}",
296            peer_id, coordinator
297        );
298
299        // Update stats
300        {
301            let mut stats = self.stats.lock().await;
302            stats.nat_traversal_attempts += 1;
303        }
304
305        // Initiate NAT traversal
306        self.nat_endpoint
307            .initiate_nat_traversal(peer_id, coordinator)?;
308
309        // Poll for completion (in production, this would be event-driven)
310        let start = Instant::now();
311        let timeout = self.config.connection_timeout;
312
313        while start.elapsed() < timeout {
314            let events = self.nat_endpoint.poll(Instant::now())?;
315
316            for event in events {
317                match event {
318                    NatTraversalEvent::ConnectionEstablished {
319                        peer_id: evt_peer,
320                        remote_address,
321                    } => {
322                        if evt_peer == peer_id {
323                            // Store peer connection
324                            {
325                                let mut peers = self.connected_peers.write().await;
326                                peers.insert(peer_id, remote_address);
327                            }
328
329                            // Update stats
330                            {
331                                let mut stats = self.stats.lock().await;
332                                stats.successful_connections += 1;
333                                stats.active_connections += 1;
334                                stats.nat_traversal_successes += 1;
335                            }
336
337                            info!(
338                                "Successfully connected to peer {:?} at {}",
339                                peer_id, remote_address
340                            );
341
342                            // Perform authentication if required
343                            if self.config.auth_config.require_authentication {
344                                match self.authenticate_as_initiator(&peer_id).await {
345                                    Ok(_) => {
346                                        info!("Authentication successful with peer {:?}", peer_id);
347                                    }
348                                    Err(e) => {
349                                        error!(
350                                            "Authentication failed with peer {:?}: {}",
351                                            peer_id, e
352                                        );
353                                        // Remove from connected peers
354                                        self.connected_peers.write().await.remove(&peer_id);
355                                        // Update stats
356                                        let mut stats = self.stats.lock().await;
357                                        stats.active_connections =
358                                            stats.active_connections.saturating_sub(1);
359                                        stats.failed_connections += 1;
360                                        return Err(NatTraversalError::ConfigError(format!(
361                                            "Authentication failed: {e}"
362                                        )));
363                                    }
364                                }
365                            }
366
367                            return Ok(remote_address);
368                        }
369                    }
370                    NatTraversalEvent::TraversalFailed {
371                        peer_id: evt_peer,
372                        error,
373                        fallback_available: _,
374                    } => {
375                        if evt_peer == peer_id {
376                            // Update stats
377                            {
378                                let mut stats = self.stats.lock().await;
379                                stats.failed_connections += 1;
380                            }
381
382                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
383                            return Err(error);
384                        }
385                    }
386                    _ => {
387                        debug!("Received event: {:?}", event);
388                    }
389                }
390            }
391
392            // Brief sleep to avoid busy waiting
393            tokio::time::sleep(Duration::from_millis(100)).await;
394        }
395
396        // Timeout
397        {
398            let mut stats = self.stats.lock().await;
399            stats.failed_connections += 1;
400        }
401
402        Err(NatTraversalError::Timeout)
403    }
404
405    /// Accept incoming connections
406    pub async fn accept(
407        &self,
408    ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
409        info!("Waiting for incoming connection...");
410
411        // Accept connection through the NAT traversal endpoint
412        match self.nat_endpoint.accept_connection().await {
413            Ok((peer_id, connection)) => {
414                let remote_addr = connection.remote_address();
415
416                // Store the connection
417                {
418                    let mut peers = self.connected_peers.write().await;
419                    peers.insert(peer_id, remote_addr);
420                }
421
422                // Update stats
423                {
424                    let mut stats = self.stats.lock().await;
425                    stats.successful_connections += 1;
426                    stats.active_connections += 1;
427                }
428
429                info!(
430                    "Accepted connection from peer {:?} at {}",
431                    peer_id, remote_addr
432                );
433
434                // Handle authentication if required
435                if self.config.auth_config.require_authentication {
436                    // Start a task to handle incoming authentication
437                    let self_clone = self.clone();
438                    let auth_peer_id = peer_id;
439                    tokio::spawn(async move {
440                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
441                            error!(
442                                "Failed to handle authentication for peer {:?}: {}",
443                                auth_peer_id, e
444                            );
445                            // Remove the peer if auth fails
446                            self_clone
447                                .connected_peers
448                                .write()
449                                .await
450                                .remove(&auth_peer_id);
451                            let mut stats = self_clone.stats.lock().await;
452                            stats.active_connections = stats.active_connections.saturating_sub(1);
453                        }
454                    });
455                }
456
457                Ok((remote_addr, peer_id))
458            }
459            Err(e) => {
460                // Update stats
461                {
462                    let mut stats = self.stats.lock().await;
463                    stats.failed_connections += 1;
464                }
465
466                error!("Failed to accept connection: {}", e);
467                Err(Box::new(e))
468            }
469        }
470    }
471
472    /// Send data to a peer
473    pub async fn send_to_peer(
474        &self,
475        peer_id: &PeerId,
476        data: &[u8],
477    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
478        let peers = self.connected_peers.read().await;
479
480        if let Some(remote_addr) = peers.get(peer_id) {
481            debug!(
482                "Sending {} bytes to peer {:?} at {}",
483                data.len(),
484                peer_id,
485                remote_addr
486            );
487
488            // Get the Quinn connection for this peer from the NAT traversal endpoint
489            match self.nat_endpoint.get_connection(peer_id) {
490                Ok(Some(connection)) => {
491                    // Open a unidirectional stream for data transmission
492                    let mut send_stream = connection
493                        .open_uni()
494                        .await
495                        .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
496
497                    // Send the data
498                    send_stream
499                        .write_all(data)
500                        .await
501                        .map_err(|e| format!("Failed to write data: {e}"))?;
502
503                    // Finish the stream
504                    send_stream
505                        .finish()
506                        .map_err(|e| format!("Failed to finish stream: {e}"))?;
507
508                    debug!(
509                        "Successfully sent {} bytes to peer {:?}",
510                        data.len(),
511                        peer_id
512                    );
513                    Ok(())
514                }
515                Ok(None) => {
516                    error!("No active connection found for peer {:?}", peer_id);
517                    Err("No active connection".into())
518                }
519                Err(e) => {
520                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
521                    Err(Box::new(e))
522                }
523            }
524        } else {
525            error!("Peer {:?} not connected", peer_id);
526            Err("Peer not connected".into())
527        }
528    }
529
530    /// Receive data from peers
531    pub async fn receive(
532        &self,
533    ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
534        debug!("Waiting to receive data from any connected peer...");
535
536        // Get all connected peers
537        let peers = {
538            let peers_guard = self.connected_peers.read().await;
539            peers_guard.clone()
540        };
541
542        if peers.is_empty() {
543            return Err("No connected peers".into());
544        }
545
546        // Try to receive data from any connected peer
547        // In a real implementation, this would use a more sophisticated approach
548        // like select! over multiple connection streams
549        for (peer_id, _remote_addr) in peers.iter() {
550            match self.nat_endpoint.get_connection(peer_id) {
551                Ok(Some(connection)) => {
552                    // Try to accept incoming unidirectional streams
553                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
554                        .await
555                    {
556                        Ok(Ok(mut recv_stream)) => {
557                            debug!(
558                                "Receiving data from unidirectional stream from peer {:?}",
559                                peer_id
560                            );
561
562                            // Read all data from the stream
563                            match recv_stream.read_to_end(1024 * 1024).await {
564                                // 1MB limit
565                                Ok(buffer) => {
566                                    if !buffer.is_empty() {
567                                        debug!(
568                                            "Received {} bytes from peer {:?}",
569                                            buffer.len(),
570                                            peer_id
571                                        );
572                                        return Ok((*peer_id, buffer));
573                                    }
574                                }
575                                Err(e) => {
576                                    debug!(
577                                        "Failed to read from stream for peer {:?}: {}",
578                                        peer_id, e
579                                    );
580                                }
581                            }
582                        }
583                        Ok(Err(e)) => {
584                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
585                        }
586                        Err(_) => {
587                            // Timeout - try bidirectional streams
588                        }
589                    }
590
591                    // Also try to accept bidirectional streams
592                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
593                        .await
594                    {
595                        Ok(Ok((_send_stream, mut recv_stream))) => {
596                            debug!(
597                                "Receiving data from bidirectional stream from peer {:?}",
598                                peer_id
599                            );
600
601                            // Read all data from the receive side
602                            match recv_stream.read_to_end(1024 * 1024).await {
603                                // 1MB limit
604                                Ok(buffer) => {
605                                    if !buffer.is_empty() {
606                                        debug!(
607                                            "Received {} bytes from peer {:?} via bidirectional stream",
608                                            buffer.len(),
609                                            peer_id
610                                        );
611                                        return Ok((*peer_id, buffer));
612                                    }
613                                }
614                                Err(e) => {
615                                    debug!(
616                                        "Failed to read from bidirectional stream for peer {:?}: {}",
617                                        peer_id, e
618                                    );
619                                }
620                            }
621                        }
622                        Ok(Err(e)) => {
623                            debug!(
624                                "Failed to accept bidirectional stream from peer {:?}: {}",
625                                peer_id, e
626                            );
627                        }
628                        Err(_) => {
629                            // Timeout - continue to next peer
630                        }
631                    }
632                }
633                Ok(None) => {
634                    debug!("No active connection for peer {:?}", peer_id);
635                }
636                Err(e) => {
637                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
638                }
639            }
640        }
641
642        // If we get here, no data was received from any peer
643        Err("No data available from any connected peer".into())
644    }
645
646    /// Get current statistics
647    pub async fn get_stats(&self) -> NodeStats {
648        self.stats.lock().await.clone()
649    }
650
651    /// Get access to the NAT traversal endpoint
652    pub fn get_nat_endpoint(
653        &self,
654    ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
655        Ok(&*self.nat_endpoint)
656    }
657
658    /// Start periodic statistics reporting
659    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
660        let stats = Arc::clone(&self.stats);
661        let interval_duration = self.config.stats_interval;
662
663        tokio::spawn(async move {
664            let mut interval = tokio::time::interval(interval_duration);
665
666            loop {
667                interval.tick().await;
668
669                let stats_snapshot = stats.lock().await.clone();
670
671                info!(
672                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
673                    stats_snapshot.active_connections,
674                    stats_snapshot.successful_connections,
675                    stats_snapshot.nat_traversal_successes,
676                    stats_snapshot.nat_traversal_attempts
677                );
678            }
679        })
680    }
681
682    /// Get NAT traversal statistics
683    pub async fn get_nat_stats(
684        &self,
685    ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
686        self.nat_endpoint.get_nat_stats()
687    }
688
689    /// Inject an observed address (temporary workaround for OBSERVED_ADDRESS frame integration)
690    pub fn inject_observed_address(
691        &self,
692        observed_address: SocketAddr,
693        from_peer: PeerId,
694    ) -> Result<(), NatTraversalError> {
695        self.nat_endpoint
696            .inject_observed_address(observed_address, from_peer)
697    }
698
699    /// Get connection metrics for a specific peer
700    pub async fn get_connection_metrics(
701        &self,
702        peer_id: &PeerId,
703    ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
704        match self.nat_endpoint.get_connection(peer_id) {
705            Ok(Some(connection)) => {
706                // Get basic RTT from the connection
707                let rtt = connection.rtt();
708
709                // Get congestion window and other stats
710                let stats = connection.stats();
711
712                Ok(ConnectionMetrics {
713                    bytes_sent: stats.udp_tx.bytes,
714                    bytes_received: stats.udp_rx.bytes,
715                    rtt: Some(rtt),
716                    packet_loss: stats.path.lost_packets as f64
717                        / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
718                })
719            }
720            Ok(None) => Err("Connection not found".into()),
721            Err(e) => Err(format!("Failed to get connection: {e}").into()),
722        }
723    }
724
725    /// Get this node's peer ID
726    pub fn peer_id(&self) -> PeerId {
727        self.peer_id
728    }
729
730    /// Get this node's public key bytes
731    pub fn public_key_bytes(&self) -> [u8; 32] {
732        self.auth_manager.public_key_bytes()
733    }
734
735    /// Send an authentication message to a peer
736    async fn send_auth_message(
737        &self,
738        peer_id: &PeerId,
739        message: AuthMessage,
740    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
741        let data = AuthManager::serialize_message(&message)?;
742        self.send_to_peer(peer_id, &data).await
743    }
744
745    /// Perform authentication handshake as initiator
746    async fn authenticate_as_initiator(
747        &self,
748        peer_id: &PeerId,
749    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
750        info!("Starting authentication with peer {:?}", peer_id);
751
752        // Send authentication request
753        let auth_request = self.auth_manager.create_auth_request();
754        self.send_auth_message(peer_id, auth_request).await?;
755
756        // Wait for challenge
757        let timeout_duration = self.config.auth_config.auth_timeout;
758        let start = Instant::now();
759
760        while start.elapsed() < timeout_duration {
761            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
762                Ok(Ok((recv_peer_id, data))) => {
763                    if recv_peer_id == *peer_id {
764                        match AuthManager::deserialize_message(&data) {
765                            Ok(AuthMessage::Challenge { nonce, .. }) => {
766                                // Create and send challenge response
767                                let response =
768                                    self.auth_manager.create_challenge_response(nonce)?;
769                                self.send_auth_message(peer_id, response).await?;
770                            }
771                            Ok(AuthMessage::AuthSuccess { .. }) => {
772                                info!("Authentication successful with peer {:?}", peer_id);
773                                return Ok(());
774                            }
775                            Ok(AuthMessage::AuthFailure { reason }) => {
776                                return Err(format!("Authentication failed: {reason}").into());
777                            }
778                            _ => continue,
779                        }
780                    }
781                }
782                _ => continue,
783            }
784        }
785
786        Err("Authentication timeout".into())
787    }
788
789    /// Handle incoming authentication messages
790    async fn handle_auth_message(
791        &self,
792        peer_id: PeerId,
793        message: AuthMessage,
794    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
795        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
796
797        match auth_protocol.handle_message(peer_id, message).await {
798            Ok(Some(response)) => {
799                self.send_auth_message(&peer_id, response).await?;
800            }
801            Ok(None) => {
802                // No response needed
803            }
804            Err(e) => {
805                error!("Authentication error: {}", e);
806                let failure = AuthMessage::AuthFailure {
807                    reason: e.to_string(),
808                };
809                self.send_auth_message(&peer_id, failure).await?;
810                return Err(Box::new(e));
811            }
812        }
813
814        Ok(())
815    }
816
817    /// Check if a peer is authenticated
818    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
819        self.auth_manager.is_authenticated(peer_id).await
820    }
821
822    /// Get list of authenticated peers
823    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
824        self.auth_manager.list_authenticated_peers().await
825    }
826
827    /// Handle incoming authentication from a peer
828    async fn handle_incoming_auth(
829        &self,
830        peer_id: PeerId,
831    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
832        info!("Handling incoming authentication from peer {:?}", peer_id);
833
834        let timeout_duration = self.config.auth_config.auth_timeout;
835        let start = Instant::now();
836
837        while start.elapsed() < timeout_duration {
838            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
839                Ok(Ok((recv_peer_id, data))) => {
840                    if recv_peer_id == peer_id {
841                        match AuthManager::deserialize_message(&data) {
842                            Ok(auth_msg) => {
843                                self.handle_auth_message(peer_id, auth_msg).await?;
844
845                                // Check if authentication is complete
846                                if self.auth_manager.is_authenticated(&peer_id).await {
847                                    info!("Peer {:?} successfully authenticated", peer_id);
848                                    return Ok(());
849                                }
850                            }
851                            Err(_) => {
852                                // Not an auth message, ignore
853                                continue;
854                            }
855                        }
856                    }
857                }
858                _ => continue,
859            }
860        }
861
862        Err("Authentication timeout waiting for peer".into())
863    }
864}