ant_quic/
quic_node.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! QUIC-based P2P node with NAT traversal
9//!
10//! This module provides a QUIC-based implementation of the P2P node
11//! that integrates with the NAT traversal protocol.
12
13use std::{
14    collections::HashMap,
15    net::SocketAddr,
16    sync::{
17        atomic::{AtomicBool, Ordering},
18        Arc,
19    },
20    time::{Duration, Instant},
21};
22
23use tracing::{debug, error, info};
24
25use crate::{
26    auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
27    crypto::raw_public_keys::key_utils::{
28        derive_peer_id_from_public_key, generate_ed25519_keypair,
29    },
30    nat_traversal_api::{
31        EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
32        NatTraversalEvent, NatTraversalStatistics, PeerId,
33    },
34};
35
/// QUIC-based P2P node with NAT traversal
///
/// The node derives `Clone`, and its endpoint, peer table, statistics,
/// authentication manager and shutdown flag all live behind `Arc`s, so a
/// clone is another handle onto the same shared state rather than an
/// independent node. `config` is cloned by value and `peer_id` is copied.
#[derive(Clone, Debug)]
pub struct QuicP2PNode {
    /// NAT traversal endpoint
    ///
    /// Owns the underlying QUIC connections; this struct looks them up by
    /// peer ID via `get_connection` whenever it needs to send or receive.
    nat_endpoint: Arc<NatTraversalEndpoint>,
    /// Active peer connections (maps peer ID to their socket address)
    ///
    /// Only the address is stored here; the connection object itself is
    /// held by `nat_endpoint`.
    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
    /// Node statistics
    ///
    /// Updated both by the node's own methods and by the NAT traversal event
    /// callback installed in `new`.
    stats: Arc<tokio::sync::Mutex<NodeStats>>,
    /// Node configuration
    config: QuicNodeConfig,
    /// Authentication manager
    ///
    /// Constructed in `new` from the freshly generated secret key.
    auth_manager: Arc<AuthManager>,
    /// Our peer ID
    ///
    /// Derived in `new` from the public half of the generated keypair.
    peer_id: PeerId,
    /// Shutdown signal for graceful termination
    ///
    /// Initialised to `false` in `new`.
    shutdown: Arc<AtomicBool>,
}
54
/// Configuration for QUIC P2P node
///
/// See the `Default` implementation for the values used when a field is not
/// set explicitly.
#[derive(Debug, Clone)]
pub struct QuicNodeConfig {
    /// Role of this node
    ///
    /// Forwarded to the NAT traversal configuration. A `Bootstrap` role
    /// additionally disables relay fallback there.
    pub role: EndpointRole,
    /// Bootstrap nodes
    ///
    /// Forwarded unchanged to the NAT traversal configuration.
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Enable coordinator services
    // NOTE(review): not read anywhere in the visible part of this file — confirm where it takes effect.
    pub enable_coordinator: bool,
    /// Max concurrent connections
    // NOTE(review): not enforced anywhere in the visible part of this file — confirm where it takes effect.
    pub max_connections: usize,
    /// Connection timeout
    ///
    /// Used by `connect_to_peer` as the overall deadline for its event
    /// polling loop.
    pub connection_timeout: Duration,
    /// Statistics interval
    ///
    /// Period of the logging task started by `start_stats_task`.
    pub stats_interval: Duration,
    /// Authentication configuration
    ///
    /// Handed to the `AuthManager`; `require_authentication` and
    /// `auth_timeout` are also consulted directly by the node.
    pub auth_config: AuthConfig,
    /// Bind address for the node
    ///
    /// Forwarded to the NAT traversal configuration.
    // NOTE(review): presumably `None` lets the endpoint pick an address — confirm.
    pub bind_addr: Option<SocketAddr>,
}
75
/// Defaults: a client node with no bootstrap peers, no coordinator services,
/// up to 100 connections, 30-second connection timeout and statistics
/// interval, default authentication settings, and no explicit bind address.
impl Default for QuicNodeConfig {
    fn default() -> Self {
        Self {
            role: EndpointRole::Client,
            // Empty list: callers must supply bootstrap nodes themselves.
            bootstrap_nodes: Vec::new(),
            enable_coordinator: false,
            max_connections: 100,
            connection_timeout: Duration::from_secs(30),
            stats_interval: Duration::from_secs(30),
            auth_config: AuthConfig::default(),
            bind_addr: None,
        }
    }
}
90
/// Basic per-connection performance metrics
///
/// A point-in-time snapshot produced by
/// `QuicP2PNode::get_connection_metrics` from the connection's own
/// statistics.
#[derive(Debug, Clone)]
pub struct ConnectionMetrics {
    /// Bytes sent to this peer
    ///
    /// Taken from the connection's UDP transmit counter.
    pub bytes_sent: u64,
    /// Bytes received from this peer
    ///
    /// Taken from the connection's UDP receive counter.
    pub bytes_received: u64,
    /// Round-trip time
    ///
    /// `None` means no estimate is available.
    pub rtt: Option<Duration>,
    /// Packet loss rate (0.0 to 1.0)
    pub packet_loss: f64,
}
103
/// Aggregate node statistics for monitoring and telemetry
///
/// All counters except `active_connections` only ever increase in the
/// visible code; `active_connections` is decremented when authentication
/// fails after a connection was counted.
#[derive(Debug, Clone)]
pub struct NodeStats {
    /// Number of active connections
    pub active_connections: usize,
    /// Total successful connections
    pub successful_connections: u64,
    /// Total failed connections
    pub failed_connections: u64,
    /// NAT traversal attempts
    pub nat_traversal_attempts: u64,
    /// Successful NAT traversals
    pub nat_traversal_successes: u64,
    /// Node start time
    ///
    /// A monotonic `Instant`, so it is only meaningful for computing uptime
    /// within this process.
    pub start_time: Instant,
}
120
/// Defaults: every counter at zero and `start_time` set to the moment
/// `default()` is called.
impl Default for NodeStats {
    fn default() -> Self {
        Self {
            active_connections: 0,
            successful_connections: 0,
            failed_connections: 0,
            nat_traversal_attempts: 0,
            nat_traversal_successes: 0,
            // Captured at construction time, not lazily.
            start_time: Instant::now(),
        }
    }
}
133
134impl QuicP2PNode {
135    /// Create a new QUIC P2P node
136    pub async fn new(
137        config: QuicNodeConfig,
138    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
139        // Generate Ed25519 keypair for authentication
140        let (secret_key, public_key) = generate_ed25519_keypair();
141        let peer_id = derive_peer_id_from_public_key(&public_key);
142
143        info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
144
145        // Create authentication manager
146        let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
147
148        // Create NAT traversal configuration
149        let nat_config = NatTraversalConfig {
150            role: config.role,
151            bootstrap_nodes: config.bootstrap_nodes.clone(),
152            max_candidates: 50,
153            coordination_timeout: Duration::from_secs(10),
154            enable_symmetric_nat: true,
155            // Bootstrap nodes should not enable relay fallback
156            enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
157            max_concurrent_attempts: 5,
158            bind_addr: config.bind_addr,
159            prefer_rfc_nat_traversal: true, // Default to RFC format for standards compliance
160            timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
161        };
162
163        // Create event callback for NAT traversal events
164        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
165            start_time: Instant::now(),
166            ..Default::default()
167        }));
168        let stats_for_callback = Arc::clone(&stats_clone);
169
170        let event_callback = Box::new(move |event: NatTraversalEvent| {
171            let stats = stats_for_callback.clone();
172            tokio::spawn(async move {
173                let mut stats = stats.lock().await;
174                match event {
175                    NatTraversalEvent::CoordinationRequested { .. } => {
176                        stats.nat_traversal_attempts += 1;
177                    }
178                    NatTraversalEvent::ConnectionEstablished { .. } => {
179                        stats.nat_traversal_successes += 1;
180                    }
181                    _ => {}
182                }
183            });
184        });
185
186        // Create NAT traversal endpoint
187        let nat_endpoint =
188            Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
189
190        // Initialize shutdown signal
191        let shutdown = Arc::new(AtomicBool::new(false));
192
193        // Create the node
194        let node = Self {
195            nat_endpoint,
196            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
197            stats: stats_clone,
198            config,
199            auth_manager,
200            peer_id,
201            shutdown: shutdown.clone(),
202        };
203
204        Ok(node)
205    }
206
    /// Get the node configuration
    ///
    /// Returns a borrow of the configuration this node was created with.
    pub fn get_config(&self) -> &QuicNodeConfig {
        &self.config
    }
