ant_quic/
quic_node.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! QUIC-based P2P node with NAT traversal
9//!
10//! This module provides a QUIC-based implementation of the P2P node
11//! that integrates with the NAT traversal protocol.
12
13use std::{
14    collections::HashMap,
15    net::SocketAddr,
16    sync::{
17        Arc,
18        atomic::{AtomicBool, Ordering},
19    },
20    time::{Duration, Instant},
21};
22
23use tracing::{debug, error, info};
24
25use crate::{
26    auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
27    crypto::raw_public_keys::key_utils::{
28        derive_peer_id_from_public_key, generate_ed25519_keypair,
29    },
30    nat_traversal_api::{
31        EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
32        NatTraversalEvent, NatTraversalStatistics, PeerId,
33    },
34};
35
36/// QUIC-based P2P node with NAT traversal
37#[derive(Clone, Debug)]
38pub struct QuicP2PNode {
39    /// NAT traversal endpoint
40    nat_endpoint: Arc<NatTraversalEndpoint>,
41    /// Active peer connections (maps peer ID to their socket address)
42    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
43    /// Node statistics
44    stats: Arc<tokio::sync::Mutex<NodeStats>>,
45    /// Node configuration
46    config: QuicNodeConfig,
47    /// Authentication manager
48    auth_manager: Arc<AuthManager>,
49    /// Our peer ID
50    peer_id: PeerId,
51    /// Shutdown signal for graceful termination
52    shutdown: Arc<AtomicBool>,
53}
54
55/// Configuration for QUIC P2P node
56#[derive(Debug, Clone)]
57pub struct QuicNodeConfig {
58    /// Role of this node
59    pub role: EndpointRole,
60    /// Bootstrap nodes
61    pub bootstrap_nodes: Vec<SocketAddr>,
62    /// Enable coordinator services
63    pub enable_coordinator: bool,
64    /// Max concurrent connections
65    pub max_connections: usize,
66    /// Connection timeout
67    pub connection_timeout: Duration,
68    /// Statistics interval
69    pub stats_interval: Duration,
70    /// Authentication configuration
71    pub auth_config: AuthConfig,
72    /// Bind address for the node
73    pub bind_addr: Option<SocketAddr>,
74}
75
76impl Default for QuicNodeConfig {
77    fn default() -> Self {
78        Self {
79            role: EndpointRole::Client,
80            bootstrap_nodes: Vec::new(),
81            enable_coordinator: false,
82            max_connections: 100,
83            connection_timeout: Duration::from_secs(30),
84            stats_interval: Duration::from_secs(30),
85            auth_config: AuthConfig::default(),
86            bind_addr: None,
87        }
88    }
89}
90
/// Snapshot of transport-level measurements for one peer connection.
#[derive(Clone, Debug)]
pub struct ConnectionMetrics {
    /// Number of bytes transmitted to the peer.
    pub bytes_sent: u64,
    /// Number of bytes received from the peer.
    pub bytes_received: u64,
    /// Round-trip time estimate, when one is available.
    pub rtt: Option<Duration>,
    /// Fraction of packets lost, expressed in the range 0.0 to 1.0.
    pub packet_loss: f64,
}
103
/// Node-wide counters used for monitoring and telemetry.
#[derive(Clone, Debug)]
pub struct NodeStats {
    /// Connections currently counted as open.
    pub active_connections: usize,
    /// Running total of connections that were established.
    pub successful_connections: u64,
    /// Running total of connection attempts that failed.
    pub failed_connections: u64,
    /// Running total of NAT traversal attempts.
    pub nat_traversal_attempts: u64,
    /// Running total of NAT traversals that succeeded.
    pub nat_traversal_successes: u64,
    /// Instant at which these statistics were created.
    pub start_time: Instant,
}

