ant_quic/
quic_node.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! QUIC-based P2P node with NAT traversal
9//!
10//! This module provides a QUIC-based implementation of the P2P node
11//! that integrates with the NAT traversal protocol.
12
13use std::{
14    collections::HashMap,
15    net::SocketAddr,
16    sync::Arc,
17    time::{Duration, Instant},
18};
19
20use tracing::{debug, error, info};
21
22use crate::{
23    auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
24    crypto::raw_public_keys::key_utils::{
25        derive_peer_id_from_public_key, generate_ed25519_keypair,
26    },
27    nat_traversal_api::{
28        EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
29        NatTraversalEvent, NatTraversalStatistics, PeerId,
30    },
31};
32
/// QUIC-based P2P node with NAT traversal
///
/// The endpoint, peer table, statistics and authentication manager are each held
/// behind an `Arc`, so clones produced by the derived `Clone` share them; `config`
/// and `peer_id` are cloned by value.
#[derive(Clone, Debug)]
pub struct QuicP2PNode {
    /// NAT traversal endpoint
    nat_endpoint: Arc<NatTraversalEndpoint>,
    /// Active peer connections (maps peer ID to their socket address)
    ///
    /// Only the address is kept here; the connection handle itself is looked up
    /// through `nat_endpoint.get_connection` whenever data is sent or received.
    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
    /// Node statistics
    stats: Arc<tokio::sync::Mutex<NodeStats>>,
    /// Node configuration
    config: QuicNodeConfig,
    /// Authentication manager
    auth_manager: Arc<AuthManager>,
    /// Our peer ID
    peer_id: PeerId,
}
49
/// Configuration for QUIC P2P node
#[derive(Debug, Clone)]
pub struct QuicNodeConfig {
    /// Role of this node
    ///
    /// Forwarded to the NAT traversal configuration; relay fallback is switched
    /// off when the role is `EndpointRole::Bootstrap`.
    pub role: EndpointRole,
    /// Bootstrap nodes
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Enable coordinator services
    // NOTE(review): not consulted by the code visible in this module - confirm where it is used.
    pub enable_coordinator: bool,
    /// Max concurrent connections
    // NOTE(review): not enforced by the code visible in this module - confirm where it is used.
    pub max_connections: usize,
    /// Connection timeout
    ///
    /// Upper bound on how long `connect_to_peer` polls for a traversal result.
    pub connection_timeout: Duration,
    /// Statistics interval
    ///
    /// Period of the logging loop started by `start_stats_task`.
    pub stats_interval: Duration,
    /// Authentication configuration
    pub auth_config: AuthConfig,
    /// Bind address for the node
    ///
    /// Passed through unchanged to the NAT traversal configuration.
    pub bind_addr: Option<SocketAddr>,
}
70
71impl Default for QuicNodeConfig {
72    fn default() -> Self {
73        Self {
74            role: EndpointRole::Client,
75            bootstrap_nodes: Vec::new(),
76            enable_coordinator: false,
77            max_connections: 100,
78            connection_timeout: Duration::from_secs(30),
79            stats_interval: Duration::from_secs(30),
80            auth_config: AuthConfig::default(),
81            bind_addr: None,
82        }
83    }
84}
85
/// Basic per-connection performance metrics
///
/// Produced by `QuicP2PNode::get_connection_metrics` from the statistics the
/// underlying connection reports.
#[derive(Debug, Clone)]
pub struct ConnectionMetrics {
    /// Bytes sent to this peer
    pub bytes_sent: u64,
    /// Bytes received from this peer
    pub bytes_received: u64,
    /// Round-trip time
    pub rtt: Option<Duration>,
    /// Packet loss rate (0.0 to 1.0)
    pub packet_loss: f64,
}
98
/// Aggregate node statistics for monitoring and telemetry
#[derive(Debug, Clone)]
pub struct NodeStats {
    /// Connections currently counted as open
    pub active_connections: usize,
    /// Running total of connections that were established
    pub successful_connections: u64,
    /// Running total of connection attempts that failed
    pub failed_connections: u64,
    /// Running total of NAT traversal attempts
    pub nat_traversal_attempts: u64,
    /// Running total of NAT traversals that succeeded
    pub nat_traversal_successes: u64,
    /// Instant at which this statistics record was created
    pub start_time: Instant,
}