211
212    /// Connect directly to a bootstrap node
213    pub async fn connect_to_bootstrap(
214        &self,
215        bootstrap_addr: SocketAddr,
216    ) -> Result<PeerId, NatTraversalError> {
217        info!("Connecting to bootstrap node at {}", bootstrap_addr);
218
219        // Get the quinn endpoint from NAT traversal endpoint
220        let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
221            NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
222        })?;
223
224        // Connect using the QUIC endpoint directly
225        match endpoint.connect(bootstrap_addr, "bootstrap-node") {
226            Ok(connecting) => {
227                match connecting.await {
228                    Ok(connection) => {
229                        // Extract peer ID from the connection
230                        // For now, we'll generate a temporary peer ID based on the address
231                        // In a real implementation, we'd exchange peer IDs during the handshake
232                        let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
233
234                        // Spawn the NAT traversal handler loop so connection lifecycle events are processed
235                        let handler_connection = connection.clone();
236
237                        // Store the connection in NAT endpoint
238                        self.nat_endpoint.add_connection(peer_id, connection)?;
239
240                        self.nat_endpoint
241                            .spawn_connection_handler(peer_id, handler_connection)?;
242
243                        // Store the peer address mapping
244                        self.connected_peers
245                            .write()
246                            .await
247                            .insert(peer_id, bootstrap_addr);
248
249                        // Update stats
250                        {
251                            let mut stats = self.stats.lock().await;
252                            stats.active_connections += 1;
253                            stats.successful_connections += 1;
254                        }
255
256                        // Fire connection established event
257                        if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
258                            callback(NatTraversalEvent::ConnectionEstablished {
259                                peer_id,
260                                remote_address: bootstrap_addr,
261                            });
262                        }
263
264                        info!(
265                            "Successfully connected to bootstrap node {} with peer ID {:?}",
266                            bootstrap_addr, peer_id
267                        );
268                        Ok(peer_id)
269                    }
270                    Err(e) => {
271                        error!(
272                            "Failed to establish connection to bootstrap node {}: {}",
273                            bootstrap_addr, e
274                        );
275                        {
276                            let mut stats = self.stats.lock().await;
277                            stats.failed_connections += 1;
278                        }
279                        Err(NatTraversalError::NetworkError(format!(
280                            "Connection failed: {e}"
281                        )))
282                    }
283                }
284            }
285            Err(e) => {
286                error!(
287                    "Failed to initiate connection to bootstrap node {}: {}",
288                    bootstrap_addr, e
289                );
290                {
291                    let mut stats = self.stats.lock().await;
292                    stats.failed_connections += 1;
293                }
294                Err(NatTraversalError::NetworkError(format!(
295                    "Connect error: {e}"
296                )))
297            }
298        }
299    }
300
301    /// Derive a peer ID from a socket address (temporary solution)
302    fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
303        use std::collections::hash_map::DefaultHasher;
304        use std::hash::{Hash, Hasher};
305
306        let mut hasher = DefaultHasher::new();
307        addr.hash(&mut hasher);
308        let hash = hasher.finish();
309
310        let mut peer_id_bytes = [0u8; 32];
311        peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
312        let port_bytes = addr.port().to_le_bytes();
313        peer_id_bytes[8..10].copy_from_slice(&port_bytes);
314
315        PeerId(peer_id_bytes)
316    }
317
318    /// Connect to a peer using NAT traversal
319    pub async fn connect_to_peer(
320        &self,
321        peer_id: PeerId,
322        coordinator: SocketAddr,
323    ) -> Result<SocketAddr, NatTraversalError> {
324        info!(
325            "Initiating connection to peer {:?} via coordinator {}",
326            peer_id, coordinator
327        );
328
329        // Update stats
330        {
331            let mut stats = self.stats.lock().await;
332            stats.nat_traversal_attempts += 1;
333        }
334
335        // Initiate NAT traversal
336        self.nat_endpoint
337            .initiate_nat_traversal(peer_id, coordinator)?;
338
339        // Poll for completion (in production, this would be event-driven)
340        let start = Instant::now();
341        let timeout = self.config.connection_timeout;
342
343        while start.elapsed() < timeout {
344            let events = self.nat_endpoint.poll(Instant::now())?;
345
346            for event in events {
347                match event {
348                    NatTraversalEvent::ConnectionEstablished {
349                        peer_id: evt_peer,
350                        remote_address,
351                    } => {
352                        if evt_peer == peer_id {
353                            // Store peer connection
354                            {
355                                let mut peers = self.connected_peers.write().await;
356                                peers.insert(peer_id, remote_address);
357                            }
358
359                            // Update stats
360                            {
361                                let mut stats = self.stats.lock().await;
362                                stats.successful_connections += 1;
363                                stats.active_connections += 1;
364                                stats.nat_traversal_successes += 1;
365                            }
366
367                            info!(
368                                "Successfully connected to peer {:?} at {}",
369                                peer_id, remote_address
370                            );
371
372                            // Perform authentication if required
373                            if self.config.auth_config.require_authentication {
374                                match self.authenticate_as_initiator(&peer_id).await {
375                                    Ok(_) => {
376                                        info!("Authentication successful with peer {:?}", peer_id);
377                                    }
378                                    Err(e) => {
379                                        error!(
380                                            "Authentication failed with peer {:?}: {}",
381                                            peer_id, e
382                                        );
383                                        // Remove from connected peers
384                                        self.connected_peers.write().await.remove(&peer_id);
385                                        // Update stats
386                                        let mut stats = self.stats.lock().await;
387                                        stats.active_connections =
388                                            stats.active_connections.saturating_sub(1);
389                                        stats.failed_connections += 1;
390                                        return Err(NatTraversalError::ConfigError(format!(
391                                            "Authentication failed: {e}"
392                                        )));
393                                    }
394                                }
395                            }
396
397                            return Ok(remote_address);
398                        }
399                    }
400                    NatTraversalEvent::TraversalFailed {
401                        peer_id: evt_peer,
402                        error,
403                        fallback_available: _,
404                    } => {
405                        if evt_peer == peer_id {
406                            // Update stats
407                            {
408                                let mut stats = self.stats.lock().await;
409                                stats.failed_connections += 1;
410                            }
411
412                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
413                            return Err(error);
414                        }
415                    }
416                    _ => {
417                        debug!("Received event: {:?}", event);
418                    }
419                }
420            }
421
422            // Brief sleep to avoid busy waiting
423            tokio::time::sleep(Duration::from_millis(100)).await;
424        }
425
426        // Timeout
427        {
428            let mut stats = self.stats.lock().await;
429            stats.failed_connections += 1;
430        }
431
432        Err(NatTraversalError::Timeout)
433    }
434
435    /// Accept incoming connections
436    pub async fn accept(
437        &self,
438    ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
439        info!("Waiting for incoming connection...");
440
441        // Accept connection through the NAT traversal endpoint
442        match self.nat_endpoint.accept_connection().await {
443            Ok((peer_id, connection)) => {
444                let remote_addr = connection.remote_address();
445
446                // Spawn connection handler to monitor the connection
447                if let Err(e) = self.nat_endpoint.spawn_connection_handler(peer_id, connection) {
448                    error!(
449                        "Failed to spawn connection handler for peer {:?}: {}",
450                        peer_id, e
451                    );
452                    return Err(Box::new(e));
453                }
454
455                // Store the connection
456                {
457                    let mut peers = self.connected_peers.write().await;
458                    peers.insert(peer_id, remote_addr);
459                }
460
461                // Update stats
462                {
463                    let mut stats = self.stats.lock().await;
464                    stats.successful_connections += 1;
465                    stats.active_connections += 1;
466                }
467
468                info!(
469                    "Accepted connection from peer {:?} at {}",
470                    peer_id, remote_addr
471                );
472
473                // Handle authentication if required
474                if self.config.auth_config.require_authentication {
475                    // Start a task to handle incoming authentication
476                    let self_clone = self.clone();
477                    let auth_peer_id = peer_id;
478                    tokio::spawn(async move {
479                        if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
480                            error!(
481                                "Failed to handle authentication for peer {:?}: {}",
482                                auth_peer_id, e
483                            );
484                            // Remove the peer if auth fails
485                            self_clone
486                                .connected_peers
487                                .write()
488                                .await
489                                .remove(&auth_peer_id);
490                            let mut stats = self_clone.stats.lock().await;
491                            stats.active_connections = stats.active_connections.saturating_sub(1);
492                        }
493                    });
494                }
495
496                Ok((remote_addr, peer_id))
497            }
498            Err(e) => {
499                // Update stats
500                {
501                    let mut stats = self.stats.lock().await;
502                    stats.failed_connections += 1;
503                }
504
505                error!("Failed to accept connection: {}", e);
506                Err(Box::new(e))
507            }
508        }
509    }
510
511    /// Send data to a peer
512    pub async fn send_to_peer(
513        &self,
514        peer_id: &PeerId,
515        data: &[u8],
516    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
517        debug!("Attempting to send {} bytes to peer {:?}", data.len(), peer_id);
518
519        let peers = self.connected_peers.read().await;
520
521        if let Some(remote_addr) = peers.get(peer_id) {
522            debug!("Found peer {:?} at {}", peer_id, remote_addr);
523
524            // Get the Quinn connection for this peer from the NAT traversal endpoint
525            match self.nat_endpoint.get_connection(peer_id) {
526                Ok(Some(connection)) => {
527                    // Open a unidirectional stream for data transmission
528                    let mut send_stream = connection
529                        .open_uni()
530                        .await
531                        .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
532
533                    // Send the data
534                    send_stream
535                        .write_all(data)
536                        .await
537                        .map_err(|e| format!("Failed to write data: {e}"))?;
538
539                    // Finish the stream
540                    send_stream
541                        .finish()
542                        .map_err(|e| format!("Failed to finish stream: {e}"))?;
543
544                    debug!(
545                        "Successfully sent {} bytes to peer {:?}",
546                        data.len(),
547                        peer_id
548                    );
549                    Ok(())
550                }
551                Ok(None) => {
552                    error!("No active connection found for peer {:?}", peer_id);
553                    Err("No active connection".into())
554                }
555                Err(e) => {
556                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
557                    Err(Box::new(e))
558                }
559            }
560        } else {
561            error!("Peer {:?} not connected", peer_id);
562            Err("Peer not connected".into())
563        }
564    }
565
566    /// Receive data from peers
567    pub async fn receive(
568        &self,
569    ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
570        debug!("Waiting to receive data from any connected peer...");
571
572        // Get all connected peers
573        let peers = {
574            let peers_guard = self.connected_peers.read().await;
575            peers_guard.clone()
576        };
577
578        if peers.is_empty() {
579            return Err("No connected peers".into());
580        }
581
582        // Try to receive data from any connected peer
583        // In a real implementation, this would use a more sophisticated approach
584        // like select! over multiple connection streams
585        for (peer_id, _remote_addr) in peers.iter() {
586            match self.nat_endpoint.get_connection(peer_id) {
587                Ok(Some(connection)) => {
588                    // Try to accept incoming unidirectional streams
589                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
590                        .await
591                    {
592                        Ok(Ok(mut recv_stream)) => {
593                            debug!(
594                                "Receiving data from unidirectional stream from peer {:?}",
595                                peer_id
596                            );
597
598                            // Read all data from the stream
599                            match recv_stream.read_to_end(1024 * 1024).await {
600                                // 1MB limit
601                                Ok(buffer) => {
602                                    if !buffer.is_empty() {
603                                        debug!(
604                                            "Received {} bytes from peer {:?}",
605                                            buffer.len(),
606                                            peer_id
607                                        );
608                                        return Ok((*peer_id, buffer));
609                                    }
610                                }
611                                Err(e) => {
612                                    debug!(
613                                        "Failed to read from stream for peer {:?}: {}",
614                                        peer_id, e
615                                    );
616                                }
617                            }
618                        }
619                        Ok(Err(e)) => {
620                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
621                        }
622                        Err(_) => {
623                            // Timeout - try bidirectional streams
624                        }
625                    }
626
627                    // Also try to accept bidirectional streams
628                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
629                        .await
630                    {
631                        Ok(Ok((_send_stream, mut recv_stream))) => {
632                            debug!(
633                                "Receiving data from bidirectional stream from peer {:?}",
634                                peer_id
635                            );
636
637                            // Read all data from the receive side
638                            match recv_stream.read_to_end(1024 * 1024).await {
639                                // 1MB limit
640                                Ok(buffer) => {
641                                    if !buffer.is_empty() {
642                                        debug!(
643                                            "Received {} bytes from peer {:?} via bidirectional stream",
644                                            buffer.len(),
645                                            peer_id
646                                        );
647                                        return Ok((*peer_id, buffer));
648                                    }
649                                }
650                                Err(e) => {
651                                    debug!(
652                                        "Failed to read from bidirectional stream for peer {:?}: {}",
653                                        peer_id, e
654                                    );
655                                }
656                            }
657                        }
658                        Ok(Err(e)) => {
659                            debug!(
660                                "Failed to accept bidirectional stream from peer {:?}: {}",
661                                peer_id, e
662                            );
663                        }
664                        Err(_) => {
665                            // Timeout - continue to next peer
666                        }
667                    }
668                }
669                Ok(None) => {
670                    debug!("No active connection for peer {:?}", peer_id);
671                }
672                Err(e) => {
673                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
674                }
675            }
676        }
677
678        // If we get here, no data was received from any peer
679        Err("No data available from any connected peer".into())
680    }
681
682    /// Get current statistics
683    pub async fn get_stats(&self) -> NodeStats {
684        self.stats.lock().await.clone()
685    }
686
687    /// Get access to the NAT traversal endpoint
688    pub fn get_nat_endpoint(
689        &self,
690    ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
691        Ok(&*self.nat_endpoint)
692    }
693
694    /// Start periodic statistics reporting
695    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
696        let stats = Arc::clone(&self.stats);
697        let interval_duration = self.config.stats_interval;
698
699        tokio::spawn(async move {
700            let mut interval = tokio::time::interval(interval_duration);
701
702            loop {
703                interval.tick().await;
704
705                let stats_snapshot = stats.lock().await.clone();
706
707                info!(
708                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
709                    stats_snapshot.active_connections,
710                    stats_snapshot.successful_connections,
711                    stats_snapshot.nat_traversal_successes,
712                    stats_snapshot.nat_traversal_attempts
713                );
714            }
715        })
716    }
717
    /// Get NAT traversal statistics
    ///
    /// Delegates directly to the NAT traversal endpoint and returns its
    /// result unchanged. These are the endpoint's own statistics, separate
    /// from the `NodeStats` counters kept by this node.
    pub async fn get_nat_stats(
        &self,
    ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
        self.nat_endpoint.get_nat_stats()
    }
