ant_quic/
quic_node.rs

1//! QUIC-based P2P node with NAT traversal
2//!
3//! This module provides a QUIC-based implementation of the P2P node
4//! that integrates with the NAT traversal protocol.
5
6use std::{
7    collections::HashMap,
8    net::SocketAddr,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use tracing::{debug, error, info};
14
15use crate::{
16    auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
17    crypto::raw_public_keys::key_utils::{
18        derive_peer_id_from_public_key, generate_ed25519_keypair,
19    },
20    nat_traversal_api::{
21        EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
22        NatTraversalEvent, NatTraversalStatistics, PeerId,
23    },
24};
25
26/// QUIC-based P2P node with NAT traversal
27#[derive(Clone)]
28pub struct QuicP2PNode {
29    /// NAT traversal endpoint
30    nat_endpoint: Arc<NatTraversalEndpoint>,
31    /// Active peer connections (maps peer ID to their socket address)
32    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
33    /// Node statistics
34    stats: Arc<tokio::sync::Mutex<NodeStats>>,
35    /// Node configuration
36    config: QuicNodeConfig,
37    /// Authentication manager
38    auth_manager: Arc<AuthManager>,
39    /// Our peer ID
40    peer_id: PeerId,
41}
42
43/// Configuration for QUIC P2P node
44#[derive(Debug, Clone)]
45pub struct QuicNodeConfig {
46    /// Role of this node
47    pub role: EndpointRole,
48    /// Bootstrap nodes
49    pub bootstrap_nodes: Vec<SocketAddr>,
50    /// Enable coordinator services
51    pub enable_coordinator: bool,
52    /// Max concurrent connections
53    pub max_connections: usize,
54    /// Connection timeout
55    pub connection_timeout: Duration,
56    /// Statistics interval
57    pub stats_interval: Duration,
58    /// Authentication configuration
59    pub auth_config: AuthConfig,
60    /// Bind address for the node
61    pub bind_addr: Option<SocketAddr>,
62}
63
64impl Default for QuicNodeConfig {
65    fn default() -> Self {
66        Self {
67            role: EndpointRole::Client,
68            bootstrap_nodes: Vec::new(),
69            enable_coordinator: false,
70            max_connections: 100,
71            connection_timeout: Duration::from_secs(30),
72            stats_interval: Duration::from_secs(30),
73            auth_config: AuthConfig::default(),
74            bind_addr: None,
75        }
76    }
77}
78
79/// Connection metrics for a specific peer
80#[derive(Debug, Clone)]
81pub struct ConnectionMetrics {
82    /// Bytes sent to this peer
83    pub bytes_sent: u64,
84    /// Bytes received from this peer
85    pub bytes_received: u64,
86    /// Round-trip time
87    pub rtt: Option<Duration>,
88    /// Packet loss rate (0.0 to 1.0)
89    pub packet_loss: f64,
90}
91
92/// Node statistics
93#[derive(Debug, Clone)]
94pub struct NodeStats {
95    /// Number of active connections
96    pub active_connections: usize,
97    /// Total successful connections
98    pub successful_connections: u64,
99    /// Total failed connections
100    pub failed_connections: u64,
101    /// NAT traversal attempts
102    pub nat_traversal_attempts: u64,
103    /// Successful NAT traversals
104    pub nat_traversal_successes: u64,
105    /// Node start time
106    pub start_time: Instant,
107}
108
109impl Default for NodeStats {
110    fn default() -> Self {
111        Self {
112            active_connections: 0,
113            successful_connections: 0,
114            failed_connections: 0,
115            nat_traversal_attempts: 0,
116            nat_traversal_successes: 0,
117            start_time: Instant::now(),
118        }
119    }
120}
121
122impl QuicP2PNode {
123    /// Create a new QUIC P2P node
124    pub async fn new(
125        config: QuicNodeConfig,
126    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
127        // Generate Ed25519 keypair for authentication
128        let (secret_key, public_key) = generate_ed25519_keypair();
129        let peer_id = derive_peer_id_from_public_key(&public_key);
130
131        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
132
133        // Create authentication manager
134        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
135
136        // Create NAT traversal configuration
137        let nat_config = NatTraversalConfig {
138            role: config.role,
139            bootstrap_nodes: config.bootstrap_nodes.clone(),
140            max_candidates: 50,
141            coordination_timeout: Duration::from_secs(10),
142            enable_symmetric_nat: true,
143            // Bootstrap nodes should not enable relay fallback
144            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
145            max_concurrent_attempts: 5,
146            bind_addr: config.bind_addr,
147            prefer_rfc_nat_traversal: true, // Default to RFC format for standards compliance
148            timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
149        };
150
151        // Create event callback for NAT traversal events
152        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
153            start_time: Instant::now(),
154            ..Default::default()
155        }));
156        let stats_for_callback = Arc::clone(&stats_clone);
157
158        let event_callback = Box::new(move |event: NatTraversalEvent| {
159            let stats = stats_for_callback.clone();
160            tokio::spawn(async move {
161                let mut stats = stats.lock().await;
162                match event {
163                    NatTraversalEvent::CoordinationRequested { .. } => {
164                        stats.nat_traversal_attempts += 1;
165                    }
166                    NatTraversalEvent::ConnectionEstablished { .. } => {
167                        stats.nat_traversal_successes += 1;
168                    }
169                    _ => {}
170                }
171            });
172        });
173
174        // Create NAT traversal endpoint
175        let nat_endpoint =
176            Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
177
178        Ok(Self {
179            nat_endpoint,
180            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
181            stats: stats_clone,
182            config,
183            auth_manager,
184            peer_id,
185        })
186    }
187
188    /// Get the node configuration
189    pub fn get_config(&self) -> &QuicNodeConfig {
190        &self.config
191    }
192
193    /// Connect directly to a bootstrap node
194    pub async fn connect_to_bootstrap(
195        &self,
196        bootstrap_addr: SocketAddr,
197    ) -> Result<PeerId, NatTraversalError> {
198        info!("Connecting to bootstrap node at {}", bootstrap_addr);
199
200        // Get the quinn endpoint from NAT traversal endpoint
201        let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
202            NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
203        })?;
204
205        // Connect using the QUIC endpoint directly
206        match endpoint.connect(bootstrap_addr, "bootstrap-node") {
207            Ok(connecting) => {
208                match connecting.await {
209                    Ok(_connection) => {
210                        // Extract peer ID from the connection
211                        // For now, we'll generate a temporary peer ID based on the address
212                        // In a real implementation, we'd exchange peer IDs during the handshake
213                        let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
214
215                        // Store the connection
216                        self.connected_peers
217                            .write()
218                            .await
219                            .insert(peer_id, bootstrap_addr);
220
221                        // Update stats
222                        {
223                            let mut stats = self.stats.lock().await;
224                            stats.active_connections += 1;
225                            stats.successful_connections += 1;
226                        }
227
228                        // Fire connection established event
229                        if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
230                            callback(NatTraversalEvent::ConnectionEstablished {
231                                peer_id,
232                                remote_address: bootstrap_addr,
233                            });
234                        }
235
236                        info!(
237                            "Successfully connected to bootstrap node {} with peer ID {:?}",
238                            bootstrap_addr, peer_id
239                        );
240                        Ok(peer_id)
241                    }
242                    Err(e) => {
243                        error!(
244                            "Failed to establish connection to bootstrap node {}: {}",
245                            bootstrap_addr, e
246                        );
247                        {
248                            let mut stats = self.stats.lock().await;
249                            stats.failed_connections += 1;
250                        }
251                        Err(NatTraversalError::NetworkError(format!(
252                            "Connection failed: {e}"
253                        )))
254                    }
255                }
256            }
257            Err(e) => {
258                error!(
259                    "Failed to initiate connection to bootstrap node {}: {}",
260                    bootstrap_addr, e
261                );
262                {
263                    let mut stats = self.stats.lock().await;
264                    stats.failed_connections += 1;
265                }
266                Err(NatTraversalError::NetworkError(format!(
267                    "Connect error: {e}"
268                )))
269            }
270        }
271    }
272
273    /// Derive a peer ID from a socket address (temporary solution)
274    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
275        use std::collections::hash_map::DefaultHasher;
276        use std::hash::{Hash, Hasher};
277
278        let mut hasher = DefaultHasher::new();
279        addr.hash(&mut hasher);
280        let hash = hasher.finish();
281
282        let mut peer_id_bytes = [0u8; 32];
283        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
284        let port_bytes = addr.port().to_le_bytes();
285        peer_id_bytes[8..10].copy_from_slice(&port_bytes);
286
287        PeerId(peer_id_bytes)
288    }
289
290    /// Connect to a peer using NAT traversal
291    pub async fn connect_to_peer(
292        &self,
293        peer_id: PeerId,
294        coordinator: SocketAddr,
295    ) -> Result<SocketAddr, NatTraversalError> {
296        info!(
297            "Initiating connection to peer {:?} via coordinator {}",
298            peer_id, coordinator
299        );
300
301        // Update stats
302        {
303            let mut stats = self.stats.lock().await;
304            stats.nat_traversal_attempts += 1;
305        }
306
307        // Initiate NAT traversal
308        self.nat_endpoint
309            .initiate_nat_traversal(peer_id, coordinator)?;
310
311        // Poll for completion (in production, this would be event-driven)
312        let start = Instant::now();
313        let timeout = self.config.connection_timeout;
314
315        while start.elapsed() < timeout {
316            let events = self.nat_endpoint.poll(Instant::now())?;
317
318            for event in events {
319                match event {
320                    NatTraversalEvent::ConnectionEstablished {
321                        peer_id: evt_peer,
322                        remote_address,
323                    } => {
324                        if evt_peer == peer_id {
325                            // Store peer connection
326                            {
327                                let mut peers = self.connected_peers.write().await;
328                                peers.insert(peer_id, remote_address);
329                            }
330
331                            // Update stats
332                            {
333                                let mut stats = self.stats.lock().await;
334                                stats.successful_connections += 1;
335                                stats.active_connections += 1;
336                                stats.nat_traversal_successes += 1;
337                            }
338
339                            info!(
340                                "Successfully connected to peer {:?} at {}",
341                                peer_id, remote_address
342                            );
343
344                            // Perform authentication if required
345                            if self.config.auth_config.require_authentication {
346                                match self.authenticate_as_initiator(&peer_id).await {
347                                    Ok(_) => {
348                                        info!("Authentication successful with peer {:?}", peer_id);
349                                    }
350                                    Err(e) => {
351                                        error!(
352                                            "Authentication failed with peer {:?}: {}",
353                                            peer_id, e
354                                        );
355                                        // Remove from connected peers
356                                        self.connected_peers.write().await.remove(&peer_id);
357                                        // Update stats
358                                        let mut stats = self.stats.lock().await;
359                                        stats.active_connections =
360                                            stats.active_connections.saturating_sub(1);
361                                        stats.failed_connections += 1;
362                                        return Err(NatTraversalError::ConfigError(format!(
363                                            "Authentication failed: {e}"
364                                        )));
365                                    }
366                                }
367                            }
368
369                            return Ok(remote_address);
370                        }
371                    }
372                    NatTraversalEvent::TraversalFailed {
373                        peer_id: evt_peer,
374                        error,
375                        fallback_available: _,
376                    } => {
377                        if evt_peer == peer_id {
378                            // Update stats
379                            {
380                                let mut stats = self.stats.lock().await;
381                                stats.failed_connections += 1;
382                            }
383
384                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
385                            return Err(error);
386                        }
387                    }
388                    _ => {
389                        debug!("Received event: {:?}", event);
390                    }
391                }
392            }
393
394            // Brief sleep to avoid busy waiting
395            tokio::time::sleep(Duration::from_millis(100)).await;
396        }
397
398        // Timeout
399        {
400            let mut stats = self.stats.lock().await;
401            stats.failed_connections += 1;
402        }
403
404        Err(NatTraversalError::Timeout)
405    }
406
407    /// Accept incoming connections
408    pub async fn accept(
409        &self,
410    ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
411        info!("Waiting for incoming connection...");
412
413        // Accept connection through the NAT traversal endpoint
414        match self.nat_endpoint.accept_connection().await {
415            Ok((peer_id, connection)) => {
416                let remote_addr = connection.remote_address();
417
418                // Store the connection
419                {
420                    let mut peers = self.connected_peers.write().await;
421                    peers.insert(peer_id, remote_addr);
422                }
423
424                // Update stats
425                {
426                    let mut stats = self.stats.lock().await;
427                    stats.successful_connections += 1;
428                    stats.active_connections += 1;
429                }
430
431                info!(
432                    "Accepted connection from peer {:?} at {}",
433                    peer_id, remote_addr
434                );
435
436                // Handle authentication if required
437                if self.config.auth_config.require_authentication {
438                    // Start a task to handle incoming authentication
439                    let self_clone = self.clone();
440                    let auth_peer_id = peer_id;
441                    tokio::spawn(async move {
442                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
443                            error!(
444                                "Failed to handle authentication for peer {:?}: {}",
445                                auth_peer_id, e
446                            );
447                            // Remove the peer if auth fails
448                            self_clone
449                                .connected_peers
450                                .write()
451                                .await
452                                .remove(&auth_peer_id);
453                            let mut stats = self_clone.stats.lock().await;
454                            stats.active_connections = stats.active_connections.saturating_sub(1);
455                        }
456                    });
457                }
458
459                Ok((remote_addr, peer_id))
460            }
461            Err(e) => {
462                // Update stats
463                {
464                    let mut stats = self.stats.lock().await;
465                    stats.failed_connections += 1;
466                }
467
468                error!("Failed to accept connection: {}", e);
469                Err(Box::new(e))
470            }
471        }
472    }
473
474    /// Send data to a peer
475    pub async fn send_to_peer(
476        &self,
477        peer_id: &PeerId,
478        data: &[u8],
479    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
480        let peers = self.connected_peers.read().await;
481
482        if let Some(remote_addr) = peers.get(peer_id) {
483            debug!(
484                "Sending {} bytes to peer {:?} at {}",
485                data.len(),
486                peer_id,
487                remote_addr
488            );
489
490            // Get the Quinn connection for this peer from the NAT traversal endpoint
491            match self.nat_endpoint.get_connection(peer_id) {
492                Ok(Some(connection)) => {
493                    // Open a unidirectional stream for data transmission
494                    let mut send_stream = connection
495                        .open_uni()
496                        .await
497                        .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
498
499                    // Send the data
500                    send_stream
501                        .write_all(data)
502                        .await
503                        .map_err(|e| format!("Failed to write data: {e}"))?;
504
505                    // Finish the stream
506                    send_stream
507                        .finish()
508                        .map_err(|e| format!("Failed to finish stream: {e}"))?;
509
510                    debug!(
511                        "Successfully sent {} bytes to peer {:?}",
512                        data.len(),
513                        peer_id
514                    );
515                    Ok(())
516                }
517                Ok(None) => {
518                    error!("No active connection found for peer {:?}", peer_id);
519                    Err("No active connection".into())
520                }
521                Err(e) => {
522                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
523                    Err(Box::new(e))
524                }
525            }
526        } else {
527            error!("Peer {:?} not connected", peer_id);
528            Err("Peer not connected".into())
529        }
530    }
531
532    /// Receive data from peers
533    pub async fn receive(
534        &self,
535    ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
536        debug!("Waiting to receive data from any connected peer...");
537
538        // Get all connected peers
539        let peers = {
540            let peers_guard = self.connected_peers.read().await;
541            peers_guard.clone()
542        };
543
544        if peers.is_empty() {
545            return Err("No connected peers".into());
546        }
547
548        // Try to receive data from any connected peer
549        // In a real implementation, this would use a more sophisticated approach
550        // like select! over multiple connection streams
551        for (peer_id, _remote_addr) in peers.iter() {
552            match self.nat_endpoint.get_connection(peer_id) {
553                Ok(Some(connection)) => {
554                    // Try to accept incoming unidirectional streams
555                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
556                        .await
557                    {
558                        Ok(Ok(mut recv_stream)) => {
559                            debug!(
560                                "Receiving data from unidirectional stream from peer {:?}",
561                                peer_id
562                            );
563
564                            // Read all data from the stream
565                            match recv_stream.read_to_end(1024 * 1024).await {
566                                // 1MB limit
567                                Ok(buffer) => {
568                                    if !buffer.is_empty() {
569                                        debug!(
570                                            "Received {} bytes from peer {:?}",
571                                            buffer.len(),
572                                            peer_id
573                                        );
574                                        return Ok((*peer_id, buffer));
575                                    }
576                                }
577                                Err(e) => {
578                                    debug!(
579                                        "Failed to read from stream for peer {:?}: {}",
580                                        peer_id, e
581                                    );
582                                }
583                            }
584                        }
585                        Ok(Err(e)) => {
586                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
587                        }
588                        Err(_) => {
589                            // Timeout - try bidirectional streams
590                        }
591                    }
592
593                    // Also try to accept bidirectional streams
594                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
595                        .await
596                    {
597                        Ok(Ok((_send_stream, mut recv_stream))) => {
598                            debug!(
599                                "Receiving data from bidirectional stream from peer {:?}",
600                                peer_id
601                            );
602
603                            // Read all data from the receive side
604                            match recv_stream.read_to_end(1024 * 1024).await {
605                                // 1MB limit
606                                Ok(buffer) => {
607                                    if !buffer.is_empty() {
608                                        debug!(
609                                            "Received {} bytes from peer {:?} via bidirectional stream",
610                                            buffer.len(),
611                                            peer_id
612                                        );
613                                        return Ok((*peer_id, buffer));
614                                    }
615                                }
616                                Err(e) => {
617                                    debug!(
618                                        "Failed to read from bidirectional stream for peer {:?}: {}",
619                                        peer_id, e
620                                    );
621                                }
622                            }
623                        }
624                        Ok(Err(e)) => {
625                            debug!(
626                                "Failed to accept bidirectional stream from peer {:?}: {}",
627                                peer_id, e
628                            );
629                        }
630                        Err(_) => {
631                            // Timeout - continue to next peer
632                        }
633                    }
634                }
635                Ok(None) => {
636                    debug!("No active connection for peer {:?}", peer_id);
637                }
638                Err(e) => {
639                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
640                }
641            }
642        }
643
644        // If we get here, no data was received from any peer
645        Err("No data available from any connected peer".into())
646    }
647
648    /// Get current statistics
649    pub async fn get_stats(&self) -> NodeStats {
650        self.stats.lock().await.clone()
651    }
652
653    /// Get access to the NAT traversal endpoint
654    pub fn get_nat_endpoint(
655        &self,
656    ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
657        Ok(&*self.nat_endpoint)
658    }
659
660    /// Start periodic statistics reporting
661    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
662        let stats = Arc::clone(&self.stats);
663        let interval_duration = self.config.stats_interval;
664
665        tokio::spawn(async move {
666            let mut interval = tokio::time::interval(interval_duration);
667
668            loop {
669                interval.tick().await;
670
671                let stats_snapshot = stats.lock().await.clone();
672
673                info!(
674                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
675                    stats_snapshot.active_connections,
676                    stats_snapshot.successful_connections,
677                    stats_snapshot.nat_traversal_successes,
678                    stats_snapshot.nat_traversal_attempts
679                );
680            }
681        })
682    }
683
684    /// Get NAT traversal statistics
685    pub async fn get_nat_stats(
686        &self,
687    ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
688        self.nat_endpoint.get_nat_stats()
689    }
690
691    /// Inject an observed address (temporary workaround for OBSERVED_ADDRESS frame integration)
692    pub fn inject_observed_address(
693        &self,
694        observed_address: SocketAddr,
695        from_peer: PeerId,
696    ) -> Result<(), NatTraversalError> {
697        self.nat_endpoint
698            .inject_observed_address(observed_address, from_peer)
699    }
700
701    /// Get connection metrics for a specific peer
702    pub async fn get_connection_metrics(
703        &self,
704        peer_id: &PeerId,
705    ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
706        match self.nat_endpoint.get_connection(peer_id) {
707            Ok(Some(connection)) => {
708                // Get basic RTT from the connection
709                let rtt = connection.rtt();
710
711                // Get congestion window and other stats
712                let stats = connection.stats();
713
714                Ok(ConnectionMetrics {
715                    bytes_sent: stats.udp_tx.bytes,
716                    bytes_received: stats.udp_rx.bytes,
717                    rtt: Some(rtt),
718                    packet_loss: stats.path.lost_packets as f64
719                        / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
720                })
721            }
722            Ok(None) => Err("Connection not found".into()),
723            Err(e) => Err(format!("Failed to get connection: {e}").into()),
724        }
725    }
726
727    /// Get this node's peer ID
728    pub fn peer_id(&self) -> PeerId {
729        self.peer_id
730    }
731
732    /// Get this node's public key bytes
733    pub fn public_key_bytes(&self) -> [u8; 32] {
734        self.auth_manager.public_key_bytes()
735    }
736
737    /// Send an authentication message to a peer
738    async fn send_auth_message(
739        &self,
740        peer_id: &PeerId,
741        message: AuthMessage,
742    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
743        let data = AuthManager::serialize_message(&message)?;
744        self.send_to_peer(peer_id, &data).await
745    }
746
747    /// Perform authentication handshake as initiator
748    async fn authenticate_as_initiator(
749        &self,
750        peer_id: &PeerId,
751    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
752        info!("Starting authentication with peer {:?}", peer_id);
753
754        // Send authentication request
755        let auth_request = self.auth_manager.create_auth_request();
756        self.send_auth_message(peer_id, auth_request).await?;
757
758        // Wait for challenge
759        let timeout_duration = self.config.auth_config.auth_timeout;
760        let start = Instant::now();
761
762        while start.elapsed() < timeout_duration {
763            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
764                Ok(Ok((recv_peer_id, data))) => {
765                    if recv_peer_id == *peer_id {
766                        match AuthManager::deserialize_message(&data) {
767                            Ok(AuthMessage::Challenge { nonce, .. }) => {
768                                // Create and send challenge response
769                                let response =
770                                    self.auth_manager.create_challenge_response(nonce)?;
771                                self.send_auth_message(peer_id, response).await?;
772                            }
773                            Ok(AuthMessage::AuthSuccess { .. }) => {
774                                info!("Authentication successful with peer {:?}", peer_id);
775                                return Ok(());
776                            }
777                            Ok(AuthMessage::AuthFailure { reason }) => {
778                                return Err(format!("Authentication failed: {reason}").into());
779                            }
780                            _ => continue,
781                        }
782                    }
783                }
784                _ => continue,
785            }
786        }
787
788        Err("Authentication timeout".into())
789    }
790
791    /// Handle incoming authentication messages
792    async fn handle_auth_message(
793        &self,
794        peer_id: PeerId,
795        message: AuthMessage,
796    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
797        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
798
799        match auth_protocol.handle_message(peer_id, message).await {
800            Ok(Some(response)) => {
801                self.send_auth_message(&peer_id, response).await?;
802            }
803            Ok(None) => {
804                // No response needed
805            }
806            Err(e) => {
807                error!("Authentication error: {}", e);
808                let failure = AuthMessage::AuthFailure {
809                    reason: e.to_string(),
810                };
811                self.send_auth_message(&peer_id, failure).await?;
812                return Err(Box::new(e));
813            }
814        }
815
816        Ok(())
817    }
818
819    /// Check if a peer is authenticated
820    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
821        self.auth_manager.is_authenticated(peer_id).await
822    }
823
824    /// Get list of authenticated peers
825    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
826        self.auth_manager.list_authenticated_peers().await
827    }
828
829    /// Handle incoming authentication from a peer
830    async fn handle_incoming_auth(
831        &self,
832        peer_id: PeerId,
833    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
834        info!("Handling incoming authentication from peer {:?}", peer_id);
835
836        let timeout_duration = self.config.auth_config.auth_timeout;
837        let start = Instant::now();
838
839        while start.elapsed() < timeout_duration {
840            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
841                Ok(Ok((recv_peer_id, data))) => {
842                    if recv_peer_id == peer_id {
843                        match AuthManager::deserialize_message(&data) {
844                            Ok(auth_msg) => {
845                                self.handle_auth_message(peer_id, auth_msg).await?;
846
847                                // Check if authentication is complete
848                                if self.auth_manager.is_authenticated(&peer_id).await {
849                                    info!("Peer {:?} successfully authenticated", peer_id);
850                                    return Ok(());
851                                }
852                            }
853                            Err(_) => {
854                                // Not an auth message, ignore
855                                continue;
856                            }
857                        }
858                    }
859                }
860                _ => continue,
861            }
862        }
863
864        Err("Authentication timeout waiting for peer".into())
865    }
866}