ant_quic/
quic_node.rs

1//! QUIC-based P2P node with NAT traversal
2//!
3//! This module provides a QUIC-based implementation of the P2P node
4//! that integrates with the NAT traversal protocol.
5
6use std::{
7    collections::HashMap,
8    net::SocketAddr,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use tracing::{debug, error, info};
14
15use crate::{
16    auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
17    crypto::raw_public_keys::key_utils::{
18        derive_peer_id_from_public_key, generate_ed25519_keypair,
19    },
20    nat_traversal_api::{
21        EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
22        NatTraversalEvent, NatTraversalStatistics, PeerId,
23    },
24};
25
26/// QUIC-based P2P node with NAT traversal
27#[derive(Clone)]
28pub struct QuicP2PNode {
29    /// NAT traversal endpoint
30    nat_endpoint: Arc<NatTraversalEndpoint>,
31    /// Active peer connections (maps peer ID to their socket address)
32    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
33    /// Node statistics
34    stats: Arc<tokio::sync::Mutex<NodeStats>>,
35    /// Node configuration
36    config: QuicNodeConfig,
37    /// Authentication manager
38    auth_manager: Arc<AuthManager>,
39    /// Our peer ID
40    peer_id: PeerId,
41}
42
43/// Configuration for QUIC P2P node
44#[derive(Debug, Clone)]
45pub struct QuicNodeConfig {
46    /// Role of this node
47    pub role: EndpointRole,
48    /// Bootstrap nodes
49    pub bootstrap_nodes: Vec<SocketAddr>,
50    /// Enable coordinator services
51    pub enable_coordinator: bool,
52    /// Max concurrent connections
53    pub max_connections: usize,
54    /// Connection timeout
55    pub connection_timeout: Duration,
56    /// Statistics interval
57    pub stats_interval: Duration,
58    /// Authentication configuration
59    pub auth_config: AuthConfig,
60    /// Bind address for the node
61    pub bind_addr: Option<SocketAddr>,
62}
63
64impl Default for QuicNodeConfig {
65    fn default() -> Self {
66        Self {
67            role: EndpointRole::Client,
68            bootstrap_nodes: Vec::new(),
69            enable_coordinator: false,
70            max_connections: 100,
71            connection_timeout: Duration::from_secs(30),
72            stats_interval: Duration::from_secs(30),
73            auth_config: AuthConfig::default(),
74            bind_addr: None,
75        }
76    }
77}
78
79/// Connection metrics for a specific peer
80#[derive(Debug, Clone)]
81pub struct ConnectionMetrics {
82    /// Bytes sent to this peer
83    pub bytes_sent: u64,
84    /// Bytes received from this peer
85    pub bytes_received: u64,
86    /// Round-trip time
87    pub rtt: Option<Duration>,
88    /// Packet loss rate (0.0 to 1.0)
89    pub packet_loss: f64,
90}
91
92/// Node statistics
93#[derive(Debug, Clone)]
94pub struct NodeStats {
95    /// Number of active connections
96    pub active_connections: usize,
97    /// Total successful connections
98    pub successful_connections: u64,
99    /// Total failed connections
100    pub failed_connections: u64,
101    /// NAT traversal attempts
102    pub nat_traversal_attempts: u64,
103    /// Successful NAT traversals
104    pub nat_traversal_successes: u64,
105    /// Node start time
106    pub start_time: Instant,
107}
108
109impl Default for NodeStats {
110    fn default() -> Self {
111        Self {
112            active_connections: 0,
113            successful_connections: 0,
114            failed_connections: 0,
115            nat_traversal_attempts: 0,
116            nat_traversal_successes: 0,
117            start_time: Instant::now(),
118        }
119    }
120}
121
122impl QuicP2PNode {
123    /// Create a new QUIC P2P node
124    pub async fn new(
125        config: QuicNodeConfig,
126    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
127        // Generate Ed25519 keypair for authentication
128        let (secret_key, public_key) = generate_ed25519_keypair();
129        let peer_id = derive_peer_id_from_public_key(&public_key);
130
131        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
132
133        // Create authentication manager
134        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
135
136        // Create NAT traversal configuration
137        let nat_config = NatTraversalConfig {
138            role: config.role,
139            bootstrap_nodes: config.bootstrap_nodes.clone(),
140            max_candidates: 50,
141            coordination_timeout: Duration::from_secs(10),
142            enable_symmetric_nat: true,
143            // Bootstrap nodes should not enable relay fallback
144            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
145            max_concurrent_attempts: 5,
146            bind_addr: config.bind_addr,
147        };
148
149        // Create event callback for NAT traversal events
150        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
151            start_time: Instant::now(),
152            ..Default::default()
153        }));
154        let stats_for_callback = Arc::clone(&stats_clone);
155
156        let event_callback = Box::new(move |event: NatTraversalEvent| {
157            let stats = stats_for_callback.clone();
158            tokio::spawn(async move {
159                let mut stats = stats.lock().await;
160                match event {
161                    NatTraversalEvent::CoordinationRequested { .. } => {
162                        stats.nat_traversal_attempts += 1;
163                    }
164                    NatTraversalEvent::ConnectionEstablished { .. } => {
165                        stats.nat_traversal_successes += 1;
166                    }
167                    _ => {}
168                }
169            });
170        });
171
172        // Create NAT traversal endpoint
173        let nat_endpoint =
174            Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
175
176        Ok(Self {
177            nat_endpoint,
178            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
179            stats: stats_clone,
180            config,
181            auth_manager,
182            peer_id,
183        })
184    }
185
186    /// Get the node configuration
187    pub fn get_config(&self) -> &QuicNodeConfig {
188        &self.config
189    }
190
191    /// Connect directly to a bootstrap node
192    pub async fn connect_to_bootstrap(
193        &self,
194        bootstrap_addr: SocketAddr,
195    ) -> Result<PeerId, NatTraversalError> {
196        info!("Connecting to bootstrap node at {}", bootstrap_addr);
197
198        // Get the quinn endpoint from NAT traversal endpoint
199        let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
200            NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
201        })?;
202
203        // Connect using the QUIC endpoint directly
204        match endpoint.connect(bootstrap_addr, "bootstrap-node") {
205            Ok(connecting) => {
206                match connecting.await {
207                    Ok(_connection) => {
208                        // Extract peer ID from the connection
209                        // For now, we'll generate a temporary peer ID based on the address
210                        // In a real implementation, we'd exchange peer IDs during the handshake
211                        let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
212
213                        // Store the connection
214                        self.connected_peers
215                            .write()
216                            .await
217                            .insert(peer_id, bootstrap_addr);
218
219                        // Update stats
220                        {
221                            let mut stats = self.stats.lock().await;
222                            stats.active_connections += 1;
223                            stats.successful_connections += 1;
224                        }
225
226                        // Fire connection established event
227                        if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
228                            callback(NatTraversalEvent::ConnectionEstablished {
229                                peer_id,
230                                remote_address: bootstrap_addr,
231                            });
232                        }
233
234                        info!(
235                            "Successfully connected to bootstrap node {} with peer ID {:?}",
236                            bootstrap_addr, peer_id
237                        );
238                        Ok(peer_id)
239                    }
240                    Err(e) => {
241                        error!(
242                            "Failed to establish connection to bootstrap node {}: {}",
243                            bootstrap_addr, e
244                        );
245                        {
246                            let mut stats = self.stats.lock().await;
247                            stats.failed_connections += 1;
248                        }
249                        Err(NatTraversalError::NetworkError(format!(
250                            "Connection failed: {}",
251                            e
252                        )))
253                    }
254                }
255            }
256            Err(e) => {
257                error!(
258                    "Failed to initiate connection to bootstrap node {}: {}",
259                    bootstrap_addr, e
260                );
261                {
262                    let mut stats = self.stats.lock().await;
263                    stats.failed_connections += 1;
264                }
265                Err(NatTraversalError::NetworkError(format!(
266                    "Connect error: {}",
267                    e
268                )))
269            }
270        }
271    }
272
273    /// Derive a peer ID from a socket address (temporary solution)
274    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
275        use std::collections::hash_map::DefaultHasher;
276        use std::hash::{Hash, Hasher};
277
278        let mut hasher = DefaultHasher::new();
279        addr.hash(&mut hasher);
280        let hash = hasher.finish();
281
282        let mut peer_id_bytes = [0u8; 32];
283        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
284        let port_bytes = addr.port().to_le_bytes();
285        peer_id_bytes[8..10].copy_from_slice(&port_bytes);
286
287        PeerId(peer_id_bytes)
288    }
289
290    /// Connect to a peer using NAT traversal
291    pub async fn connect_to_peer(
292        &self,
293        peer_id: PeerId,
294        coordinator: SocketAddr,
295    ) -> Result<SocketAddr, NatTraversalError> {
296        info!(
297            "Initiating connection to peer {:?} via coordinator {}",
298            peer_id, coordinator
299        );
300
301        // Update stats
302        {
303            let mut stats = self.stats.lock().await;
304            stats.nat_traversal_attempts += 1;
305        }
306
307        // Initiate NAT traversal
308        self.nat_endpoint
309            .initiate_nat_traversal(peer_id, coordinator)?;
310
311        // Poll for completion (in production, this would be event-driven)
312        let start = Instant::now();
313        let timeout = self.config.connection_timeout;
314
315        while start.elapsed() < timeout {
316            let events = self.nat_endpoint.poll(Instant::now())?;
317
318            for event in events {
319                match event {
320                    NatTraversalEvent::ConnectionEstablished {
321                        peer_id: evt_peer,
322                        remote_address,
323                    } => {
324                        if evt_peer == peer_id {
325                            // Store peer connection
326                            {
327                                let mut peers = self.connected_peers.write().await;
328                                peers.insert(peer_id, remote_address);
329                            }
330
331                            // Update stats
332                            {
333                                let mut stats = self.stats.lock().await;
334                                stats.successful_connections += 1;
335                                stats.active_connections += 1;
336                                stats.nat_traversal_successes += 1;
337                            }
338
339                            info!(
340                                "Successfully connected to peer {:?} at {}",
341                                peer_id, remote_address
342                            );
343
344                            // Perform authentication if required
345                            if self.config.auth_config.require_authentication {
346                                match self.authenticate_as_initiator(&peer_id).await {
347                                    Ok(_) => {
348                                        info!("Authentication successful with peer {:?}", peer_id);
349                                    }
350                                    Err(e) => {
351                                        error!(
352                                            "Authentication failed with peer {:?}: {}",
353                                            peer_id, e
354                                        );
355                                        // Remove from connected peers
356                                        self.connected_peers.write().await.remove(&peer_id);
357                                        // Update stats
358                                        let mut stats = self.stats.lock().await;
359                                        stats.active_connections =
360                                            stats.active_connections.saturating_sub(1);
361                                        stats.failed_connections += 1;
362                                        return Err(NatTraversalError::ConfigError(format!(
363                                            "Authentication failed: {}",
364                                            e
365                                        )));
366                                    }
367                                }
368                            }
369
370                            return Ok(remote_address);
371                        }
372                    }
373                    NatTraversalEvent::TraversalFailed {
374                        peer_id: evt_peer,
375                        error,
376                        fallback_available: _,
377                    } => {
378                        if evt_peer == peer_id {
379                            // Update stats
380                            {
381                                let mut stats = self.stats.lock().await;
382                                stats.failed_connections += 1;
383                            }
384
385                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
386                            return Err(error);
387                        }
388                    }
389                    _ => {
390                        debug!("Received event: {:?}", event);
391                    }
392                }
393            }
394
395            // Brief sleep to avoid busy waiting
396            tokio::time::sleep(Duration::from_millis(100)).await;
397        }
398
399        // Timeout
400        {
401            let mut stats = self.stats.lock().await;
402            stats.failed_connections += 1;
403        }
404
405        Err(NatTraversalError::Timeout)
406    }
407
408    /// Accept incoming connections
409    pub async fn accept(
410        &self,
411    ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
412        info!("Waiting for incoming connection...");
413
414        // Accept connection through the NAT traversal endpoint
415        match self.nat_endpoint.accept_connection().await {
416            Ok((peer_id, connection)) => {
417                let remote_addr = connection.remote_address();
418
419                // Store the connection
420                {
421                    let mut peers = self.connected_peers.write().await;
422                    peers.insert(peer_id, remote_addr);
423                }
424
425                // Update stats
426                {
427                    let mut stats = self.stats.lock().await;
428                    stats.successful_connections += 1;
429                    stats.active_connections += 1;
430                }
431
432                info!(
433                    "Accepted connection from peer {:?} at {}",
434                    peer_id, remote_addr
435                );
436
437                // Handle authentication if required
438                if self.config.auth_config.require_authentication {
439                    // Start a task to handle incoming authentication
440                    let self_clone = self.clone();
441                    let auth_peer_id = peer_id;
442                    tokio::spawn(async move {
443                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
444                            error!(
445                                "Failed to handle authentication for peer {:?}: {}",
446                                auth_peer_id, e
447                            );
448                            // Remove the peer if auth fails
449                            self_clone
450                                .connected_peers
451                                .write()
452                                .await
453                                .remove(&auth_peer_id);
454                            let mut stats = self_clone.stats.lock().await;
455                            stats.active_connections = stats.active_connections.saturating_sub(1);
456                        }
457                    });
458                }
459
460                Ok((remote_addr, peer_id))
461            }
462            Err(e) => {
463                // Update stats
464                {
465                    let mut stats = self.stats.lock().await;
466                    stats.failed_connections += 1;
467                }
468
469                error!("Failed to accept connection: {}", e);
470                Err(Box::new(e))
471            }
472        }
473    }
474
475    /// Send data to a peer
476    pub async fn send_to_peer(
477        &self,
478        peer_id: &PeerId,
479        data: &[u8],
480    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
481        let peers = self.connected_peers.read().await;
482
483        if let Some(remote_addr) = peers.get(peer_id) {
484            debug!(
485                "Sending {} bytes to peer {:?} at {}",
486                data.len(),
487                peer_id,
488                remote_addr
489            );
490
491            // Get the Quinn connection for this peer from the NAT traversal endpoint
492            match self.nat_endpoint.get_connection(peer_id) {
493                Ok(Some(connection)) => {
494                    // Open a unidirectional stream for data transmission
495                    let mut send_stream = connection
496                        .open_uni()
497                        .await
498                        .map_err(|e| format!("Failed to open unidirectional stream: {}", e))?;
499
500                    // Send the data
501                    send_stream
502                        .write_all(data)
503                        .await
504                        .map_err(|e| format!("Failed to write data: {}", e))?;
505
506                    // Finish the stream
507                    send_stream
508                        .finish()
509                        .map_err(|e| format!("Failed to finish stream: {}", e))?;
510
511                    debug!(
512                        "Successfully sent {} bytes to peer {:?}",
513                        data.len(),
514                        peer_id
515                    );
516                    Ok(())
517                }
518                Ok(None) => {
519                    error!("No active connection found for peer {:?}", peer_id);
520                    Err("No active connection".into())
521                }
522                Err(e) => {
523                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
524                    Err(Box::new(e))
525                }
526            }
527        } else {
528            error!("Peer {:?} not connected", peer_id);
529            Err("Peer not connected".into())
530        }
531    }
532
533    /// Receive data from peers
534    pub async fn receive(
535        &self,
536    ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
537        debug!("Waiting to receive data from any connected peer...");
538
539        // Get all connected peers
540        let peers = {
541            let peers_guard = self.connected_peers.read().await;
542            peers_guard.clone()
543        };
544
545        if peers.is_empty() {
546            return Err("No connected peers".into());
547        }
548
549        // Try to receive data from any connected peer
550        // In a real implementation, this would use a more sophisticated approach
551        // like select! over multiple connection streams
552        for (peer_id, _remote_addr) in peers.iter() {
553            match self.nat_endpoint.get_connection(peer_id) {
554                Ok(Some(connection)) => {
555                    // Try to accept incoming unidirectional streams
556                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
557                        .await
558                    {
559                        Ok(Ok(mut recv_stream)) => {
560                            debug!(
561                                "Receiving data from unidirectional stream from peer {:?}",
562                                peer_id
563                            );
564
565                            // Read all data from the stream
566                            match recv_stream.read_to_end(1024 * 1024).await {
567                                // 1MB limit
568                                Ok(buffer) => {
569                                    if !buffer.is_empty() {
570                                        debug!(
571                                            "Received {} bytes from peer {:?}",
572                                            buffer.len(),
573                                            peer_id
574                                        );
575                                        return Ok((*peer_id, buffer));
576                                    }
577                                }
578                                Err(e) => {
579                                    debug!(
580                                        "Failed to read from stream for peer {:?}: {}",
581                                        peer_id, e
582                                    );
583                                }
584                            }
585                        }
586                        Ok(Err(e)) => {
587                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
588                        }
589                        Err(_) => {
590                            // Timeout - try bidirectional streams
591                        }
592                    }
593
594                    // Also try to accept bidirectional streams
595                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
596                        .await
597                    {
598                        Ok(Ok((_send_stream, mut recv_stream))) => {
599                            debug!(
600                                "Receiving data from bidirectional stream from peer {:?}",
601                                peer_id
602                            );
603
604                            // Read all data from the receive side
605                            match recv_stream.read_to_end(1024 * 1024).await {
606                                // 1MB limit
607                                Ok(buffer) => {
608                                    if !buffer.is_empty() {
609                                        debug!(
610                                            "Received {} bytes from peer {:?} via bidirectional stream",
611                                            buffer.len(),
612                                            peer_id
613                                        );
614                                        return Ok((*peer_id, buffer));
615                                    }
616                                }
617                                Err(e) => {
618                                    debug!(
619                                        "Failed to read from bidirectional stream for peer {:?}: {}",
620                                        peer_id, e
621                                    );
622                                }
623                            }
624                        }
625                        Ok(Err(e)) => {
626                            debug!(
627                                "Failed to accept bidirectional stream from peer {:?}: {}",
628                                peer_id, e
629                            );
630                        }
631                        Err(_) => {
632                            // Timeout - continue to next peer
633                        }
634                    }
635                }
636                Ok(None) => {
637                    debug!("No active connection for peer {:?}", peer_id);
638                }
639                Err(e) => {
640                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
641                }
642            }
643        }
644
645        // If we get here, no data was received from any peer
646        Err("No data available from any connected peer".into())
647    }
648
649    /// Get current statistics
650    pub async fn get_stats(&self) -> NodeStats {
651        self.stats.lock().await.clone()
652    }
653
654    /// Get access to the NAT traversal endpoint
655    pub fn get_nat_endpoint(
656        &self,
657    ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
658        Ok(&*self.nat_endpoint)
659    }
660
661    /// Start periodic statistics reporting
662    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
663        let stats = Arc::clone(&self.stats);
664        let interval_duration = self.config.stats_interval;
665
666        tokio::spawn(async move {
667            let mut interval = tokio::time::interval(interval_duration);
668
669            loop {
670                interval.tick().await;
671
672                let stats_snapshot = stats.lock().await.clone();
673
674                info!(
675                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
676                    stats_snapshot.active_connections,
677                    stats_snapshot.successful_connections,
678                    stats_snapshot.nat_traversal_successes,
679                    stats_snapshot.nat_traversal_attempts
680                );
681            }
682        })
683    }
684
685    /// Get NAT traversal statistics
686    pub async fn get_nat_stats(
687        &self,
688    ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
689        self.nat_endpoint.get_nat_stats()
690    }
691
692    /// Get connection metrics for a specific peer
693    pub async fn get_connection_metrics(
694        &self,
695        peer_id: &PeerId,
696    ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
697        match self.nat_endpoint.get_connection(peer_id) {
698            Ok(Some(connection)) => {
699                // Get basic RTT from the connection
700                let rtt = connection.rtt();
701
702                // Get congestion window and other stats
703                let stats = connection.stats();
704
705                Ok(ConnectionMetrics {
706                    bytes_sent: stats.udp_tx.bytes,
707                    bytes_received: stats.udp_rx.bytes,
708                    rtt: Some(rtt),
709                    packet_loss: stats.path.lost_packets as f64
710                        / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
711                })
712            }
713            Ok(None) => Err("Connection not found".into()),
714            Err(e) => Err(format!("Failed to get connection: {}", e).into()),
715        }
716    }
717
718    /// Get this node's peer ID
719    pub fn peer_id(&self) -> PeerId {
720        self.peer_id
721    }
722
723    /// Get this node's public key bytes
724    pub fn public_key_bytes(&self) -> [u8; 32] {
725        self.auth_manager.public_key_bytes()
726    }
727
728    /// Send an authentication message to a peer
729    async fn send_auth_message(
730        &self,
731        peer_id: &PeerId,
732        message: AuthMessage,
733    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
734        let data = AuthManager::serialize_message(&message)?;
735        self.send_to_peer(peer_id, &data).await
736    }
737
738    /// Perform authentication handshake as initiator
739    async fn authenticate_as_initiator(
740        &self,
741        peer_id: &PeerId,
742    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
743        info!("Starting authentication with peer {:?}", peer_id);
744
745        // Send authentication request
746        let auth_request = self.auth_manager.create_auth_request();
747        self.send_auth_message(peer_id, auth_request).await?;
748
749        // Wait for challenge
750        let timeout_duration = self.config.auth_config.auth_timeout;
751        let start = Instant::now();
752
753        while start.elapsed() < timeout_duration {
754            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
755                Ok(Ok((recv_peer_id, data))) => {
756                    if recv_peer_id == *peer_id {
757                        match AuthManager::deserialize_message(&data) {
758                            Ok(AuthMessage::Challenge { nonce, .. }) => {
759                                // Create and send challenge response
760                                let response =
761                                    self.auth_manager.create_challenge_response(nonce)?;
762                                self.send_auth_message(peer_id, response).await?;
763                            }
764                            Ok(AuthMessage::AuthSuccess { .. }) => {
765                                info!("Authentication successful with peer {:?}", peer_id);
766                                return Ok(());
767                            }
768                            Ok(AuthMessage::AuthFailure { reason }) => {
769                                return Err(format!("Authentication failed: {}", reason).into());
770                            }
771                            _ => continue,
772                        }
773                    }
774                }
775                _ => continue,
776            }
777        }
778
779        Err("Authentication timeout".into())
780    }
781
782    /// Handle incoming authentication messages
783    async fn handle_auth_message(
784        &self,
785        peer_id: PeerId,
786        message: AuthMessage,
787    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
788        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
789
790        match auth_protocol.handle_message(peer_id, message).await {
791            Ok(Some(response)) => {
792                self.send_auth_message(&peer_id, response).await?;
793            }
794            Ok(None) => {
795                // No response needed
796            }
797            Err(e) => {
798                error!("Authentication error: {}", e);
799                let failure = AuthMessage::AuthFailure {
800                    reason: e.to_string(),
801                };
802                self.send_auth_message(&peer_id, failure).await?;
803                return Err(Box::new(e));
804            }
805        }
806
807        Ok(())
808    }
809
810    /// Check if a peer is authenticated
811    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
812        self.auth_manager.is_authenticated(peer_id).await
813    }
814
815    /// Get list of authenticated peers
816    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
817        self.auth_manager.list_authenticated_peers().await
818    }
819
820    /// Handle incoming authentication from a peer
821    async fn handle_incoming_auth(
822        &self,
823        peer_id: PeerId,
824    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
825        info!("Handling incoming authentication from peer {:?}", peer_id);
826
827        let timeout_duration = self.config.auth_config.auth_timeout;
828        let start = Instant::now();
829
830        while start.elapsed() < timeout_duration {
831            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
832                Ok(Ok((recv_peer_id, data))) => {
833                    if recv_peer_id == peer_id {
834                        match AuthManager::deserialize_message(&data) {
835                            Ok(auth_msg) => {
836                                self.handle_auth_message(peer_id, auth_msg).await?;
837
838                                // Check if authentication is complete
839                                if self.auth_manager.is_authenticated(&peer_id).await {
840                                    info!("Peer {:?} successfully authenticated", peer_id);
841                                    return Ok(());
842                                }
843                            }
844                            Err(_) => {
845                                // Not an auth message, ignore
846                                continue;
847                            }
848                        }
849                    }
850                }
851                _ => continue,
852            }
853        }
854
855        Err("Authentication timeout waiting for peer".into())
856    }
857}