724
725    // OBSERVED_ADDRESS integration is handled within the connection; manual injection removed
726
727    /// Get connection metrics for a specific peer
728    pub async fn get_connection_metrics(
729        &self,
730        peer_id: &PeerId,
731    ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
732        match self.nat_endpoint.get_connection(peer_id) {
733            Ok(Some(connection)) => {
734                // Get basic RTT from the connection
735                let rtt = connection.rtt();
736
737                // Get congestion window and other stats
738                let stats = connection.stats();
739
740                Ok(ConnectionMetrics {
741                    bytes_sent: stats.udp_tx.bytes,
742                    bytes_received: stats.udp_rx.bytes,
743                    rtt: Some(rtt),
744                    packet_loss: stats.path.lost_packets as f64
745                        / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
746                })
747            }
748            Ok(None) => Err("Connection not found".into()),
749            Err(e) => Err(format!("Failed to get connection: {e}").into()),
750        }
751    }
752
    /// Get this node's peer ID
    ///
    /// This is the ID derived from the node's public key in `new`, returned
    /// by value.
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
757
    /// Get this node's public key bytes
    ///
    /// Delegates to the authentication manager, which holds the node's key
    /// material, and returns the 32-byte array it reports.
    pub fn public_key_bytes(&self) -> [u8; 32] {
        self.auth_manager.public_key_bytes()
    }
