1use std::{
7 collections::HashMap,
8 net::SocketAddr,
9 sync::Arc,
10 time::{Duration, Instant},
11};
12
13use tracing::{debug, info, error};
14
15use crate::{
16 nat_traversal_api::{
17 NatTraversalEndpoint, NatTraversalConfig, NatTraversalEvent,
18 EndpointRole, PeerId, NatTraversalError, NatTraversalStatistics,
19 },
20 auth::{AuthManager, AuthConfig, AuthMessage, AuthProtocol},
21 crypto::raw_public_keys::key_utils::{
22 generate_ed25519_keypair, derive_peer_id_from_public_key,
23 },
24};
25
26#[derive(Clone)]
28pub struct QuicP2PNode {
29 nat_endpoint: Arc<NatTraversalEndpoint>,
31 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
33 stats: Arc<tokio::sync::Mutex<NodeStats>>,
35 config: QuicNodeConfig,
37 auth_manager: Arc<AuthManager>,
39 peer_id: PeerId,
41}
42
43#[derive(Debug, Clone)]
45pub struct QuicNodeConfig {
46 pub role: EndpointRole,
48 pub bootstrap_nodes: Vec<SocketAddr>,
50 pub enable_coordinator: bool,
52 pub max_connections: usize,
54 pub connection_timeout: Duration,
56 pub stats_interval: Duration,
58 pub auth_config: AuthConfig,
60 pub bind_addr: Option<SocketAddr>,
62}
63
64impl Default for QuicNodeConfig {
65 fn default() -> Self {
66 Self {
67 role: EndpointRole::Client,
68 bootstrap_nodes: Vec::new(),
69 enable_coordinator: false,
70 max_connections: 100,
71 connection_timeout: Duration::from_secs(30),
72 stats_interval: Duration::from_secs(30),
73 auth_config: AuthConfig::default(),
74 bind_addr: None,
75 }
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct ConnectionMetrics {
82 pub bytes_sent: u64,
84 pub bytes_received: u64,
86 pub rtt: Option<Duration>,
88 pub packet_loss: f64,
90}
91
92#[derive(Debug, Clone)]
94pub struct NodeStats {
95 pub active_connections: usize,
97 pub successful_connections: u64,
99 pub failed_connections: u64,
101 pub nat_traversal_attempts: u64,
103 pub nat_traversal_successes: u64,
105 pub start_time: Instant,
107}
108
109impl Default for NodeStats {
110 fn default() -> Self {
111 Self {
112 active_connections: 0,
113 successful_connections: 0,
114 failed_connections: 0,
115 nat_traversal_attempts: 0,
116 nat_traversal_successes: 0,
117 start_time: Instant::now(),
118 }
119 }
120}
121
122impl QuicP2PNode {
123 pub async fn new(config: QuicNodeConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
125 let (secret_key, public_key) = generate_ed25519_keypair();
127 let peer_id = derive_peer_id_from_public_key(&public_key);
128
129 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
130
131 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
133
134 let nat_config = NatTraversalConfig {
136 role: config.role,
137 bootstrap_nodes: config.bootstrap_nodes.clone(),
138 max_candidates: 50,
139 coordination_timeout: Duration::from_secs(10),
140 enable_symmetric_nat: true,
141 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
143 max_concurrent_attempts: 5,
144 bind_addr: config.bind_addr,
145 };
146
147 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
149 start_time: Instant::now(),
150 ..Default::default()
151 }));
152 let stats_for_callback = Arc::clone(&stats_clone);
153
154 let event_callback = Box::new(move |event: NatTraversalEvent| {
155 let stats = stats_for_callback.clone();
156 tokio::spawn(async move {
157 let mut stats = stats.lock().await;
158 match event {
159 NatTraversalEvent::CoordinationRequested { .. } => {
160 stats.nat_traversal_attempts += 1;
161 }
162 NatTraversalEvent::ConnectionEstablished { .. } => {
163 stats.nat_traversal_successes += 1;
164 }
165 _ => {}
166 }
167 });
168 });
169
170 let nat_endpoint = Arc::new(
172 NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?
173 );
174
175 Ok(Self {
176 nat_endpoint,
177 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
178 stats: stats_clone,
179 config,
180 auth_manager,
181 peer_id,
182 })
183 }
184
185 pub fn get_config(&self) -> &QuicNodeConfig {
187 &self.config
188 }
189
190 pub async fn connect_to_bootstrap(
192 &self,
193 bootstrap_addr: SocketAddr,
194 ) -> Result<PeerId, NatTraversalError> {
195 info!("Connecting to bootstrap node at {}", bootstrap_addr);
196
197 let endpoint = self.nat_endpoint.get_quinn_endpoint()
199 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not available".to_string()))?;
200
201 match endpoint.connect(bootstrap_addr, "bootstrap-node") {
203 Ok(connecting) => {
204 match connecting.await {
205 Ok(connection) => {
206 let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
210
211 self.connected_peers.write().await.insert(peer_id, bootstrap_addr);
213
214 {
216 let mut stats = self.stats.lock().await;
217 stats.active_connections += 1;
218 stats.successful_connections += 1;
219 }
220
221 if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
223 callback(NatTraversalEvent::ConnectionEstablished {
224 peer_id,
225 remote_address: bootstrap_addr,
226 });
227 }
228
229 info!("Successfully connected to bootstrap node {} with peer ID {:?}", bootstrap_addr, peer_id);
230 Ok(peer_id)
231 }
232 Err(e) => {
233 error!("Failed to establish connection to bootstrap node {}: {}", bootstrap_addr, e);
234 {
235 let mut stats = self.stats.lock().await;
236 stats.failed_connections += 1;
237 }
238 Err(NatTraversalError::NetworkError(format!("Connection failed: {}", e)))
239 }
240 }
241 }
242 Err(e) => {
243 error!("Failed to initiate connection to bootstrap node {}: {}", bootstrap_addr, e);
244 {
245 let mut stats = self.stats.lock().await;
246 stats.failed_connections += 1;
247 }
248 Err(NatTraversalError::NetworkError(format!("Connect error: {}", e)))
249 }
250 }
251 }
252
253 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
255 use std::hash::{Hash, Hasher};
256 use std::collections::hash_map::DefaultHasher;
257
258 let mut hasher = DefaultHasher::new();
259 addr.hash(&mut hasher);
260 let hash = hasher.finish();
261
262 let mut peer_id_bytes = [0u8; 32];
263 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
264 peer_id_bytes[8..16].copy_from_slice(&addr.port().to_le_bytes());
265
266 PeerId(peer_id_bytes)
267 }
268
269 pub async fn connect_to_peer(
271 &self,
272 peer_id: PeerId,
273 coordinator: SocketAddr,
274 ) -> Result<SocketAddr, NatTraversalError> {
275 info!("Initiating connection to peer {:?} via coordinator {}", peer_id, coordinator);
276
277 {
279 let mut stats = self.stats.lock().await;
280 stats.nat_traversal_attempts += 1;
281 }
282
283 self.nat_endpoint.initiate_nat_traversal(peer_id, coordinator)?;
285
286 let start = Instant::now();
288 let timeout = self.config.connection_timeout;
289
290 while start.elapsed() < timeout {
291 let events = self.nat_endpoint.poll(Instant::now())?;
292
293 for event in events {
294 match event {
295 NatTraversalEvent::ConnectionEstablished { peer_id: evt_peer, remote_address } => {
296 if evt_peer == peer_id {
297 {
299 let mut peers = self.connected_peers.write().await;
300 peers.insert(peer_id, remote_address);
301 }
302
303 {
305 let mut stats = self.stats.lock().await;
306 stats.successful_connections += 1;
307 stats.active_connections += 1;
308 stats.nat_traversal_successes += 1;
309 }
310
311 info!("Successfully connected to peer {:?} at {}", peer_id, remote_address);
312
313 if self.config.auth_config.require_authentication {
315 match self.authenticate_as_initiator(&peer_id).await {
316 Ok(_) => {
317 info!("Authentication successful with peer {:?}", peer_id);
318 }
319 Err(e) => {
320 error!("Authentication failed with peer {:?}: {}", peer_id, e);
321 self.connected_peers.write().await.remove(&peer_id);
323 let mut stats = self.stats.lock().await;
325 stats.active_connections = stats.active_connections.saturating_sub(1);
326 stats.failed_connections += 1;
327 return Err(NatTraversalError::ConfigError(format!("Authentication failed: {}", e)));
328 }
329 }
330 }
331
332 return Ok(remote_address);
333 }
334 }
335 NatTraversalEvent::TraversalFailed { peer_id: evt_peer, error, fallback_available: _ } => {
336 if evt_peer == peer_id {
337 {
339 let mut stats = self.stats.lock().await;
340 stats.failed_connections += 1;
341 }
342
343 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
344 return Err(error);
345 }
346 }
347 _ => {
348 debug!("Received event: {:?}", event);
349 }
350 }
351 }
352
353 tokio::time::sleep(Duration::from_millis(100)).await;
355 }
356
357 {
359 let mut stats = self.stats.lock().await;
360 stats.failed_connections += 1;
361 }
362
363 Err(NatTraversalError::Timeout)
364 }
365
366 pub async fn accept(&self) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
368 info!("Waiting for incoming connection...");
369
370 match self.nat_endpoint.accept_connection().await {
372 Ok((peer_id, connection)) => {
373 let remote_addr = connection.remote_address();
374
375 {
377 let mut peers = self.connected_peers.write().await;
378 peers.insert(peer_id, remote_addr);
379 }
380
381 {
383 let mut stats = self.stats.lock().await;
384 stats.successful_connections += 1;
385 stats.active_connections += 1;
386 }
387
388 info!("Accepted connection from peer {:?} at {}", peer_id, remote_addr);
389
390 if self.config.auth_config.require_authentication {
392 let self_clone = self.clone();
394 let auth_peer_id = peer_id;
395 tokio::spawn(async move {
396 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
397 error!("Failed to handle authentication for peer {:?}: {}", auth_peer_id, e);
398 self_clone.connected_peers.write().await.remove(&auth_peer_id);
400 let mut stats = self_clone.stats.lock().await;
401 stats.active_connections = stats.active_connections.saturating_sub(1);
402 }
403 });
404 }
405
406 Ok((remote_addr, peer_id))
407 }
408 Err(e) => {
409 {
411 let mut stats = self.stats.lock().await;
412 stats.failed_connections += 1;
413 }
414
415 error!("Failed to accept connection: {}", e);
416 Err(Box::new(e))
417 }
418 }
419 }
420
421 pub async fn send_to_peer(
423 &self,
424 peer_id: &PeerId,
425 data: &[u8],
426 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
427 let peers = self.connected_peers.read().await;
428
429 if let Some(remote_addr) = peers.get(peer_id) {
430 debug!("Sending {} bytes to peer {:?} at {}", data.len(), peer_id, remote_addr);
431
432 match self.nat_endpoint.get_connection(peer_id) {
434 Ok(Some(connection)) => {
435 let mut send_stream = connection.open_uni().await
437 .map_err(|e| format!("Failed to open unidirectional stream: {}", e))?;
438
439 send_stream.write_all(data).await
441 .map_err(|e| format!("Failed to write data: {}", e))?;
442
443 send_stream.finish().map_err(|e| format!("Failed to finish stream: {}", e))?;
445
446 debug!("Successfully sent {} bytes to peer {:?}", data.len(), peer_id);
447 Ok(())
448 }
449 Ok(None) => {
450 error!("No active connection found for peer {:?}", peer_id);
451 Err("No active connection".into())
452 }
453 Err(e) => {
454 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
455 Err(Box::new(e))
456 }
457 }
458 } else {
459 error!("Peer {:?} not connected", peer_id);
460 Err("Peer not connected".into())
461 }
462 }
463
464 pub async fn receive(&self) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
466 debug!("Waiting to receive data from any connected peer...");
467
468 let peers = {
470 let peers_guard = self.connected_peers.read().await;
471 peers_guard.clone()
472 };
473
474 if peers.is_empty() {
475 return Err("No connected peers".into());
476 }
477
478 for (peer_id, _remote_addr) in peers.iter() {
482 match self.nat_endpoint.get_connection(peer_id) {
483 Ok(Some(connection)) => {
484 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni()).await {
486 Ok(Ok(mut recv_stream)) => {
487 debug!("Receiving data from unidirectional stream from peer {:?}", peer_id);
488
489 match recv_stream.read_to_end(1024 * 1024).await { Ok(buffer) => {
492 if !buffer.is_empty() {
493 debug!("Received {} bytes from peer {:?}", buffer.len(), peer_id);
494 return Ok((*peer_id, buffer));
495 }
496 }
497 Err(e) => {
498 debug!("Failed to read from stream for peer {:?}: {}", peer_id, e);
499 }
500 }
501 }
502 Ok(Err(e)) => {
503 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
504 }
505 Err(_) => {
506 }
508 }
509
510 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi()).await {
512 Ok(Ok((_send_stream, mut recv_stream))) => {
513 debug!("Receiving data from bidirectional stream from peer {:?}", peer_id);
514
515 match recv_stream.read_to_end(1024 * 1024).await { Ok(buffer) => {
518 if !buffer.is_empty() {
519 debug!("Received {} bytes from peer {:?} via bidirectional stream", buffer.len(), peer_id);
520 return Ok((*peer_id, buffer));
521 }
522 }
523 Err(e) => {
524 debug!("Failed to read from bidirectional stream for peer {:?}: {}", peer_id, e);
525 }
526 }
527 }
528 Ok(Err(e)) => {
529 debug!("Failed to accept bidirectional stream from peer {:?}: {}", peer_id, e);
530 }
531 Err(_) => {
532 }
534 }
535 }
536 Ok(None) => {
537 debug!("No active connection for peer {:?}", peer_id);
538 }
539 Err(e) => {
540 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
541 }
542 }
543 }
544
545 Err("No data available from any connected peer".into())
547 }
548
549 pub async fn get_stats(&self) -> NodeStats {
551 self.stats.lock().await.clone()
552 }
553
554 pub fn get_nat_endpoint(&self) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
556 Ok(&*self.nat_endpoint)
557 }
558
559 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
561 let stats = Arc::clone(&self.stats);
562 let interval_duration = self.config.stats_interval;
563
564 tokio::spawn(async move {
565 let mut interval = tokio::time::interval(interval_duration);
566
567 loop {
568 interval.tick().await;
569
570 let stats_snapshot = stats.lock().await.clone();
571
572 info!(
573 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
574 stats_snapshot.active_connections,
575 stats_snapshot.successful_connections,
576 stats_snapshot.nat_traversal_successes,
577 stats_snapshot.nat_traversal_attempts
578 );
579 }
580 })
581 }
582
583
584 pub async fn get_nat_stats(&self) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
586 self.nat_endpoint.get_nat_stats()
587 }
588
589 pub async fn get_connection_metrics(&self, peer_id: &PeerId) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
591 match self.nat_endpoint.get_connection(peer_id) {
592 Ok(Some(connection)) => {
593 let rtt = connection.rtt();
595
596 let stats = connection.stats();
598
599 Ok(ConnectionMetrics {
600 bytes_sent: stats.udp_tx.bytes,
601 bytes_received: stats.udp_rx.bytes,
602 rtt: Some(rtt),
603 packet_loss: stats.path.lost_packets as f64 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
604 })
605 }
606 Ok(None) => Err("Connection not found".into()),
607 Err(e) => Err(format!("Failed to get connection: {}", e).into()),
608 }
609 }
610
611 pub fn peer_id(&self) -> PeerId {
613 self.peer_id
614 }
615
616 pub fn public_key_bytes(&self) -> [u8; 32] {
618 self.auth_manager.public_key_bytes()
619 }
620
621 async fn send_auth_message(
623 &self,
624 peer_id: &PeerId,
625 message: AuthMessage,
626 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
627 let data = AuthManager::serialize_message(&message)?;
628 self.send_to_peer(peer_id, &data).await
629 }
630
631 async fn authenticate_as_initiator(
633 &self,
634 peer_id: &PeerId,
635 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
636 info!("Starting authentication with peer {:?}", peer_id);
637
638 let auth_request = self.auth_manager.create_auth_request();
640 self.send_auth_message(peer_id, auth_request).await?;
641
642 let timeout_duration = self.config.auth_config.auth_timeout;
644 let start = Instant::now();
645
646 while start.elapsed() < timeout_duration {
647 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
648 Ok(Ok((recv_peer_id, data))) => {
649 if recv_peer_id == *peer_id {
650 match AuthManager::deserialize_message(&data) {
651 Ok(AuthMessage::Challenge { nonce, .. }) => {
652 let response = self.auth_manager.create_challenge_response(nonce)?;
654 self.send_auth_message(peer_id, response).await?;
655 }
656 Ok(AuthMessage::AuthSuccess { .. }) => {
657 info!("Authentication successful with peer {:?}", peer_id);
658 return Ok(());
659 }
660 Ok(AuthMessage::AuthFailure { reason }) => {
661 return Err(format!("Authentication failed: {}", reason).into());
662 }
663 _ => continue,
664 }
665 }
666 }
667 _ => continue,
668 }
669 }
670
671 Err("Authentication timeout".into())
672 }
673
674 async fn handle_auth_message(
676 &self,
677 peer_id: PeerId,
678 message: AuthMessage,
679 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
680 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
681
682 match auth_protocol.handle_message(peer_id, message).await {
683 Ok(Some(response)) => {
684 self.send_auth_message(&peer_id, response).await?;
685 }
686 Ok(None) => {
687 }
689 Err(e) => {
690 error!("Authentication error: {}", e);
691 let failure = AuthMessage::AuthFailure {
692 reason: e.to_string(),
693 };
694 self.send_auth_message(&peer_id, failure).await?;
695 return Err(Box::new(e));
696 }
697 }
698
699 Ok(())
700 }
701
702 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
704 self.auth_manager.is_authenticated(peer_id).await
705 }
706
707 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
709 self.auth_manager.list_authenticated_peers().await
710 }
711
712 async fn handle_incoming_auth(
714 &self,
715 peer_id: PeerId,
716 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
717 info!("Handling incoming authentication from peer {:?}", peer_id);
718
719 let timeout_duration = self.config.auth_config.auth_timeout;
720 let start = Instant::now();
721
722 while start.elapsed() < timeout_duration {
723 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
724 Ok(Ok((recv_peer_id, data))) => {
725 if recv_peer_id == peer_id {
726 match AuthManager::deserialize_message(&data) {
727 Ok(auth_msg) => {
728 self.handle_auth_message(peer_id, auth_msg).await?;
729
730 if self.auth_manager.is_authenticated(&peer_id).await {
732 info!("Peer {:?} successfully authenticated", peer_id);
733 return Ok(());
734 }
735 }
736 Err(_) => {
737 continue;
739 }
740 }
741 }
742 }
743 _ => continue,
744 }
745 }
746
747 Err("Authentication timeout waiting for peer".into())
748 }
749}
750