ant_quic/
quic_node.rs

1//! QUIC-based P2P node with NAT traversal
2//!
3//! This module provides a QUIC-based implementation of the P2P node
4//! that integrates with the NAT traversal protocol.
5
6use std::{
7    collections::HashMap,
8    net::SocketAddr,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use tracing::{debug, info, error};
14
15use crate::{
16    nat_traversal_api::{
17        NatTraversalEndpoint, NatTraversalConfig, NatTraversalEvent,
18        EndpointRole, PeerId, NatTraversalError,
19    },
20};
21
22/// QUIC-based P2P node with NAT traversal
23pub struct QuicP2PNode {
24    /// NAT traversal endpoint
25    nat_endpoint: Arc<NatTraversalEndpoint>,
26    /// Active peer connections (maps peer ID to their socket address)
27    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
28    /// Node statistics
29    stats: Arc<tokio::sync::Mutex<NodeStats>>,
30    /// Node configuration
31    config: QuicNodeConfig,
32}
33
34/// Configuration for QUIC P2P node
35#[derive(Debug, Clone)]
36pub struct QuicNodeConfig {
37    /// Role of this node
38    pub role: EndpointRole,
39    /// Bootstrap nodes
40    pub bootstrap_nodes: Vec<SocketAddr>,
41    /// Enable coordinator services
42    pub enable_coordinator: bool,
43    /// Max concurrent connections
44    pub max_connections: usize,
45    /// Connection timeout
46    pub connection_timeout: Duration,
47    /// Statistics interval
48    pub stats_interval: Duration,
49}
50
51impl Default for QuicNodeConfig {
52    fn default() -> Self {
53        Self {
54            role: EndpointRole::Client,
55            bootstrap_nodes: Vec::new(),
56            enable_coordinator: false,
57            max_connections: 100,
58            connection_timeout: Duration::from_secs(30),
59            stats_interval: Duration::from_secs(30),
60        }
61    }
62}
63
64/// Node statistics
65#[derive(Debug, Clone)]
66pub struct NodeStats {
67    /// Number of active connections
68    pub active_connections: usize,
69    /// Total successful connections
70    pub successful_connections: u64,
71    /// Total failed connections
72    pub failed_connections: u64,
73    /// NAT traversal attempts
74    pub nat_traversal_attempts: u64,
75    /// Successful NAT traversals
76    pub nat_traversal_successes: u64,
77    /// Node start time
78    pub start_time: Instant,
79}
80
81impl Default for NodeStats {
82    fn default() -> Self {
83        Self {
84            active_connections: 0,
85            successful_connections: 0,
86            failed_connections: 0,
87            nat_traversal_attempts: 0,
88            nat_traversal_successes: 0,
89            start_time: Instant::now(),
90        }
91    }
92}
93
94impl QuicP2PNode {
95    /// Create a new QUIC P2P node
96    pub async fn new(config: QuicNodeConfig) -> Result<Self, Box<dyn std::error::Error>> {
97        // Create NAT traversal configuration
98        let nat_config = NatTraversalConfig {
99            role: config.role,
100            bootstrap_nodes: config.bootstrap_nodes.clone(),
101            max_candidates: 50,
102            coordination_timeout: Duration::from_secs(10),
103            enable_symmetric_nat: true,
104            enable_relay_fallback: true,
105            max_concurrent_attempts: 5,
106        };
107
108        // Create event callback for NAT traversal events
109        let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
110            start_time: Instant::now(),
111            ..Default::default()
112        }));
113        let stats_for_callback = Arc::clone(&stats_clone);
114        
115        let event_callback = Box::new(move |event: NatTraversalEvent| {
116            let stats = stats_for_callback.clone();
117            tokio::spawn(async move {
118                let mut stats = stats.lock().await;
119                match event {
120                    NatTraversalEvent::CoordinationRequested { .. } => {
121                        stats.nat_traversal_attempts += 1;
122                    }
123                    NatTraversalEvent::ConnectionEstablished { .. } => {
124                        stats.nat_traversal_successes += 1;
125                    }
126                    _ => {}
127                }
128            });
129        });
130
131        // Create NAT traversal endpoint
132        let nat_endpoint = Arc::new(
133            NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?
134        );
135
136        Ok(Self {
137            nat_endpoint,
138            connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
139            stats: stats_clone,
140            config,
141        })
142    }
143
144    /// Connect to a peer using NAT traversal
145    pub async fn connect_to_peer(
146        &self,
147        peer_id: PeerId,
148        coordinator: SocketAddr,
149    ) -> Result<SocketAddr, NatTraversalError> {
150        info!("Initiating connection to peer {:?} via coordinator {}", peer_id, coordinator);
151        
152        // Update stats
153        {
154            let mut stats = self.stats.lock().await;
155            stats.nat_traversal_attempts += 1;
156        }
157
158        // Initiate NAT traversal
159        self.nat_endpoint.initiate_nat_traversal(peer_id, coordinator)?;
160
161        // Poll for completion (in production, this would be event-driven)
162        let start = Instant::now();
163        let timeout = self.config.connection_timeout;
164        
165        while start.elapsed() < timeout {
166            let events = self.nat_endpoint.poll(Instant::now())?;
167            
168            for event in events {
169                match event {
170                    NatTraversalEvent::ConnectionEstablished { peer_id: evt_peer, remote_address } => {
171                        if evt_peer == peer_id {
172                            // Store peer connection
173                            {
174                                let mut peers = self.connected_peers.write().await;
175                                peers.insert(peer_id, remote_address);
176                            }
177                            
178                            // Update stats
179                            {
180                                let mut stats = self.stats.lock().await;
181                                stats.successful_connections += 1;
182                                stats.active_connections += 1;
183                                stats.nat_traversal_successes += 1;
184                            }
185                            
186                            info!("Successfully connected to peer {:?} at {}", peer_id, remote_address);
187                            return Ok(remote_address);
188                        }
189                    }
190                    NatTraversalEvent::TraversalFailed { peer_id: evt_peer, error, fallback_available: _ } => {
191                        if evt_peer == peer_id {
192                            // Update stats
193                            {
194                                let mut stats = self.stats.lock().await;
195                                stats.failed_connections += 1;
196                            }
197                            
198                            error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
199                            return Err(error);
200                        }
201                    }
202                    _ => {
203                        debug!("Received event: {:?}", event);
204                    }
205                }
206            }
207            
208            // Brief sleep to avoid busy waiting
209            tokio::time::sleep(Duration::from_millis(100)).await;
210        }
211        
212        // Timeout
213        {
214            let mut stats = self.stats.lock().await;
215            stats.failed_connections += 1;
216        }
217        
218        Err(NatTraversalError::Timeout)
219    }
220
221    /// Accept incoming connections
222    pub async fn accept(&self) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error>> {
223        info!("Waiting for incoming connection...");
224        
225        // Accept connection through the NAT traversal endpoint
226        match self.nat_endpoint.accept_connection().await {
227            Ok((peer_id, connection)) => {
228                let remote_addr = connection.remote_address();
229                
230                // Store the connection
231                {
232                    let mut peers = self.connected_peers.write().await;
233                    peers.insert(peer_id, remote_addr);
234                }
235                
236                // Update stats
237                {
238                    let mut stats = self.stats.lock().await;
239                    stats.successful_connections += 1;
240                    stats.active_connections += 1;
241                }
242                
243                info!("Accepted connection from peer {:?} at {}", peer_id, remote_addr);
244                Ok((remote_addr, peer_id))
245            }
246            Err(e) => {
247                // Update stats
248                {
249                    let mut stats = self.stats.lock().await;
250                    stats.failed_connections += 1;
251                }
252                
253                error!("Failed to accept connection: {}", e);
254                Err(Box::new(e))
255            }
256        }
257    }
258
259    /// Send data to a peer
260    pub async fn send_to_peer(
261        &self,
262        peer_id: &PeerId,
263        data: &[u8],
264    ) -> Result<(), Box<dyn std::error::Error>> {
265        let peers = self.connected_peers.read().await;
266        
267        if let Some(remote_addr) = peers.get(peer_id) {
268            debug!("Sending {} bytes to peer {:?} at {}", data.len(), peer_id, remote_addr);
269            
270            // Get the Quinn connection for this peer from the NAT traversal endpoint
271            match self.nat_endpoint.get_connection(peer_id) {
272                Ok(Some(connection)) => {
273                    // Open a unidirectional stream for data transmission
274                    let mut send_stream = connection.open_uni().await
275                        .map_err(|e| format!("Failed to open unidirectional stream: {}", e))?;
276                    
277                    // Send the data
278                    send_stream.write_all(data).await
279                        .map_err(|e| format!("Failed to write data: {}", e))?;
280                    
281                    // Finish the stream
282                    send_stream.finish().map_err(|e| format!("Failed to finish stream: {}", e))?;
283                    
284                    debug!("Successfully sent {} bytes to peer {:?}", data.len(), peer_id);
285                    Ok(())
286                }
287                Ok(None) => {
288                    error!("No active connection found for peer {:?}", peer_id);
289                    Err("No active connection".into())
290                }
291                Err(e) => {
292                    error!("Failed to get connection for peer {:?}: {}", peer_id, e);
293                    Err(Box::new(e))
294                }
295            }
296        } else {
297            error!("Peer {:?} not connected", peer_id);
298            Err("Peer not connected".into())
299        }
300    }
301
302    /// Receive data from peers
303    pub async fn receive(&self) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error>> {
304        debug!("Waiting to receive data from any connected peer...");
305        
306        // Get all connected peers
307        let peers = {
308            let peers_guard = self.connected_peers.read().await;
309            peers_guard.clone()
310        };
311        
312        if peers.is_empty() {
313            return Err("No connected peers".into());
314        }
315        
316        // Try to receive data from any connected peer
317        // In a real implementation, this would use a more sophisticated approach
318        // like select! over multiple connection streams
319        for (peer_id, _remote_addr) in peers.iter() {
320            match self.nat_endpoint.get_connection(peer_id) {
321                Ok(Some(connection)) => {
322                    // Try to accept incoming unidirectional streams
323                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni()).await {
324                        Ok(Ok(mut recv_stream)) => {
325                            debug!("Receiving data from unidirectional stream from peer {:?}", peer_id);
326                            
327                            // Read all data from the stream
328                            match recv_stream.read_to_end(1024 * 1024).await { // 1MB limit
329                                Ok(buffer) => {
330                                    if !buffer.is_empty() {
331                                        debug!("Received {} bytes from peer {:?}", buffer.len(), peer_id);
332                                        return Ok((*peer_id, buffer));
333                                    }
334                                }
335                                Err(e) => {
336                                    debug!("Failed to read from stream for peer {:?}: {}", peer_id, e);
337                                }
338                            }
339                        }
340                        Ok(Err(e)) => {
341                            debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
342                        }
343                        Err(_) => {
344                            // Timeout - try bidirectional streams
345                        }
346                    }
347                    
348                    // Also try to accept bidirectional streams
349                    match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi()).await {
350                        Ok(Ok((_send_stream, mut recv_stream))) => {
351                            debug!("Receiving data from bidirectional stream from peer {:?}", peer_id);
352                            
353                            // Read all data from the receive side
354                            match recv_stream.read_to_end(1024 * 1024).await { // 1MB limit
355                                Ok(buffer) => {
356                                    if !buffer.is_empty() {
357                                        debug!("Received {} bytes from peer {:?} via bidirectional stream", buffer.len(), peer_id);
358                                        return Ok((*peer_id, buffer));
359                                    }
360                                }
361                                Err(e) => {
362                                    debug!("Failed to read from bidirectional stream for peer {:?}: {}", peer_id, e);
363                                }
364                            }
365                        }
366                        Ok(Err(e)) => {
367                            debug!("Failed to accept bidirectional stream from peer {:?}: {}", peer_id, e);
368                        }
369                        Err(_) => {
370                            // Timeout - continue to next peer
371                        }
372                    }
373                }
374                Ok(None) => {
375                    debug!("No active connection for peer {:?}", peer_id);
376                }
377                Err(e) => {
378                    debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
379                }
380            }
381        }
382        
383        // If we get here, no data was received from any peer
384        Err("No data available from any connected peer".into())
385    }
386
387    /// Get current statistics
388    pub async fn get_stats(&self) -> NodeStats {
389        self.stats.lock().await.clone()
390    }
391
392    /// Start periodic statistics reporting
393    pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
394        let stats = Arc::clone(&self.stats);
395        let interval_duration = self.config.stats_interval;
396        
397        tokio::spawn(async move {
398            let mut interval = tokio::time::interval(interval_duration);
399            
400            loop {
401                interval.tick().await;
402                
403                let stats_snapshot = stats.lock().await.clone();
404                
405                info!(
406                    "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
407                    stats_snapshot.active_connections,
408                    stats_snapshot.successful_connections,
409                    stats_snapshot.nat_traversal_successes,
410                    stats_snapshot.nat_traversal_attempts
411                );
412            }
413        })
414    }
415}
416