1use std::{
7 collections::HashMap,
8 net::SocketAddr,
9 sync::Arc,
10 time::{Duration, Instant},
11};
12
13use tracing::{debug, info, error};
14
15use crate::{
16 nat_traversal_api::{
17 NatTraversalEndpoint, NatTraversalConfig, NatTraversalEvent,
18 EndpointRole, PeerId, NatTraversalError,
19 },
20};
21
/// A peer-to-peer node that wraps a [`NatTraversalEndpoint`] and keeps track of
/// the peers it has connected to together with running connection statistics.
pub struct QuicP2PNode {
    /// Endpoint used to initiate NAT traversal, accept incoming connections and
    /// look up the connection that belongs to a peer.
    nat_endpoint: Arc<NatTraversalEndpoint>,
    /// Remote address of every peer recorded by `connect_to_peer` or `accept`,
    /// keyed by peer id.
    connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
    /// Counters shared between the node's methods, the endpoint event callback
    /// and the periodic statistics task.
    stats: Arc<tokio::sync::Mutex<NodeStats>>,
    /// Configuration the node was created with.
    config: QuicNodeConfig,
}
33
/// Settings used to build and run a [`QuicP2PNode`].
#[derive(Debug, Clone)]
pub struct QuicNodeConfig {
    /// Role handed to the NAT traversal endpoint when the node is created.
    pub role: EndpointRole,
    /// Bootstrap node addresses handed to the NAT traversal endpoint.
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Coordinator toggle.
    // NOTE(review): not read by any `QuicP2PNode` method visible in this file — confirm where it is consumed.
    pub enable_coordinator: bool,
    /// Connection limit.
    // NOTE(review): not enforced by any `QuicP2PNode` method visible in this file — confirm where it is consumed.
    pub max_connections: usize,
    /// Overall time `QuicP2PNode::connect_to_peer` keeps polling before it gives up.
    pub connection_timeout: Duration,
    /// Period between log lines emitted by `QuicP2PNode::start_stats_task`.
    pub stats_interval: Duration,
}
50
51impl Default for QuicNodeConfig {
52 fn default() -> Self {
53 Self {
54 role: EndpointRole::Client,
55 bootstrap_nodes: Vec::new(),
56 enable_coordinator: false,
57 max_connections: 100,
58 connection_timeout: Duration::from_secs(30),
59 stats_interval: Duration::from_secs(30),
60 }
61 }
62}
63
/// Running counters describing a node's connection activity.
#[derive(Debug, Clone)]
pub struct NodeStats {
    /// Connections currently counted as active.
    pub active_connections: usize,
    /// Total connections that were established or accepted successfully.
    pub successful_connections: u64,
    /// Total connection attempts (outgoing or incoming) that failed.
    pub failed_connections: u64,
    /// Total NAT traversal attempts recorded.
    pub nat_traversal_attempts: u64,
    /// Total NAT traversal attempts that ended with an established connection.
    pub nat_traversal_successes: u64,
    /// Instant at which this statistics record was created.
    pub start_time: Instant,
}
80
81impl Default for NodeStats {
82 fn default() -> Self {
83 Self {
84 active_connections: 0,
85 successful_connections: 0,
86 failed_connections: 0,
87 nat_traversal_attempts: 0,
88 nat_traversal_successes: 0,
89 start_time: Instant::now(),
90 }
91 }
92}
93
94impl QuicP2PNode {
95 pub async fn new(config: QuicNodeConfig) -> Result<Self, Box<dyn std::error::Error>> {
97 let nat_config = NatTraversalConfig {
99 role: config.role,
100 bootstrap_nodes: config.bootstrap_nodes.clone(),
101 max_candidates: 50,
102 coordination_timeout: Duration::from_secs(10),
103 enable_symmetric_nat: true,
104 enable_relay_fallback: true,
105 max_concurrent_attempts: 5,
106 };
107
108 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
110 start_time: Instant::now(),
111 ..Default::default()
112 }));
113 let stats_for_callback = Arc::clone(&stats_clone);
114
115 let event_callback = Box::new(move |event: NatTraversalEvent| {
116 let stats = stats_for_callback.clone();
117 tokio::spawn(async move {
118 let mut stats = stats.lock().await;
119 match event {
120 NatTraversalEvent::CoordinationRequested { .. } => {
121 stats.nat_traversal_attempts += 1;
122 }
123 NatTraversalEvent::ConnectionEstablished { .. } => {
124 stats.nat_traversal_successes += 1;
125 }
126 _ => {}
127 }
128 });
129 });
130
131 let nat_endpoint = Arc::new(
133 NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?
134 );
135
136 Ok(Self {
137 nat_endpoint,
138 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
139 stats: stats_clone,
140 config,
141 })
142 }
143
144 pub async fn connect_to_peer(
146 &self,
147 peer_id: PeerId,
148 coordinator: SocketAddr,
149 ) -> Result<SocketAddr, NatTraversalError> {
150 info!("Initiating connection to peer {:?} via coordinator {}", peer_id, coordinator);
151
152 {
154 let mut stats = self.stats.lock().await;
155 stats.nat_traversal_attempts += 1;
156 }
157
158 self.nat_endpoint.initiate_nat_traversal(peer_id, coordinator)?;
160
161 let start = Instant::now();
163 let timeout = self.config.connection_timeout;
164
165 while start.elapsed() < timeout {
166 let events = self.nat_endpoint.poll(Instant::now())?;
167
168 for event in events {
169 match event {
170 NatTraversalEvent::ConnectionEstablished { peer_id: evt_peer, remote_address } => {
171 if evt_peer == peer_id {
172 {
174 let mut peers = self.connected_peers.write().await;
175 peers.insert(peer_id, remote_address);
176 }
177
178 {
180 let mut stats = self.stats.lock().await;
181 stats.successful_connections += 1;
182 stats.active_connections += 1;
183 stats.nat_traversal_successes += 1;
184 }
185
186 info!("Successfully connected to peer {:?} at {}", peer_id, remote_address);
187 return Ok(remote_address);
188 }
189 }
190 NatTraversalEvent::TraversalFailed { peer_id: evt_peer, error, fallback_available: _ } => {
191 if evt_peer == peer_id {
192 {
194 let mut stats = self.stats.lock().await;
195 stats.failed_connections += 1;
196 }
197
198 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
199 return Err(error);
200 }
201 }
202 _ => {
203 debug!("Received event: {:?}", event);
204 }
205 }
206 }
207
208 tokio::time::sleep(Duration::from_millis(100)).await;
210 }
211
212 {
214 let mut stats = self.stats.lock().await;
215 stats.failed_connections += 1;
216 }
217
218 Err(NatTraversalError::Timeout)
219 }
220
221 pub async fn accept(&self) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error>> {
223 info!("Waiting for incoming connection...");
224
225 match self.nat_endpoint.accept_connection().await {
227 Ok((peer_id, connection)) => {
228 let remote_addr = connection.remote_address();
229
230 {
232 let mut peers = self.connected_peers.write().await;
233 peers.insert(peer_id, remote_addr);
234 }
235
236 {
238 let mut stats = self.stats.lock().await;
239 stats.successful_connections += 1;
240 stats.active_connections += 1;
241 }
242
243 info!("Accepted connection from peer {:?} at {}", peer_id, remote_addr);
244 Ok((remote_addr, peer_id))
245 }
246 Err(e) => {
247 {
249 let mut stats = self.stats.lock().await;
250 stats.failed_connections += 1;
251 }
252
253 error!("Failed to accept connection: {}", e);
254 Err(Box::new(e))
255 }
256 }
257 }
258
259 pub async fn send_to_peer(
261 &self,
262 peer_id: &PeerId,
263 data: &[u8],
264 ) -> Result<(), Box<dyn std::error::Error>> {
265 let peers = self.connected_peers.read().await;
266
267 if let Some(remote_addr) = peers.get(peer_id) {
268 debug!("Sending {} bytes to peer {:?} at {}", data.len(), peer_id, remote_addr);
269
270 match self.nat_endpoint.get_connection(peer_id) {
272 Ok(Some(connection)) => {
273 let mut send_stream = connection.open_uni().await
275 .map_err(|e| format!("Failed to open unidirectional stream: {}", e))?;
276
277 send_stream.write_all(data).await
279 .map_err(|e| format!("Failed to write data: {}", e))?;
280
281 send_stream.finish().map_err(|e| format!("Failed to finish stream: {}", e))?;
283
284 debug!("Successfully sent {} bytes to peer {:?}", data.len(), peer_id);
285 Ok(())
286 }
287 Ok(None) => {
288 error!("No active connection found for peer {:?}", peer_id);
289 Err("No active connection".into())
290 }
291 Err(e) => {
292 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
293 Err(Box::new(e))
294 }
295 }
296 } else {
297 error!("Peer {:?} not connected", peer_id);
298 Err("Peer not connected".into())
299 }
300 }
301
302 pub async fn receive(&self) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error>> {
304 debug!("Waiting to receive data from any connected peer...");
305
306 let peers = {
308 let peers_guard = self.connected_peers.read().await;
309 peers_guard.clone()
310 };
311
312 if peers.is_empty() {
313 return Err("No connected peers".into());
314 }
315
316 for (peer_id, _remote_addr) in peers.iter() {
320 match self.nat_endpoint.get_connection(peer_id) {
321 Ok(Some(connection)) => {
322 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni()).await {
324 Ok(Ok(mut recv_stream)) => {
325 debug!("Receiving data from unidirectional stream from peer {:?}", peer_id);
326
327 match recv_stream.read_to_end(1024 * 1024).await { Ok(buffer) => {
330 if !buffer.is_empty() {
331 debug!("Received {} bytes from peer {:?}", buffer.len(), peer_id);
332 return Ok((*peer_id, buffer));
333 }
334 }
335 Err(e) => {
336 debug!("Failed to read from stream for peer {:?}: {}", peer_id, e);
337 }
338 }
339 }
340 Ok(Err(e)) => {
341 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
342 }
343 Err(_) => {
344 }
346 }
347
348 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi()).await {
350 Ok(Ok((_send_stream, mut recv_stream))) => {
351 debug!("Receiving data from bidirectional stream from peer {:?}", peer_id);
352
353 match recv_stream.read_to_end(1024 * 1024).await { Ok(buffer) => {
356 if !buffer.is_empty() {
357 debug!("Received {} bytes from peer {:?} via bidirectional stream", buffer.len(), peer_id);
358 return Ok((*peer_id, buffer));
359 }
360 }
361 Err(e) => {
362 debug!("Failed to read from bidirectional stream for peer {:?}: {}", peer_id, e);
363 }
364 }
365 }
366 Ok(Err(e)) => {
367 debug!("Failed to accept bidirectional stream from peer {:?}: {}", peer_id, e);
368 }
369 Err(_) => {
370 }
372 }
373 }
374 Ok(None) => {
375 debug!("No active connection for peer {:?}", peer_id);
376 }
377 Err(e) => {
378 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
379 }
380 }
381 }
382
383 Err("No data available from any connected peer".into())
385 }
386
387 pub async fn get_stats(&self) -> NodeStats {
389 self.stats.lock().await.clone()
390 }
391
392 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
394 let stats = Arc::clone(&self.stats);
395 let interval_duration = self.config.stats_interval;
396
397 tokio::spawn(async move {
398 let mut interval = tokio::time::interval(interval_duration);
399
400 loop {
401 interval.tick().await;
402
403 let stats_snapshot = stats.lock().await.clone();
404
405 info!(
406 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
407 stats_snapshot.active_connections,
408 stats_snapshot.successful_connections,
409 stats_snapshot.nat_traversal_successes,
410 stats_snapshot.nat_traversal_attempts
411 );
412 }
413 })
414 }
415}
416