impl Default for NodeStats {
    /// All counters start at zero and `start_time` is stamped with the current instant.
    fn default() -> Self {
        let created_at = Instant::now();

        NodeStats {
            start_time: created_at,
            nat_traversal_successes: 0,
            nat_traversal_attempts: 0,
            failed_connections: 0,
            successful_connections: 0,
            active_connections: 0,
        }
    }
}
128
129impl QuicP2PNode {
130    /// Create a new QUIC P2P node
131    pub async fn new(
132        config: QuicNodeConfig,
133    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
134        // Generate Ed25519 keypair for authentication
135        let (secret_key, public_key) = generate_ed25519_keypair();
136        let peer_id = derive_peer_id_from_public_key(&public_key);
137
138        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
139
140        // Create authentication manager
141        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
142
143        // Create NAT traversal configuration
144        let nat_config = NatTraversalConfig {
145            role: config.role,
146            bootstrap_nodes: config.bootstrap_nodes.clone(),
147            max_candidates: 50,
148            coordination_timeout: Duration::from_secs(10),
149            enable_symmetric_nat: true,
150            // Bootstrap nodes should not enable relay fallback
151            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
152            max_concurrent_attempts: 5,
153            bind_addr: config.bind_addr,
154            prefer_rfc_nat_traversal: true, // Default to RFC format for standards compliance
155            timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
156        };
157
158        // Create event callback for NAT traversal events
159        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
160            start_time: Instant::now(),
161            ..Default::default()
162        }));
163        let stats_for_callback = Arc::clone(&stats_clone);
164
165        let event_callback = Box::new(move |event: NatTraversalEvent| {
166            let stats = stats_for_callback.clone();
167            tokio::spawn(async move {
168                let mut stats = stats.lock().await;
169                match event {
170                    NatTraversalEvent::CoordinationRequested { .. } => {
171                        stats.nat_traversal_attempts += 1;
172                    }
173                    NatTraversalEvent::ConnectionEstablished { .. } => {
174                        stats.nat_traversal_successes += 1;
175                    }
176                    _ => {}
177                }
178            });
179        });
180
181        // Create NAT traversal endpoint
182        let nat_endpoint =
183            Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
184
185        Ok(Self {
186            nat_endpoint,
187            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
188            stats: stats_clone,
189            config,
190            auth_manager,
191            peer_id,
192        })
193    }
194
195    /// Get the node configuration
196    pub fn get_config(&self) -> &QuicNodeConfig {
197        &self.config
198    }
199
200    /// Connect directly to a bootstrap node
201    pub async fn connect_to_bootstrap(
202        &self,
203        bootstrap_addr: SocketAddr,
204    ) -> Result<PeerId, NatTraversalError> {
205        info!("Connecting to bootstrap node at {}", bootstrap_addr);
206
207        // Get the quinn endpoint from NAT traversal endpoint
208        let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
209            NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
210        })?;
211
212        // Connect using the QUIC endpoint directly
213        match endpoint.connect(bootstrap_addr, "bootstrap-node") {
214            Ok(connecting) => {
215                match connecting.await {
216                    Ok(_connection) => {
217                        // Extract peer ID from the connection
218                        // For now, we'll generate a temporary peer ID based on the address
219                        // In a real implementation, we'd exchange peer IDs during the handshake
220                        let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
221
222                        // Store the connection
223                        self.connected_peers
224                            .write()
225                            .await
226                            .insert(peer_id, bootstrap_addr);
227
228                        // Update stats
229                        {
230                            let mut stats = self.stats.lock().await;
231                            stats.active_connections += 1;
232                            stats.successful_connections += 1;
233                        }
234
235                        // Fire connection established event
236                        if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
237                            callback(NatTraversalEvent::ConnectionEstablished {
238                                peer_id,
239                                remote_address: bootstrap_addr,
240                            });
241                        }
242
243                        info!(
244                            "Successfully connected to bootstrap node {} with peer ID {:?}",
245                            bootstrap_addr, peer_id
246                        );
247                        Ok(peer_id)
248                    }
249                    Err(e) => {
250                        error!(
251                            "Failed to establish connection to bootstrap node {}: {}",
252                            bootstrap_addr, e
253                        );
254                        {
255                            let mut stats = self.stats.lock().await;
256                            stats.failed_connections += 1;
257                        }
258                        Err(NatTraversalError::NetworkError(format!(
259                            "Connection failed: {e}"
260                        )))
261                    }
262                }
263            }
264            Err(e) => {
265                error!(
266                    "Failed to initiate connection to bootstrap node {}: {}",
267                    bootstrap_addr, e
268                );
269                {
270                    let mut stats = self.stats.lock().await;
271                    stats.failed_connections += 1;
272                }
273                Err(NatTraversalError::NetworkError(format!(
274                    "Connect error: {e}"
275                )))
276            }
277        }
278    }
279
280    /// Derive a peer ID from a socket address (temporary solution)
281    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
282        use std::collections::hash_map::DefaultHasher;
283        use std::hash::{Hash, Hasher};
284
285        let mut hasher = DefaultHasher::new();
286        addr.hash(&mut hasher);
287        let hash = hasher.finish();
288
289        let mut peer_id_bytes = [0u8; 32];
290        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
291        let port_bytes = addr.port().to_le_bytes();
292        peer_id_bytes[8..10].copy_from_slice(&port_bytes);
293
294        PeerId(peer_id_bytes)
295    }
296
297    /// Connect to a peer using NAT traversal
298    pub async fn connect_to_peer(
299        &self,
300        peer_id: PeerId,
301        coordinator: SocketAddr,
302    ) -> Result<SocketAddr, NatTraversalError> {
303        info!(
304            "Initiating connection to peer {:?} via coordinator {}",
305            peer_id, coordinator
306        );
307
308        // Update stats
309        {
310            let mut stats = self.stats.lock().await;
311            stats.nat_traversal_attempts += 1;
312        }
313
314        // Initiate NAT traversal
315        self.nat_endpoint
316            .initiate_nat_traversal(peer_id, coordinator)?;
317
318        // Poll for completion (in production, this would be event-driven)
319        let start = Instant::now();
320        let timeout = self.config.connection_timeout;
321
322        while start.elapsed() < timeout {
323            let events = self.nat_endpoint.poll(Instant::now())?;
324
325            for event in events {
326                match event {
327                    NatTraversalEvent::ConnectionEstablished {
328                        peer_id: evt_peer,
329                        remote_address,
330                    } => {
331                        if evt_peer == peer_id {
332                            // Store peer connection
333                            {
334                                let mut peers = self.connected_peers.write().await;
335                                peers.insert(peer_id, remote_address);
336                            }
337
338                            // Update stats
339                            {
340                                let mut stats = self.stats.lock().await;
341                                stats.successful_connections += 1;
342                                stats.active_connections += 1;
343                                stats.nat_traversal_successes += 1;
344                            }
345
346                            info!(
347                                "Successfully connected to peer {:?} at {}",
348                                peer_id, remote_address
349                            );
350
351                            // Perform authentication if required
352                            if self.config.auth_config.require_authentication {
353                                match self.authenticate_as_initiator(&peer_id).await {
354                                    Ok(_) => {
355                                        info!("Authentication successful with peer {:?}", peer_id);
356                                    }
357                                    Err(e) => {
358                                        error!(
359                                            "Authentication failed with peer {:?}: {}",
360                                            peer_id, e
361                                        );
362                                        // Remove from connected peers
363                                        self.connected_peers.write().await.remove(&peer_id);
364                                        // Update stats
365                                        let mut stats = self.stats.lock().await;
366                                        stats.active_connections =
367                                            stats.active_connections.saturating_sub(1);
368                                        stats.failed_connections += 1;
369                                        return Err(NatTraversalError::ConfigError(format!(
370                                            "Authentication failed: {e}"
371                                        )));
372                                    }
373                                }
374                            }
375
376                            return Ok(remote_address);
377                        }
378                    }
379                    NatTraversalEvent::TraversalFailed {
380                        peer_id: evt_peer,
381                        error,
382                        fallback_available: _,
383                    } => {
384                        if evt_peer == peer_id {
385                            // Update stats
386                            {
387                                let mut stats = self.stats.lock().await;
388                                stats.failed_connections += 1;
389                            }
390
391                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
392                            return Err(error);
393                        }
394                    }
395                    _ => {
396                        debug!("Received event: {:?}", event);
397                    }
398                }
399            }
400
401            // Brief sleep to avoid busy waiting
402            tokio::time::sleep(Duration::from_millis(100)).await;
403        }
404
405        // Timeout
406        {
407            let mut stats = self.stats.lock().await;
408            stats.failed_connections += 1;
409        }
410
411        Err(NatTraversalError::Timeout)
412    }
413
414    /// Accept incoming connections
415    pub async fn accept(
416        &self,
417    ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
418        info!("Waiting for incoming connection...");
419
420        // Accept connection through the NAT traversal endpoint
421        match self.nat_endpoint.accept_connection().await {
422            Ok((peer_id, connection)) => {
423                let remote_addr = connection.remote_address();
424
425                // Store the connection
426                {
427                    let mut peers = self.connected_peers.write().await;
428                    peers.insert(peer_id, remote_addr);
429                }
430
431                // Update stats
432                {
433                    let mut stats = self.stats.lock().await;
434                    stats.successful_connections += 1;
435                    stats.active_connections += 1;
436                }
437
438                info!(
439                    "Accepted connection from peer {:?} at {}",
440                    peer_id, remote_addr
441                );
442
443                // Handle authentication if required
444                if self.config.auth_config.require_authentication {
445                    // Start a task to handle incoming authentication
446                    let self_clone = self.clone();
447                    let auth_peer_id = peer_id;
448                    tokio::spawn(async move {
449                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
450                            error!(
451                                "Failed to handle authentication for peer {:?}: {}",
452                                auth_peer_id, e
453                            );
454                            // Remove the peer if auth fails
455                            self_clone
456                                .connected_peers
457                                .write()
458                                .await
459                                .remove(&auth_peer_id);
460                            let mut stats = self_clone.stats.lock().await;
461                            stats.active_connections = stats.active_connections.saturating_sub(1);
462                        }
463                    });
464                }
465
466                Ok((remote_addr, peer_id))
467            }
468            Err(e) => {
469                // Update stats
470                {
471                    let mut stats = self.stats.lock().await;
472                    stats.failed_connections += 1;
473                }
474
475                error!("Failed to accept connection: {}", e);
476                Err(Box::new(e))
477            }
478        }
479    }
480
481    /// Send data to a peer
482    pub async fn send_to_peer(
483        &self,
484        peer_id: &PeerId,
485        data: &[u8],
486    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
487        let peers = self.connected_peers.read().await;
488
489        if let Some(remote_addr) = peers.get(peer_id) {
490            debug!(
491                "Sending {} bytes to peer {:?} at {}",
492                data.len(),
493                peer_id,
494                remote_addr
495            );
496
497            // Get the Quinn connection for this peer from the NAT traversal endpoint
498            match self.nat_endpoint.get_connection(peer_id) {
499                Ok(Some(connection)) => {
500                    // Open a unidirectional stream for data transmission
501                    let mut send_stream = connection
502                        .open_uni()
503                        .await
504                        .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
505
506                    // Send the data
507                    send_stream
508                        .write_all(data)
509                        .await
510                        .map_err(|e| format!("Failed to write data: {e}"))?;
511
512                    // Finish the stream
513                    send_stream
514                        .finish()
515                        .map_err(|e| format!("Failed to finish stream: {e}"))?;
516
517                    debug!(
518                        "Successfully sent {} bytes to peer {:?}",
519                        data.len(),
520                        peer_id
521                    );
522                    Ok(())
523                }
524                Ok(None) => {
525                    error!("No active connection found for peer {:?}", peer_id);
526                    Err("No active connection".into())
527                }
528                Err(e) => {
529                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
530                    Err(Box::new(e))
531                }
532            }
533        } else {
534            error!("Peer {:?} not connected", peer_id);
535            Err("Peer not connected".into())
536        }
537    }
538
539    /// Receive data from peers
540    pub async fn receive(
541        &self,
542    ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
543        debug!("Waiting to receive data from any connected peer...");
544
545        // Get all connected peers
546        let peers = {
547            let peers_guard = self.connected_peers.read().await;
548            peers_guard.clone()
549        };
550
551        if peers.is_empty() {
552            return Err("No connected peers".into());
553        }
554
555        // Try to receive data from any connected peer
556        // In a real implementation, this would use a more sophisticated approach
557        // like select! over multiple connection streams
558        for (peer_id, _remote_addr) in peers.iter() {
559            match self.nat_endpoint.get_connection(peer_id) {
560                Ok(Some(connection)) => {
561                    // Try to accept incoming unidirectional streams
562                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
563                        .await
564                    {
565                        Ok(Ok(mut recv_stream)) => {
566                            debug!(
567                                "Receiving data from unidirectional stream from peer {:?}",
568                                peer_id
569                            );
570
571                            // Read all data from the stream
572                            match recv_stream.read_to_end(1024 * 1024).await {
573                                // 1MB limit
574                                Ok(buffer) => {
575                                    if !buffer.is_empty() {
576                                        debug!(
577                                            "Received {} bytes from peer {:?}",
578                                            buffer.len(),
579                                            peer_id
580                                        );
581                                        return Ok((*peer_id, buffer));
582                                    }
583                                }
584                                Err(e) => {
585                                    debug!(
586                                        "Failed to read from stream for peer {:?}: {}",
587                                        peer_id, e
588                                    );
589                                }
590                            }
591                        }
592                        Ok(Err(e)) => {
593                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
594                        }
595                        Err(_) => {
596                            // Timeout - try bidirectional streams
597                        }
598                    }
599
600                    // Also try to accept bidirectional streams
601                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
602                        .await
603                    {
604                        Ok(Ok((_send_stream, mut recv_stream))) => {
605                            debug!(
606                                "Receiving data from bidirectional stream from peer {:?}",
607                                peer_id
608                            );
609
610                            // Read all data from the receive side
611                            match recv_stream.read_to_end(1024 * 1024).await {
612                                // 1MB limit
613                                Ok(buffer) => {
614                                    if !buffer.is_empty() {
615                                        debug!(
616                                            "Received {} bytes from peer {:?} via bidirectional stream",
617                                            buffer.len(),
618                                            peer_id
619                                        );
620                                        return Ok((*peer_id, buffer));
621                                    }
622                                }
623                                Err(e) => {
624                                    debug!(
625                                        "Failed to read from bidirectional stream for peer {:?}: {}",
626                                        peer_id, e
627                                    );
628                                }
629                            }
630                        }
631                        Ok(Err(e)) => {
632                            debug!(
633                                "Failed to accept bidirectional stream from peer {:?}: {}",
634                                peer_id, e
635                            );
636                        }
637                        Err(_) => {
638                            // Timeout - continue to next peer
639                        }
640                    }
641                }
642                Ok(None) => {
643                    debug!("No active connection for peer {:?}", peer_id);
644                }
645                Err(e) => {
646                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
647                }
648            }
649        }
650
651        // If we get here, no data was received from any peer
652        Err("No data available from any connected peer".into())
653    }
654
655    /// Get current statistics
656    pub async fn get_stats(&self) -> NodeStats {
657        self.stats.lock().await.clone()
658    }
659
660    /// Get access to the NAT traversal endpoint
661    pub fn get_nat_endpoint(
662        &self,
663    ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
664        Ok(&*self.nat_endpoint)
665    }
666
667    /// Start periodic statistics reporting
668    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
669        let stats = Arc::clone(&self.stats);
670        let interval_duration = self.config.stats_interval;
671
672        tokio::spawn(async move {
673            let mut interval = tokio::time::interval(interval_duration);
674
675            loop {
676                interval.tick().await;
677
678                let stats_snapshot = stats.lock().await.clone();
679
680                info!(
681                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
682                    stats_snapshot.active_connections,
683                    stats_snapshot.successful_connections,
684                    stats_snapshot.nat_traversal_successes,
685                    stats_snapshot.nat_traversal_attempts
686                );
687            }
688        })
689    }
690
691    /// Get NAT traversal statistics
692    pub async fn get_nat_stats(
693        &self,
694    ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
695        self.nat_endpoint.get_nat_stats()
696    }
697
698    // OBSERVED_ADDRESS integration is handled within the connection; manual injection removed
699
700    /// Get connection metrics for a specific peer
701    pub async fn get_connection_metrics(
702        &self,
703        peer_id: &PeerId,
704    ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
705        match self.nat_endpoint.get_connection(peer_id) {
706            Ok(Some(connection)) => {
707                // Get basic RTT from the connection
708                let rtt = connection.rtt();
709
710                // Get congestion window and other stats
711                let stats = connection.stats();
712
713                Ok(ConnectionMetrics {
714                    bytes_sent: stats.udp_tx.bytes,
715                    bytes_received: stats.udp_rx.bytes,
716                    rtt: Some(rtt),
717                    packet_loss: stats.path.lost_packets as f64
718                        / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
719                })
720            }
721            Ok(None) => Err("Connection not found".into()),
722            Err(e) => Err(format!("Failed to get connection: {e}").into()),
723        }
724    }
725
726    /// Get this node's peer ID
727    pub fn peer_id(&self) -> PeerId {
728        self.peer_id
729    }
730
731    /// Get this node's public key bytes
732    pub fn public_key_bytes(&self) -> [u8; 32] {
733        self.auth_manager.public_key_bytes()
734    }
735
736    /// Send an authentication message to a peer
737    async fn send_auth_message(
738        &self,
739        peer_id: &PeerId,
740        message: AuthMessage,
741    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
742        let data = AuthManager::serialize_message(&message)?;
743        self.send_to_peer(peer_id, &data).await
744    }
745
746    /// Perform authentication handshake as initiator
747    async fn authenticate_as_initiator(
748        &self,
749        peer_id: &PeerId,
750    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
751        info!("Starting authentication with peer {:?}", peer_id);
752
753        // Send authentication request
754        let auth_request = self.auth_manager.create_auth_request();
755        self.send_auth_message(peer_id, auth_request).await?;
756
757        // Wait for challenge
758        let timeout_duration = self.config.auth_config.auth_timeout;
759        let start = Instant::now();
760
761        while start.elapsed() < timeout_duration {
762            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
763                Ok(Ok((recv_peer_id, data))) => {
764                    if recv_peer_id == *peer_id {
765                        match AuthManager::deserialize_message(&data) {
766                            Ok(AuthMessage::Challenge { nonce, .. }) => {
767                                // Create and send challenge response
768                                let response =
769                                    self.auth_manager.create_challenge_response(nonce)?;
770                                self.send_auth_message(peer_id, response).await?;
771                            }
772                            Ok(AuthMessage::AuthSuccess { .. }) => {
773                                info!("Authentication successful with peer {:?}", peer_id);
774                                return Ok(());
775                            }
776                            Ok(AuthMessage::AuthFailure { reason }) => {
777                                return Err(format!("Authentication failed: {reason}").into());
778                            }
779                            _ => continue,
780                        }
781                    }
782                }
783                _ => continue,
784            }
785        }
786
787        Err("Authentication timeout".into())
788    }
789
790    /// Handle incoming authentication messages
791    async fn handle_auth_message(
792        &self,
793        peer_id: PeerId,
794        message: AuthMessage,
795    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
796        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
797
798        match auth_protocol.handle_message(peer_id, message).await {
799            Ok(Some(response)) => {
800                self.send_auth_message(&peer_id, response).await?;
801            }
802            Ok(None) => {
803                // No response needed
804            }
805            Err(e) => {
806                error!("Authentication error: {}", e);
807                let failure = AuthMessage::AuthFailure {
808                    reason: e.to_string(),
809                };
810                self.send_auth_message(&peer_id, failure).await?;
811                return Err(Box::new(e));
812            }
813        }
814
815        Ok(())
816    }
817
818    /// Check if a peer is authenticated
819    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
820        self.auth_manager.is_authenticated(peer_id).await
821    }
822
823    /// Get list of authenticated peers
824    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
825        self.auth_manager.list_authenticated_peers().await
826    }
827
828    /// Handle incoming authentication from a peer
829    async fn handle_incoming_auth(
830        &self,
831        peer_id: PeerId,
832    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
833        info!("Handling incoming authentication from peer {:?}", peer_id);
834
835        let timeout_duration = self.config.auth_config.auth_timeout;
836        let start = Instant::now();
837
838        while start.elapsed() < timeout_duration {
839            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
840                Ok(Ok((recv_peer_id, data))) => {
841                    if recv_peer_id == peer_id {
842                        match AuthManager::deserialize_message(&data) {
843                            Ok(auth_msg) => {
844                                self.handle_auth_message(peer_id, auth_msg).await?;
845
846                                // Check if authentication is complete
847                                if self.auth_manager.is_authenticated(&peer_id).await {
848                                    info!("Peer {:?} successfully authenticated", peer_id);
849                                    return Ok(());
850                                }
851                            }
852                            Err(_) => {
853                                // Not an auth message, ignore
854                                continue;
855                            }
856                        }
857                    }
858                }
859                _ => continue,
860            }
861        }
862
863        Err("Authentication timeout waiting for peer".into())
864    }
865
866    /// Get the metrics collector for Prometheus export
867    pub fn get_metrics_collector(
868        &self,
869    ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
870        // For now, create a new metrics collector
871        // In a full implementation, this would be a field in the struct
872        // and properly wired up to collect actual metrics
873        Ok(Arc::new(crate::logging::MetricsCollector::new()))
874    }
875}