impl Default for NodeStats {
    /// All counters start at zero and `start_time` is the moment of the call.
    fn default() -> Self {
        let created_at = Instant::now();

        NodeStats {
            active_connections: 0,
            successful_connections: 0,
            failed_connections: 0,
            nat_traversal_attempts: 0,
            nat_traversal_successes: 0,
            start_time: created_at,
        }
    }
}
133
134impl QuicP2PNode {
135    /// Create a new QUIC P2P node
136    pub async fn new(
137        config: QuicNodeConfig,
138    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
139        // Generate Ed25519 keypair for authentication
140        let (secret_key, public_key) = generate_ed25519_keypair();
141        let peer_id = derive_peer_id_from_public_key(&public_key);
142
143        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
144
145        // Create authentication manager
146        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
147
148        // Create NAT traversal configuration
149        let nat_config = NatTraversalConfig {
150            role: config.role,
151            bootstrap_nodes: config.bootstrap_nodes.clone(),
152            max_candidates: 50,
153            coordination_timeout: Duration::from_secs(10),
154            enable_symmetric_nat: true,
155            // Bootstrap nodes should not enable relay fallback
156            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
157            max_concurrent_attempts: 5,
158            bind_addr: config.bind_addr,
159            prefer_rfc_nat_traversal: true, // Default to RFC format for standards compliance
160            timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
161        };
162
163        // Create event callback for NAT traversal events
164        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
165            start_time: Instant::now(),
166            ..Default::default()
167        }));
168        let stats_for_callback = Arc::clone(&stats_clone);
169
170        let event_callback = Box::new(move |event: NatTraversalEvent| {
171            let stats = stats_for_callback.clone();
172            tokio::spawn(async move {
173                let mut stats = stats.lock().await;
174                match event {
175                    NatTraversalEvent::CoordinationRequested { .. } => {
176                        stats.nat_traversal_attempts += 1;
177                    }
178                    NatTraversalEvent::ConnectionEstablished { .. } => {
179                        stats.nat_traversal_successes += 1;
180                    }
181                    _ => {}
182                }
183            });
184        });
185
186        // Create NAT traversal endpoint
187        let nat_endpoint =
188            Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
189
190        // Initialize shutdown signal
191        let shutdown = Arc::new(AtomicBool::new(false));
192
193        // Create the node
194        let node = Self {
195            nat_endpoint,
196            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
197            stats: stats_clone,
198            config,
199            auth_manager,
200            peer_id,
201            shutdown: shutdown.clone(),
202        };
203
204        Ok(node)
205    }
206
207    /// Get the node configuration
208    pub fn get_config(&self) -> &QuicNodeConfig {
209        &self.config
210    }
211
212    /// Connect directly to a bootstrap node
213    pub async fn connect_to_bootstrap(
214        &self,
215        bootstrap_addr: SocketAddr,
216    ) -> Result<PeerId, NatTraversalError> {
217        info!("Connecting to bootstrap node at {}", bootstrap_addr);
218
219        // Get the quinn endpoint from NAT traversal endpoint
220        let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
221            NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
222        })?;
223
224        // Connect using the QUIC endpoint directly
225        match endpoint.connect(bootstrap_addr, "bootstrap-node") {
226            Ok(connecting) => {
227                match connecting.await {
228                    Ok(connection) => {
229                        // Extract peer ID from the connection
230                        // For now, we'll generate a temporary peer ID based on the address
231                        // In a real implementation, we'd exchange peer IDs during the handshake
232                        let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
233
234                        // Spawn the NAT traversal handler loop so connection lifecycle events are processed
235                        let handler_connection = connection.clone();
236
237                        // Store the connection in NAT endpoint
238                        self.nat_endpoint.add_connection(peer_id, connection)?;
239
240                        self.nat_endpoint
241                            .spawn_connection_handler(peer_id, handler_connection)?;
242
243                        // Store the peer address mapping
244                        self.connected_peers
245                            .write()
246                            .await
247                            .insert(peer_id, bootstrap_addr);
248
249                        // Update stats
250                        {
251                            let mut stats = self.stats.lock().await;
252                            stats.active_connections += 1;
253                            stats.successful_connections += 1;
254                        }
255
256                        // Fire connection established event
257                        if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
258                            callback(NatTraversalEvent::ConnectionEstablished {
259                                peer_id,
260                                remote_address: bootstrap_addr,
261                            });
262                        }
263
264                        info!(
265                            "Successfully connected to bootstrap node {} with peer ID {:?}",
266                            bootstrap_addr, peer_id
267                        );
268                        Ok(peer_id)
269                    }
270                    Err(e) => {
271                        error!(
272                            "Failed to establish connection to bootstrap node {}: {}",
273                            bootstrap_addr, e
274                        );
275                        {
276                            let mut stats = self.stats.lock().await;
277                            stats.failed_connections += 1;
278                        }
279                        Err(NatTraversalError::NetworkError(format!(
280                            "Connection failed: {e}"
281                        )))
282                    }
283                }
284            }
285            Err(e) => {
286                error!(
287                    "Failed to initiate connection to bootstrap node {}: {}",
288                    bootstrap_addr, e
289                );
290                {
291                    let mut stats = self.stats.lock().await;
292                    stats.failed_connections += 1;
293                }
294                Err(NatTraversalError::NetworkError(format!(
295                    "Connect error: {e}"
296                )))
297            }
298        }
299    }
300
301    /// Derive a peer ID from a socket address (temporary solution)
302    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
303        use std::collections::hash_map::DefaultHasher;
304        use std::hash::{Hash, Hasher};
305
306        let mut hasher = DefaultHasher::new();
307        addr.hash(&mut hasher);
308        let hash = hasher.finish();
309
310        let mut peer_id_bytes = [0u8; 32];
311        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
312        let port_bytes = addr.port().to_le_bytes();
313        peer_id_bytes[8..10].copy_from_slice(&port_bytes);
314
315        PeerId(peer_id_bytes)
316    }
317
318    /// Connect to a peer using NAT traversal
319    pub async fn connect_to_peer(
320        &self,
321        peer_id: PeerId,
322        coordinator: SocketAddr,
323    ) -> Result<SocketAddr, NatTraversalError> {
324        info!(
325            "Initiating connection to peer {:?} via coordinator {}",
326            peer_id, coordinator
327        );
328
329        // Update stats
330        {
331            let mut stats = self.stats.lock().await;
332            stats.nat_traversal_attempts += 1;
333        }
334
335        // Initiate NAT traversal
336        self.nat_endpoint
337            .initiate_nat_traversal(peer_id, coordinator)?;
338
339        // Poll for completion (in production, this would be event-driven)
340        let start = Instant::now();
341        let timeout = self.config.connection_timeout;
342
343        while start.elapsed() < timeout {
344            let events = self.nat_endpoint.poll(Instant::now())?;
345
346            for event in events {
347                match event {
348                    NatTraversalEvent::ConnectionEstablished {
349                        peer_id: evt_peer,
350                        remote_address,
351                    } => {
352                        if evt_peer == peer_id {
353                            // Store peer connection
354                            {
355                                let mut peers = self.connected_peers.write().await;
356                                peers.insert(peer_id, remote_address);
357                            }
358
359                            // Update stats
360                            {
361                                let mut stats = self.stats.lock().await;
362                                stats.successful_connections += 1;
363                                stats.active_connections += 1;
364                                stats.nat_traversal_successes += 1;
365                            }
366
367                            info!(
368                                "Successfully connected to peer {:?} at {}",
369                                peer_id, remote_address
370                            );
371
372                            // Perform authentication if required
373                            if self.config.auth_config.require_authentication {
374                                match self.authenticate_as_initiator(&peer_id).await {
375                                    Ok(_) => {
376                                        info!("Authentication successful with peer {:?}", peer_id);
377                                    }
378                                    Err(e) => {
379                                        error!(
380                                            "Authentication failed with peer {:?}: {}",
381                                            peer_id, e
382                                        );
383                                        // Remove from connected peers
384                                        self.connected_peers.write().await.remove(&peer_id);
385                                        // Update stats
386                                        let mut stats = self.stats.lock().await;
387                                        stats.active_connections =
388                                            stats.active_connections.saturating_sub(1);
389                                        stats.failed_connections += 1;
390                                        return Err(NatTraversalError::ConfigError(format!(
391                                            "Authentication failed: {e}"
392                                        )));
393                                    }
394                                }
395                            }
396
397                            return Ok(remote_address);
398                        }
399                    }
400                    NatTraversalEvent::TraversalFailed {
401                        peer_id: evt_peer,
402                        error,
403                        fallback_available: _,
404                    } => {
405                        if evt_peer == peer_id {
406                            // Update stats
407                            {
408                                let mut stats = self.stats.lock().await;
409                                stats.failed_connections += 1;
410                            }
411
412                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
413                            return Err(error);
414                        }
415                    }
416                    _ => {
417                        debug!("Received event: {:?}", event);
418                    }
419                }
420            }
421
422            // Brief sleep to avoid busy waiting
423            tokio::time::sleep(Duration::from_millis(100)).await;
424        }
425
426        // Timeout
427        {
428            let mut stats = self.stats.lock().await;
429            stats.failed_connections += 1;
430        }
431
432        Err(NatTraversalError::Timeout)
433    }
434
435    /// Accept incoming connections
436    pub async fn accept(
437        &self,
438    ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
439        info!("Waiting for incoming connection...");
440
441        // Accept connection through the NAT traversal endpoint
442        match self.nat_endpoint.accept_connection().await {
443            Ok((peer_id, connection)) => {
444                let remote_addr = connection.remote_address();
445
446                // Spawn connection handler to monitor the connection
447                if let Err(e) = self
448                    .nat_endpoint
449                    .spawn_connection_handler(peer_id, connection)
450                {
451                    error!(
452                        "Failed to spawn connection handler for peer {:?}: {}",
453                        peer_id, e
454                    );
455                    return Err(Box::new(e));
456                }
457
458                // Store the connection
459                {
460                    let mut peers = self.connected_peers.write().await;
461                    peers.insert(peer_id, remote_addr);
462                }
463
464                // Update stats
465                {
466                    let mut stats = self.stats.lock().await;
467                    stats.successful_connections += 1;
468                    stats.active_connections += 1;
469                }
470
471                info!(
472                    "Accepted connection from peer {:?} at {}",
473                    peer_id, remote_addr
474                );
475
476                // Handle authentication if required
477                if self.config.auth_config.require_authentication {
478                    // Start a task to handle incoming authentication
479                    let self_clone = self.clone();
480                    let auth_peer_id = peer_id;
481                    tokio::spawn(async move {
482                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
483                            error!(
484                                "Failed to handle authentication for peer {:?}: {}",
485                                auth_peer_id, e
486                            );
487                            // Remove the peer if auth fails
488                            self_clone
489                                .connected_peers
490                                .write()
491                                .await
492                                .remove(&auth_peer_id);
493                            let mut stats = self_clone.stats.lock().await;
494                            stats.active_connections = stats.active_connections.saturating_sub(1);
495                        }
496                    });
497                }
498
499                Ok((remote_addr, peer_id))
500            }
501            Err(e) => {
502                // Update stats
503                {
504                    let mut stats = self.stats.lock().await;
505                    stats.failed_connections += 1;
506                }
507
508                error!("Failed to accept connection: {}", e);
509                Err(Box::new(e))
510            }
511        }
512    }
513
514    /// Send data to a peer
515    pub async fn send_to_peer(
516        &self,
517        peer_id: &PeerId,
518        data: &[u8],
519    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
520        debug!(
521            "Attempting to send {} bytes to peer {:?}",
522            data.len(),
523            peer_id
524        );
525
526        let peers = self.connected_peers.read().await;
527
528        if let Some(remote_addr) = peers.get(peer_id) {
529            debug!("Found peer {:?} at {}", peer_id, remote_addr);
530
531            // Get the Quinn connection for this peer from the NAT traversal endpoint
532            match self.nat_endpoint.get_connection(peer_id) {
533                Ok(Some(connection)) => {
534                    // Open a unidirectional stream for data transmission
535                    let mut send_stream = connection
536                        .open_uni()
537                        .await
538                        .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
539
540                    // Send the data
541                    send_stream
542                        .write_all(data)
543                        .await
544                        .map_err(|e| format!("Failed to write data: {e}"))?;
545
546                    // Finish the stream
547                    send_stream
548                        .finish()
549                        .map_err(|e| format!("Failed to finish stream: {e}"))?;
550
551                    debug!(
552                        "Successfully sent {} bytes to peer {:?}",
553                        data.len(),
554                        peer_id
555                    );
556                    Ok(())
557                }
558                Ok(None) => {
559                    error!("No active connection found for peer {:?}", peer_id);
560                    Err("No active connection".into())
561                }
562                Err(e) => {
563                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
564                    Err(Box::new(e))
565                }
566            }
567        } else {
568            error!("Peer {:?} not connected", peer_id);
569            Err("Peer not connected".into())
570        }
571    }
572
573    /// Receive data from peers
574    pub async fn receive(
575        &self,
576    ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
577        debug!("Waiting to receive data from any connected peer...");
578
579        // Get all connected peers
580        let peers = {
581            let peers_guard = self.connected_peers.read().await;
582            peers_guard.clone()
583        };
584
585        if peers.is_empty() {
586            return Err("No connected peers".into());
587        }
588
589        // Try to receive data from any connected peer
590        // In a real implementation, this would use a more sophisticated approach
591        // like select! over multiple connection streams
592        for (peer_id, _remote_addr) in peers.iter() {
593            match self.nat_endpoint.get_connection(peer_id) {
594                Ok(Some(connection)) => {
595                    // Try to accept incoming unidirectional streams
596                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
597                        .await
598                    {
599                        Ok(Ok(mut recv_stream)) => {
600                            debug!(
601                                "Receiving data from unidirectional stream from peer {:?}",
602                                peer_id
603                            );
604
605                            // Read all data from the stream
606                            match recv_stream.read_to_end(1024 * 1024).await {
607                                // 1MB limit
608                                Ok(buffer) => {
609                                    if !buffer.is_empty() {
610                                        debug!(
611                                            "Received {} bytes from peer {:?}",
612                                            buffer.len(),
613                                            peer_id
614                                        );
615                                        return Ok((*peer_id, buffer));
616                                    }
617                                }
618                                Err(e) => {
619                                    debug!(
620                                        "Failed to read from stream for peer {:?}: {}",
621                                        peer_id, e
622                                    );
623                                }
624                            }
625                        }
626                        Ok(Err(e)) => {
627                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
628                        }
629                        Err(_) => {
630                            // Timeout - try bidirectional streams
631                        }
632                    }
633
634                    // Also try to accept bidirectional streams
635                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
636                        .await
637                    {
638                        Ok(Ok((_send_stream, mut recv_stream))) => {
639                            debug!(
640                                "Receiving data from bidirectional stream from peer {:?}",
641                                peer_id
642                            );
643
644                            // Read all data from the receive side
645                            match recv_stream.read_to_end(1024 * 1024).await {
646                                // 1MB limit
647                                Ok(buffer) => {
648                                    if !buffer.is_empty() {
649                                        debug!(
650                                            "Received {} bytes from peer {:?} via bidirectional stream",
651                                            buffer.len(),
652                                            peer_id
653                                        );
654                                        return Ok((*peer_id, buffer));
655                                    }
656                                }
657                                Err(e) => {
658                                    debug!(
659                                        "Failed to read from bidirectional stream for peer {:?}: {}",
660                                        peer_id, e
661                                    );
662                                }
663                            }
664                        }
665                        Ok(Err(e)) => {
666                            debug!(
667                                "Failed to accept bidirectional stream from peer {:?}: {}",
668                                peer_id, e
669                            );
670                        }
671                        Err(_) => {
672                            // Timeout - continue to next peer
673                        }
674                    }
675                }
676                Ok(None) => {
677                    debug!("No active connection for peer {:?}", peer_id);
678                }
679                Err(e) => {
680                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
681                }
682            }
683        }
684
685        // If we get here, no data was received from any peer
686        Err("No data available from any connected peer".into())
687    }
688
689    /// Get current statistics
690    pub async fn get_stats(&self) -> NodeStats {
691        self.stats.lock().await.clone()
692    }
693
694    /// Get access to the NAT traversal endpoint
695    pub fn get_nat_endpoint(
696        &self,
697    ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
698        Ok(&*self.nat_endpoint)
699    }
700
701    /// Start periodic statistics reporting
702    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
703        let stats = Arc::clone(&self.stats);
704        let interval_duration = self.config.stats_interval;
705
706        tokio::spawn(async move {
707            let mut interval = tokio::time::interval(interval_duration);
708
709            loop {
710                interval.tick().await;
711
712                let stats_snapshot = stats.lock().await.clone();
713
714                info!(
715                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
716                    stats_snapshot.active_connections,
717                    stats_snapshot.successful_connections,
718                    stats_snapshot.nat_traversal_successes,
719                    stats_snapshot.nat_traversal_attempts
720                );
721            }
722        })
723    }
724
725    /// Get NAT traversal statistics
726    pub async fn get_nat_stats(
727        &self,
728    ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
729        self.nat_endpoint.get_nat_stats()
730    }
731
732    // OBSERVED_ADDRESS integration is handled within the connection; manual injection removed
733
734    /// Get connection metrics for a specific peer
735    pub async fn get_connection_metrics(
736        &self,
737        peer_id: &PeerId,
738    ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
739        match self.nat_endpoint.get_connection(peer_id) {
740            Ok(Some(connection)) => {
741                // Get basic RTT from the connection
742                let rtt = connection.rtt();
743
744                // Get congestion window and other stats
745                let stats = connection.stats();
746
747                Ok(ConnectionMetrics {
748                    bytes_sent: stats.udp_tx.bytes,
749                    bytes_received: stats.udp_rx.bytes,
750                    rtt: Some(rtt),
751                    packet_loss: stats.path.lost_packets as f64
752                        / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
753                })
754            }
755            Ok(None) => Err("Connection not found".into()),
756            Err(e) => Err(format!("Failed to get connection: {e}").into()),
757        }
758    }
759
760    /// Get this node's peer ID
761    pub fn peer_id(&self) -> PeerId {
762        self.peer_id
763    }
764
765    /// Get this node's public key bytes
766    pub fn public_key_bytes(&self) -> [u8; 32] {
767        self.auth_manager.public_key_bytes()
768    }
769
770    /// Send an authentication message to a peer
771    async fn send_auth_message(
772        &self,
773        peer_id: &PeerId,
774        message: AuthMessage,
775    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
776        let data = AuthManager::serialize_message(&message)?;
777        self.send_to_peer(peer_id, &data).await
778    }
779
780    /// Perform authentication handshake as initiator
781    async fn authenticate_as_initiator(
782        &self,
783        peer_id: &PeerId,
784    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
785        info!("Starting authentication with peer {:?}", peer_id);
786
787        // Send authentication request
788        let auth_request = self.auth_manager.create_auth_request();
789        self.send_auth_message(peer_id, auth_request).await?;
790
791        // Wait for challenge
792        let timeout_duration = self.config.auth_config.auth_timeout;
793        let start = Instant::now();
794
795        while start.elapsed() < timeout_duration {
796            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
797                Ok(Ok((recv_peer_id, data))) => {
798                    if recv_peer_id == *peer_id {
799                        match AuthManager::deserialize_message(&data) {
800                            Ok(AuthMessage::Challenge { nonce, .. }) => {
801                                // Create and send challenge response
802                                let response =
803                                    self.auth_manager.create_challenge_response(nonce)?;
804                                self.send_auth_message(peer_id, response).await?;
805                            }
806                            Ok(AuthMessage::AuthSuccess { .. }) => {
807                                info!("Authentication successful with peer {:?}", peer_id);
808                                return Ok(());
809                            }
810                            Ok(AuthMessage::AuthFailure { reason }) => {
811                                return Err(format!("Authentication failed: {reason}").into());
812                            }
813                            _ => continue,
814                        }
815                    }
816                }
817                _ => continue,
818            }
819        }
820
821        Err("Authentication timeout".into())
822    }
823
824    /// Handle incoming authentication messages
825    async fn handle_auth_message(
826        &self,
827        peer_id: PeerId,
828        message: AuthMessage,
829    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
830        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
831
832        match auth_protocol.handle_message(peer_id, message).await {
833            Ok(Some(response)) => {
834                self.send_auth_message(&peer_id, response).await?;
835            }
836            Ok(None) => {
837                // No response needed
838            }
839            Err(e) => {
840                error!("Authentication error: {}", e);
841                let failure = AuthMessage::AuthFailure {
842                    reason: e.to_string(),
843                };
844                self.send_auth_message(&peer_id, failure).await?;
845                return Err(Box::new(e));
846            }
847        }
848
849        Ok(())
850    }
851
852    /// Check if a peer is authenticated
853    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
854        self.auth_manager.is_authenticated(peer_id).await
855    }
856
857    /// Get list of authenticated peers
858    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
859        self.auth_manager.list_authenticated_peers().await
860    }
861
862    /// Handle incoming authentication from a peer
863    async fn handle_incoming_auth(
864        &self,
865        peer_id: PeerId,
866    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
867        info!("Handling incoming authentication from peer {:?}", peer_id);
868
869        let timeout_duration = self.config.auth_config.auth_timeout;
870        let start = Instant::now();
871
872        while start.elapsed() < timeout_duration {
873            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
874                Ok(Ok((recv_peer_id, data))) => {
875                    if recv_peer_id == peer_id {
876                        match AuthManager::deserialize_message(&data) {
877                            Ok(auth_msg) => {
878                                self.handle_auth_message(peer_id, auth_msg).await?;
879
880                                // Check if authentication is complete
881                                if self.auth_manager.is_authenticated(&peer_id).await {
882                                    info!("Peer {:?} successfully authenticated", peer_id);
883                                    return Ok(());
884                                }
885                            }
886                            Err(_) => {
887                                // Not an auth message, ignore
888                                continue;
889                            }
890                        }
891                    }
892                }
893                _ => continue,
894            }
895        }
896
897        Err("Authentication timeout waiting for peer".into())
898    }
899
900    /// Get the metrics collector for Prometheus export
901    pub fn get_metrics_collector(
902        &self,
903    ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
904        // For now, create a new metrics collector
905        // In a full implementation, this would be a field in the struct
906        // and properly wired up to collect actual metrics
907        Ok(Arc::new(crate::logging::MetricsCollector::new()))
908    }
909
910    /// Gracefully shutdown the node and close all connections
911    pub fn shutdown(&self) {
912        info!("Shutting down QuicP2PNode");
913        self.shutdown.store(true, Ordering::SeqCst);
914
915        // Close the Quinn endpoint to terminate all connections
916        if let Some(endpoint) = self.nat_endpoint.get_quinn_endpoint() {
917            endpoint.close(0u32.into(), b"node shutdown");
918        }
919    }
920}
921
922/// Automatic cleanup when QuicP2PNode is dropped
923impl Drop for QuicP2PNode {
924    fn drop(&mut self) {
925        self.shutdown();
926    }
927}