ant_quic/
quic_node.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! QUIC-based P2P node with NAT traversal
9//!
10//! This module provides a QUIC-based implementation of the P2P node
11//! that integrates with the NAT traversal protocol.
12
13use std::{
14    collections::HashMap,
15    net::SocketAddr,
16    sync::Arc,
17    time::{Duration, Instant},
18};
19
20use tracing::{debug, error, info};
21
22use crate::{
23    auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
24    crypto::raw_public_keys::key_utils::{
25        derive_peer_id_from_public_key, generate_ed25519_keypair,
26    },
27    nat_traversal_api::{
28        EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
29        NatTraversalEvent, NatTraversalStatistics, PeerId,
30    },
31};
32
33/// QUIC-based P2P node with NAT traversal
34#[derive(Clone, Debug)]
35pub struct QuicP2PNode {
36    /// NAT traversal endpoint
37    nat_endpoint: Arc<NatTraversalEndpoint>,
38    /// Active peer connections (maps peer ID to their socket address)
39    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
40    /// Node statistics
41    stats: Arc<tokio::sync::Mutex<NodeStats>>,
42    /// Node configuration
43    config: QuicNodeConfig,
44    /// Authentication manager
45    auth_manager: Arc<AuthManager>,
46    /// Our peer ID
47    peer_id: PeerId,
48}
49
50/// Configuration for QUIC P2P node
51#[derive(Debug, Clone)]
52pub struct QuicNodeConfig {
53    /// Role of this node
54    pub role: EndpointRole,
55    /// Bootstrap nodes
56    pub bootstrap_nodes: Vec<SocketAddr>,
57    /// Enable coordinator services
58    pub enable_coordinator: bool,
59    /// Max concurrent connections
60    pub max_connections: usize,
61    /// Connection timeout
62    pub connection_timeout: Duration,
63    /// Statistics interval
64    pub stats_interval: Duration,
65    /// Authentication configuration
66    pub auth_config: AuthConfig,
67    /// Bind address for the node
68    pub bind_addr: Option<SocketAddr>,
69}
70
71impl Default for QuicNodeConfig {
72    fn default() -> Self {
73        Self {
74            role: EndpointRole::Client,
75            bootstrap_nodes: Vec::new(),
76            enable_coordinator: false,
77            max_connections: 100,
78            connection_timeout: Duration::from_secs(30),
79            stats_interval: Duration::from_secs(30),
80            auth_config: AuthConfig::default(),
81            bind_addr: None,
82        }
83    }
84}
85
86/// Basic per-connection performance metrics
87#[derive(Debug, Clone)]
88pub struct ConnectionMetrics {
89    /// Bytes sent to this peer
90    pub bytes_sent: u64,
91    /// Bytes received from this peer
92    pub bytes_received: u64,
93    /// Round-trip time
94    pub rtt: Option<Duration>,
95    /// Packet loss rate (0.0 to 1.0)
96    pub packet_loss: f64,
97}
98
99/// Aggregate node statistics for monitoring and telemetry
100#[derive(Debug, Clone)]
101pub struct NodeStats {
102    /// Number of active connections
103    pub active_connections: usize,
104    /// Total successful connections
105    pub successful_connections: u64,
106    /// Total failed connections
107    pub failed_connections: u64,
108    /// NAT traversal attempts
109    pub nat_traversal_attempts: u64,
110    /// Successful NAT traversals
111    pub nat_traversal_successes: u64,
112    /// Node start time
113    pub start_time: Instant,
114}
115
116impl Default for NodeStats {
117    fn default() -> Self {
118        Self {
119            active_connections: 0,
120            successful_connections: 0,
121            failed_connections: 0,
122            nat_traversal_attempts: 0,
123            nat_traversal_successes: 0,
124            start_time: Instant::now(),
125        }
126    }
127}
128
129impl QuicP2PNode {
130    /// Create a new QUIC P2P node
131    pub async fn new(
132        config: QuicNodeConfig,
133    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
134        // Generate Ed25519 keypair for authentication
135        let (secret_key, public_key) = generate_ed25519_keypair();
136        let peer_id = derive_peer_id_from_public_key(&public_key);
137
138        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
139
140        // Create authentication manager
141        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
142
143        // Create NAT traversal configuration
144        let nat_config = NatTraversalConfig {
145            role: config.role,
146            bootstrap_nodes: config.bootstrap_nodes.clone(),
147            max_candidates: 50,
148            coordination_timeout: Duration::from_secs(10),
149            enable_symmetric_nat: true,
150            // Bootstrap nodes should not enable relay fallback
151            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
152            max_concurrent_attempts: 5,
153            bind_addr: config.bind_addr,
154            prefer_rfc_nat_traversal: true, // Default to RFC format for standards compliance
155            timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
156        };
157
158        // Create event callback for NAT traversal events
159        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
160            start_time: Instant::now(),
161            ..Default::default()
162        }));
163        let stats_for_callback = Arc::clone(&stats_clone);
164
165        let event_callback = Box::new(move |event: NatTraversalEvent| {
166            let stats = stats_for_callback.clone();
167            tokio::spawn(async move {
168                let mut stats = stats.lock().await;
169                match event {
170                    NatTraversalEvent::CoordinationRequested { .. } => {
171                        stats.nat_traversal_attempts += 1;
172                    }
173                    NatTraversalEvent::ConnectionEstablished { .. } => {
174                        stats.nat_traversal_successes += 1;
175                    }
176                    _ => {}
177                }
178            });
179        });
180
181        // Create NAT traversal endpoint
182        let nat_endpoint =
183            Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
184
185        Ok(Self {
186            nat_endpoint,
187            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
188            stats: stats_clone,
189            config,
190            auth_manager,
191            peer_id,
192        })
193    }
194
195    /// Get the node configuration
196    pub fn get_config(&self) -> &QuicNodeConfig {
197        &self.config
198    }
199
200    /// Connect directly to a bootstrap node
201    pub async fn connect_to_bootstrap(
202        &self,
203        bootstrap_addr: SocketAddr,
204    ) -> Result<PeerId, NatTraversalError> {
205        info!("Connecting to bootstrap node at {}", bootstrap_addr);
206
207        match self
208            .nat_endpoint
209            .connect_to_address(bootstrap_addr, "bootstrap-node", None)
210            .await
211        {
212            Ok((peer_id, _connection)) => {
213                self.connected_peers
214                    .write()
215                    .await
216                    .insert(peer_id, bootstrap_addr);
217
218                {
219                    let mut stats = self.stats.lock().await;
220                    stats.active_connections += 1;
221                    stats.successful_connections += 1;
222                }
223
224                info!(
225                    "Successfully connected to bootstrap node {} with peer ID {:?}",
226                    bootstrap_addr, peer_id
227                );
228                Ok(peer_id)
229            }
230            Err(e) => {
231                error!(
232                    "Failed to connect to bootstrap node {}: {}",
233                    bootstrap_addr, e
234                );
235                {
236                    let mut stats = self.stats.lock().await;
237                    stats.failed_connections += 1;
238                }
239                Err(e)
240            }
241        }
242    }
243
244    /// Derive a peer ID from a socket address (temporary solution)
245    #[allow(dead_code)]
246    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
247        use std::collections::hash_map::DefaultHasher;
248        use std::hash::{Hash, Hasher};
249
250        let mut hasher = DefaultHasher::new();
251        addr.hash(&mut hasher);
252        let hash = hasher.finish();
253
254        let mut peer_id_bytes = [0u8; 32];
255        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
256        let port_bytes = addr.port().to_le_bytes();
257        peer_id_bytes[8..10].copy_from_slice(&port_bytes);
258
259        PeerId(peer_id_bytes)
260    }
261
262    /// Connect to a peer using NAT traversal
263    pub async fn connect_to_peer(
264        &self,
265        peer_id: PeerId,
266        coordinator: SocketAddr,
267    ) -> Result<SocketAddr, NatTraversalError> {
268        info!(
269            "Initiating connection to peer {:?} via coordinator {}",
270            peer_id, coordinator
271        );
272
273        // Update stats
274        {
275            let mut stats = self.stats.lock().await;
276            stats.nat_traversal_attempts += 1;
277        }
278
279        // Initiate NAT traversal
280        self.nat_endpoint
281            .initiate_nat_traversal(peer_id, coordinator)?;
282
283        // Poll for completion (in production, this would be event-driven)
284        let start = Instant::now();
285        let timeout = self.config.connection_timeout;
286
287        while start.elapsed() < timeout {
288            let events = self.nat_endpoint.poll(Instant::now())?;
289
290            for event in events {
291                match event {
292                    NatTraversalEvent::ConnectionEstablished {
293                        peer_id: evt_peer,
294                        remote_address,
295                    } => {
296                        if evt_peer == peer_id {
297                            // Store peer connection
298                            {
299                                let mut peers = self.connected_peers.write().await;
300                                peers.insert(peer_id, remote_address);
301                            }
302
303                            // Update stats
304                            {
305                                let mut stats = self.stats.lock().await;
306                                stats.successful_connections += 1;
307                                stats.active_connections += 1;
308                                stats.nat_traversal_successes += 1;
309                            }
310
311                            info!(
312                                "Successfully connected to peer {:?} at {}",
313                                peer_id, remote_address
314                            );
315
316                            // Perform authentication if required
317                            if self.config.auth_config.require_authentication {
318                                match self.authenticate_as_initiator(&peer_id).await {
319                                    Ok(_) => {
320                                        info!("Authentication successful with peer {:?}", peer_id);
321                                    }
322                                    Err(e) => {
323                                        error!(
324                                            "Authentication failed with peer {:?}: {}",
325                                            peer_id, e
326                                        );
327                                        // Remove from connected peers
328                                        self.connected_peers.write().await.remove(&peer_id);
329                                        // Update stats
330                                        let mut stats = self.stats.lock().await;
331                                        stats.active_connections =
332                                            stats.active_connections.saturating_sub(1);
333                                        stats.failed_connections += 1;
334                                        return Err(NatTraversalError::ConfigError(format!(
335                                            "Authentication failed: {e}"
336                                        )));
337                                    }
338                                }
339                            }
340
341                            return Ok(remote_address);
342                        }
343                    }
344                    NatTraversalEvent::TraversalFailed {
345                        peer_id: evt_peer,
346                        error,
347                        fallback_available: _,
348                    } => {
349                        if evt_peer == peer_id {
350                            // Update stats
351                            {
352                                let mut stats = self.stats.lock().await;
353                                stats.failed_connections += 1;
354                            }
355
356                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
357                            return Err(error);
358                        }
359                    }
360                    _ => {
361                        debug!("Received event: {:?}", event);
362                    }
363                }
364            }
365
366            // Brief sleep to avoid busy waiting
367            tokio::time::sleep(Duration::from_millis(100)).await;
368        }
369
370        // Timeout
371        {
372            let mut stats = self.stats.lock().await;
373            stats.failed_connections += 1;
374        }
375
376        Err(NatTraversalError::Timeout)
377    }
378
379    /// Accept incoming connections
380    pub async fn accept(
381        &self,
382    ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
383        info!("Waiting for incoming connection...");
384
385        // Accept connection through the NAT traversal endpoint
386        match self.nat_endpoint.accept_connection().await {
387            Ok((peer_id, connection)) => {
388                let remote_addr = connection.remote_address();
389
390                // Store the connection
391                {
392                    let mut peers = self.connected_peers.write().await;
393                    peers.insert(peer_id, remote_addr);
394                }
395
396                // Update stats
397                {
398                    let mut stats = self.stats.lock().await;
399                    stats.successful_connections += 1;
400                    stats.active_connections += 1;
401                }
402
403                info!(
404                    "Accepted connection from peer {:?} at {}",
405                    peer_id, remote_addr
406                );
407
408                // Handle authentication if required
409                if self.config.auth_config.require_authentication {
410                    // Start a task to handle incoming authentication
411                    let self_clone = self.clone();
412                    let auth_peer_id = peer_id;
413                    tokio::spawn(async move {
414                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
415                            error!(
416                                "Failed to handle authentication for peer {:?}: {}",
417                                auth_peer_id, e
418                            );
419                            // Remove the peer if auth fails
420                            self_clone
421                                .connected_peers
422                                .write()
423                                .await
424                                .remove(&auth_peer_id);
425                            let mut stats = self_clone.stats.lock().await;
426                            stats.active_connections = stats.active_connections.saturating_sub(1);
427                        }
428                    });
429                }
430
431                Ok((remote_addr, peer_id))
432            }
433            Err(e) => {
434                // Update stats
435                {
436                    let mut stats = self.stats.lock().await;
437                    stats.failed_connections += 1;
438                }
439
440                error!("Failed to accept connection: {}", e);
441                Err(Box::new(e))
442            }
443        }
444    }
445
446    /// Send data to a peer
447    pub async fn send_to_peer(
448        &self,
449        peer_id: &PeerId,
450        data: &[u8],
451    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
452        let peers = self.connected_peers.read().await;
453
454        if let Some(remote_addr) = peers.get(peer_id) {
455            debug!(
456                "Sending {} bytes to peer {:?} at {}",
457                data.len(),
458                peer_id,
459                remote_addr
460            );
461
462            // Get the Quinn connection for this peer from the NAT traversal endpoint
463            match self.nat_endpoint.get_connection(peer_id) {
464                Ok(Some(connection)) => {
465                    // Open a unidirectional stream for data transmission
466                    let mut send_stream = connection
467                        .open_uni()
468                        .await
469                        .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
470
471                    // Send the data
472                    send_stream
473                        .write_all(data)
474                        .await
475                        .map_err(|e| format!("Failed to write data: {e}"))?;
476
477                    // Finish the stream
478                    send_stream
479                        .finish()
480                        .map_err(|e| format!("Failed to finish stream: {e}"))?;
481
482                    debug!(
483                        "Successfully sent {} bytes to peer {:?}",
484                        data.len(),
485                        peer_id
486                    );
487                    Ok(())
488                }
489                Ok(None) => {
490                    error!("No active connection found for peer {:?}", peer_id);
491                    Err("No active connection".into())
492                }
493                Err(e) => {
494                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
495                    Err(Box::new(e))
496                }
497            }
498        } else {
499            error!("Peer {:?} not connected", peer_id);
500            Err("Peer not connected".into())
501        }
502    }
503
504    /// Receive data from peers
505    pub async fn receive(
506        &self,
507    ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
508        debug!("Waiting to receive data from any connected peer...");
509
510        // Get all connected peers
511        let peers = {
512            let peers_guard = self.connected_peers.read().await;
513            peers_guard.clone()
514        };
515
516        if peers.is_empty() {
517            return Err("No connected peers".into());
518        }
519
520        // Try to receive data from any connected peer
521        // In a real implementation, this would use a more sophisticated approach
522        // like select! over multiple connection streams
523        for (peer_id, _remote_addr) in peers.iter() {
524            match self.nat_endpoint.get_connection(peer_id) {
525                Ok(Some(connection)) => {
526                    // Try to accept incoming unidirectional streams
527                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
528                        .await
529                    {
530                        Ok(Ok(mut recv_stream)) => {
531                            debug!(
532                                "Receiving data from unidirectional stream from peer {:?}",
533                                peer_id
534                            );
535
536                            // Read all data from the stream
537                            match recv_stream.read_to_end(1024 * 1024).await {
538                                // 1MB limit
539                                Ok(buffer) => {
540                                    if !buffer.is_empty() {
541                                        debug!(
542                                            "Received {} bytes from peer {:?}",
543                                            buffer.len(),
544                                            peer_id
545                                        );
546                                        return Ok((*peer_id, buffer));
547                                    }
548                                }
549                                Err(e) => {
550                                    debug!(
551                                        "Failed to read from stream for peer {:?}: {}",
552                                        peer_id, e
553                                    );
554                                }
555                            }
556                        }
557                        Ok(Err(e)) => {
558                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
559                        }
560                        Err(_) => {
561                            // Timeout - try bidirectional streams
562                        }
563                    }
564
565                    // Also try to accept bidirectional streams
566                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
567                        .await
568                    {
569                        Ok(Ok((_send_stream, mut recv_stream))) => {
570                            debug!(
571                                "Receiving data from bidirectional stream from peer {:?}",
572                                peer_id
573                            );
574
575                            // Read all data from the receive side
576                            match recv_stream.read_to_end(1024 * 1024).await {
577                                // 1MB limit
578                                Ok(buffer) => {
579                                    if !buffer.is_empty() {
580                                        debug!(
581                                            "Received {} bytes from peer {:?} via bidirectional stream",
582                                            buffer.len(),
583                                            peer_id
584                                        );
585                                        return Ok((*peer_id, buffer));
586                                    }
587                                }
588                                Err(e) => {
589                                    debug!(
590                                        "Failed to read from bidirectional stream for peer {:?}: {}",
591                                        peer_id, e
592                                    );
593                                }
594                            }
595                        }
596                        Ok(Err(e)) => {
597                            debug!(
598                                "Failed to accept bidirectional stream from peer {:?}: {}",
599                                peer_id, e
600                            );
601                        }
602                        Err(_) => {
603                            // Timeout - continue to next peer
604                        }
605                    }
606                }
607                Ok(None) => {
608                    debug!("No active connection for peer {:?}", peer_id);
609                }
610                Err(e) => {
611                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
612                }
613            }
614        }
615
616        // If we get here, no data was received from any peer
617        Err("No data available from any connected peer".into())
618    }
619
620    /// Get current statistics
621    pub async fn get_stats(&self) -> NodeStats {
622        self.stats.lock().await.clone()
623    }
624
625    /// Get access to the NAT traversal endpoint
626    pub fn get_nat_endpoint(
627        &self,
628    ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
629        Ok(&*self.nat_endpoint)
630    }
631
632    /// Start periodic statistics reporting
633    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
634        let stats = Arc::clone(&self.stats);
635        let interval_duration = self.config.stats_interval;
636
637        tokio::spawn(async move {
638            let mut interval = tokio::time::interval(interval_duration);
639
640            loop {
641                interval.tick().await;
642
643                let stats_snapshot = stats.lock().await.clone();
644
645                info!(
646                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
647                    stats_snapshot.active_connections,
648                    stats_snapshot.successful_connections,
649                    stats_snapshot.nat_traversal_successes,
650                    stats_snapshot.nat_traversal_attempts
651                );
652            }
653        })
654    }
655
656    /// Get NAT traversal statistics
657    pub async fn get_nat_stats(
658        &self,
659    ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
660        self.nat_endpoint.get_nat_stats()
661    }
662
663    // OBSERVED_ADDRESS integration is handled within the connection; manual injection removed
664
665    /// Get connection metrics for a specific peer
666    pub async fn get_connection_metrics(
667        &self,
668        peer_id: &PeerId,
669    ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
670        match self.nat_endpoint.get_connection(peer_id) {
671            Ok(Some(connection)) => {
672                // Get basic RTT from the connection
673                let rtt = connection.rtt();
674
675                // Get congestion window and other stats
676                let stats = connection.stats();
677
678                Ok(ConnectionMetrics {
679                    bytes_sent: stats.udp_tx.bytes,
680                    bytes_received: stats.udp_rx.bytes,
681                    rtt: Some(rtt),
682                    packet_loss: stats.path.lost_packets as f64
683                        / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
684                })
685            }
686            Ok(None) => Err("Connection not found".into()),
687            Err(e) => Err(format!("Failed to get connection: {e}").into()),
688        }
689    }
690
691    /// Get this node's peer ID
692    pub fn peer_id(&self) -> PeerId {
693        self.peer_id
694    }
695
696    /// Get this node's public key bytes
697    pub fn public_key_bytes(&self) -> [u8; 32] {
698        self.auth_manager.public_key_bytes()
699    }
700
701    /// Update internal mappings when a peer reveals its canonical peer ID
702    pub async fn relabel_peer(
703        &self,
704        old_peer_id: PeerId,
705        new_peer_id: PeerId,
706    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
707        if old_peer_id == new_peer_id {
708            return Ok(());
709        }
710
711        self.nat_endpoint
712            .relabel_connection(old_peer_id, new_peer_id)?;
713
714        let mut peers = self.connected_peers.write().await;
715        if let Some(addr) = peers.remove(&old_peer_id) {
716            peers.insert(new_peer_id, addr);
717        }
718
719        Ok(())
720    }
721
722    /// Send an authentication message to a peer
723    async fn send_auth_message(
724        &self,
725        peer_id: &PeerId,
726        message: AuthMessage,
727    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
728        let data = AuthManager::serialize_message(&message)?;
729        self.send_to_peer(peer_id, &data).await
730    }
731
732    /// Perform authentication handshake as initiator
733    async fn authenticate_as_initiator(
734        &self,
735        peer_id: &PeerId,
736    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
737        info!("Starting authentication with peer {:?}", peer_id);
738
739        // Send authentication request
740        let auth_request = self.auth_manager.create_auth_request();
741        self.send_auth_message(peer_id, auth_request).await?;
742
743        // Wait for challenge
744        let timeout_duration = self.config.auth_config.auth_timeout;
745        let start = Instant::now();
746
747        while start.elapsed() < timeout_duration {
748            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
749                Ok(Ok((recv_peer_id, data))) => {
750                    if recv_peer_id == *peer_id {
751                        match AuthManager::deserialize_message(&data) {
752                            Ok(AuthMessage::Challenge { nonce, .. }) => {
753                                // Create and send challenge response
754                                let response =
755                                    self.auth_manager.create_challenge_response(nonce)?;
756                                self.send_auth_message(peer_id, response).await?;
757                            }
758                            Ok(AuthMessage::AuthSuccess { .. }) => {
759                                info!("Authentication successful with peer {:?}", peer_id);
760                                return Ok(());
761                            }
762                            Ok(AuthMessage::AuthFailure { reason }) => {
763                                return Err(format!("Authentication failed: {reason}").into());
764                            }
765                            _ => continue,
766                        }
767                    }
768                }
769                _ => continue,
770            }
771        }
772
773        Err("Authentication timeout".into())
774    }
775
776    /// Handle incoming authentication messages
777    async fn handle_auth_message(
778        &self,
779        peer_id: PeerId,
780        message: AuthMessage,
781    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
782        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
783
784        match auth_protocol.handle_message(peer_id, message).await {
785            Ok(Some(response)) => {
786                self.send_auth_message(&peer_id, response).await?;
787            }
788            Ok(None) => {
789                // No response needed
790            }
791            Err(e) => {
792                error!("Authentication error: {}", e);
793                let failure = AuthMessage::AuthFailure {
794                    reason: e.to_string(),
795                };
796                self.send_auth_message(&peer_id, failure).await?;
797                return Err(Box::new(e));
798            }
799        }
800
801        Ok(())
802    }
803
804    /// Check if a peer is authenticated
805    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
806        self.auth_manager.is_authenticated(peer_id).await
807    }
808
809    /// Get list of authenticated peers
810    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
811        self.auth_manager.list_authenticated_peers().await
812    }
813
814    /// Handle incoming authentication from a peer
815    async fn handle_incoming_auth(
816        &self,
817        peer_id: PeerId,
818    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
819        info!("Handling incoming authentication from peer {:?}", peer_id);
820
821        let timeout_duration = self.config.auth_config.auth_timeout;
822        let start = Instant::now();
823
824        while start.elapsed() < timeout_duration {
825            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
826                Ok(Ok((recv_peer_id, data))) => {
827                    if recv_peer_id == peer_id {
828                        match AuthManager::deserialize_message(&data) {
829                            Ok(auth_msg) => {
830                                self.handle_auth_message(peer_id, auth_msg).await?;
831
832                                // Check if authentication is complete
833                                if self.auth_manager.is_authenticated(&peer_id).await {
834                                    info!("Peer {:?} successfully authenticated", peer_id);
835                                    return Ok(());
836                                }
837                            }
838                            Err(_) => {
839                                // Not an auth message, ignore
840                                continue;
841                            }
842                        }
843                    }
844                }
845                _ => continue,
846            }
847        }
848
849        Err("Authentication timeout waiting for peer".into())
850    }
851
852    /// Get the metrics collector for Prometheus export
853    pub fn get_metrics_collector(
854        &self,
855    ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
856        // For now, create a new metrics collector
857        // In a full implementation, this would be a field in the struct
858        // and properly wired up to collect actual metrics
859        Ok(Arc::new(crate::logging::MetricsCollector::new()))
860    }
861}