762
763    /// Send an authentication message to a peer
764    async fn send_auth_message(
765        &self,
766        peer_id: &PeerId,
767        message: AuthMessage,
768    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
769        let data = AuthManager::serialize_message(&message)?;
770        self.send_to_peer(peer_id, &data).await
771    }
772
773    /// Perform authentication handshake as initiator
774    async fn authenticate_as_initiator(
775        &self,
776        peer_id: &PeerId,
777    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
778        info!("Starting authentication with peer {:?}", peer_id);
779
780        // Send authentication request
781        let auth_request = self.auth_manager.create_auth_request();
782        self.send_auth_message(peer_id, auth_request).await?;
783
784        // Wait for challenge
785        let timeout_duration = self.config.auth_config.auth_timeout;
786        let start = Instant::now();
787
788        while start.elapsed() < timeout_duration {
789            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
790                Ok(Ok((recv_peer_id, data))) => {
791                    if recv_peer_id == *peer_id {
792                        match AuthManager::deserialize_message(&data) {
793                            Ok(AuthMessage::Challenge { nonce, .. }) => {
794                                // Create and send challenge response
795                                let response =
796                                    self.auth_manager.create_challenge_response(nonce)?;
797                                self.send_auth_message(peer_id, response).await?;
798                            }
799                            Ok(AuthMessage::AuthSuccess { .. }) => {
800                                info!("Authentication successful with peer {:?}", peer_id);
801                                return Ok(());
802                            }
803                            Ok(AuthMessage::AuthFailure { reason }) => {
804                                return Err(format!("Authentication failed: {reason}").into());
805                            }
806                            _ => continue,
807                        }
808                    }
809                }
810                _ => continue,
811            }
812        }
813
814        Err("Authentication timeout".into())
815    }
816
817    /// Handle incoming authentication messages
818    async fn handle_auth_message(
819        &self,
820        peer_id: PeerId,
821        message: AuthMessage,
822    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
823        let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
824
825        match auth_protocol.handle_message(peer_id, message).await {
826            Ok(Some(response)) => {
827                self.send_auth_message(&peer_id, response).await?;
828            }
829            Ok(None) => {
830                // No response needed
831            }
832            Err(e) => {
833                error!("Authentication error: {}", e);
834                let failure = AuthMessage::AuthFailure {
835                    reason: e.to_string(),
836                };
837                self.send_auth_message(&peer_id, failure).await?;
838                return Err(Box::new(e));
839            }
840        }
841
842        Ok(())
843    }
844
845    /// Check if a peer is authenticated
846    pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
847        self.auth_manager.is_authenticated(peer_id).await
848    }
849
850    /// Get list of authenticated peers
851    pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
852        self.auth_manager.list_authenticated_peers().await
853    }
854
855    /// Handle incoming authentication from a peer
856    async fn handle_incoming_auth(
857        &self,
858        peer_id: PeerId,
859    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
860        info!("Handling incoming authentication from peer {:?}", peer_id);
861
862        let timeout_duration = self.config.auth_config.auth_timeout;
863        let start = Instant::now();
864
865        while start.elapsed() < timeout_duration {
866            match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
867                Ok(Ok((recv_peer_id, data))) => {
868                    if recv_peer_id == peer_id {
869                        match AuthManager::deserialize_message(&data) {
870                            Ok(auth_msg) => {
871                                self.handle_auth_message(peer_id, auth_msg).await?;
872
873                                // Check if authentication is complete
874                                if self.auth_manager.is_authenticated(&peer_id).await {
875                                    info!("Peer {:?} successfully authenticated", peer_id);
876                                    return Ok(());
877                                }
878                            }
879                            Err(_) => {
880                                // Not an auth message, ignore
881                                continue;
882                            }
883                        }
884                    }
885                }
886                _ => continue,
887            }
888        }
889
890        Err("Authentication timeout waiting for peer".into())
891    }
892
893    /// Get the metrics collector for Prometheus export
894    pub fn get_metrics_collector(
895        &self,
896    ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
897        // For now, create a new metrics collector
898        // In a full implementation, this would be a field in the struct
899        // and properly wired up to collect actual metrics
900        Ok(Arc::new(crate::logging::MetricsCollector::new()))
901    }
902
903    /// Gracefully shutdown the node and close all connections
904    pub fn shutdown(&self) {
905        info!("Shutting down QuicP2PNode");
906        self.shutdown.store(true, Ordering::SeqCst);
907
908        // Close the Quinn endpoint to terminate all connections
909        if let Some(endpoint) = self.nat_endpoint.get_quinn_endpoint() {
910            endpoint.close(0u32.into(), b"node shutdown");
911        }
912    }
913}
914
915/// Automatic cleanup when QuicP2PNode is dropped
916impl Drop for QuicP2PNode {
917    fn drop(&mut self) {
918        self.shutdown();
919    }
920}