1use std::{
7 collections::HashMap,
8 net::SocketAddr,
9 sync::Arc,
10 time::{Duration, Instant},
11};
12
13use tracing::{debug, error, info};
14
15use crate::{
16 auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
17 crypto::raw_public_keys::key_utils::{
18 derive_peer_id_from_public_key, generate_ed25519_keypair,
19 },
20 nat_traversal_api::{
21 EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
22 NatTraversalEvent, NatTraversalStatistics, PeerId,
23 },
24};
25
26#[derive(Clone)]
28pub struct QuicP2PNode {
29 nat_endpoint: Arc<NatTraversalEndpoint>,
31 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
33 stats: Arc<tokio::sync::Mutex<NodeStats>>,
35 config: QuicNodeConfig,
37 auth_manager: Arc<AuthManager>,
39 peer_id: PeerId,
41}
42
43#[derive(Debug, Clone)]
45pub struct QuicNodeConfig {
46 pub role: EndpointRole,
48 pub bootstrap_nodes: Vec<SocketAddr>,
50 pub enable_coordinator: bool,
52 pub max_connections: usize,
54 pub connection_timeout: Duration,
56 pub stats_interval: Duration,
58 pub auth_config: AuthConfig,
60 pub bind_addr: Option<SocketAddr>,
62}
63
64impl Default for QuicNodeConfig {
65 fn default() -> Self {
66 Self {
67 role: EndpointRole::Client,
68 bootstrap_nodes: Vec::new(),
69 enable_coordinator: false,
70 max_connections: 100,
71 connection_timeout: Duration::from_secs(30),
72 stats_interval: Duration::from_secs(30),
73 auth_config: AuthConfig::default(),
74 bind_addr: None,
75 }
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct ConnectionMetrics {
82 pub bytes_sent: u64,
84 pub bytes_received: u64,
86 pub rtt: Option<Duration>,
88 pub packet_loss: f64,
90}
91
92#[derive(Debug, Clone)]
94pub struct NodeStats {
95 pub active_connections: usize,
97 pub successful_connections: u64,
99 pub failed_connections: u64,
101 pub nat_traversal_attempts: u64,
103 pub nat_traversal_successes: u64,
105 pub start_time: Instant,
107}
108
109impl Default for NodeStats {
110 fn default() -> Self {
111 Self {
112 active_connections: 0,
113 successful_connections: 0,
114 failed_connections: 0,
115 nat_traversal_attempts: 0,
116 nat_traversal_successes: 0,
117 start_time: Instant::now(),
118 }
119 }
120}
121
122impl QuicP2PNode {
123 pub async fn new(
125 config: QuicNodeConfig,
126 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
127 let (secret_key, public_key) = generate_ed25519_keypair();
129 let peer_id = derive_peer_id_from_public_key(&public_key);
130
131 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
132
133 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
135
136 let nat_config = NatTraversalConfig {
138 role: config.role,
139 bootstrap_nodes: config.bootstrap_nodes.clone(),
140 max_candidates: 50,
141 coordination_timeout: Duration::from_secs(10),
142 enable_symmetric_nat: true,
143 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
145 max_concurrent_attempts: 5,
146 bind_addr: config.bind_addr,
147 };
148
149 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
151 start_time: Instant::now(),
152 ..Default::default()
153 }));
154 let stats_for_callback = Arc::clone(&stats_clone);
155
156 let event_callback = Box::new(move |event: NatTraversalEvent| {
157 let stats = stats_for_callback.clone();
158 tokio::spawn(async move {
159 let mut stats = stats.lock().await;
160 match event {
161 NatTraversalEvent::CoordinationRequested { .. } => {
162 stats.nat_traversal_attempts += 1;
163 }
164 NatTraversalEvent::ConnectionEstablished { .. } => {
165 stats.nat_traversal_successes += 1;
166 }
167 _ => {}
168 }
169 });
170 });
171
172 let nat_endpoint =
174 Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
175
176 Ok(Self {
177 nat_endpoint,
178 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
179 stats: stats_clone,
180 config,
181 auth_manager,
182 peer_id,
183 })
184 }
185
186 pub fn get_config(&self) -> &QuicNodeConfig {
188 &self.config
189 }
190
191 pub async fn connect_to_bootstrap(
193 &self,
194 bootstrap_addr: SocketAddr,
195 ) -> Result<PeerId, NatTraversalError> {
196 info!("Connecting to bootstrap node at {}", bootstrap_addr);
197
198 let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
200 NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
201 })?;
202
203 match endpoint.connect(bootstrap_addr, "bootstrap-node") {
205 Ok(connecting) => {
206 match connecting.await {
207 Ok(_connection) => {
208 let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
212
213 self.connected_peers
215 .write()
216 .await
217 .insert(peer_id, bootstrap_addr);
218
219 {
221 let mut stats = self.stats.lock().await;
222 stats.active_connections += 1;
223 stats.successful_connections += 1;
224 }
225
226 if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
228 callback(NatTraversalEvent::ConnectionEstablished {
229 peer_id,
230 remote_address: bootstrap_addr,
231 });
232 }
233
234 info!(
235 "Successfully connected to bootstrap node {} with peer ID {:?}",
236 bootstrap_addr, peer_id
237 );
238 Ok(peer_id)
239 }
240 Err(e) => {
241 error!(
242 "Failed to establish connection to bootstrap node {}: {}",
243 bootstrap_addr, e
244 );
245 {
246 let mut stats = self.stats.lock().await;
247 stats.failed_connections += 1;
248 }
249 Err(NatTraversalError::NetworkError(format!(
250 "Connection failed: {}",
251 e
252 )))
253 }
254 }
255 }
256 Err(e) => {
257 error!(
258 "Failed to initiate connection to bootstrap node {}: {}",
259 bootstrap_addr, e
260 );
261 {
262 let mut stats = self.stats.lock().await;
263 stats.failed_connections += 1;
264 }
265 Err(NatTraversalError::NetworkError(format!(
266 "Connect error: {}",
267 e
268 )))
269 }
270 }
271 }
272
273 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
275 use std::collections::hash_map::DefaultHasher;
276 use std::hash::{Hash, Hasher};
277
278 let mut hasher = DefaultHasher::new();
279 addr.hash(&mut hasher);
280 let hash = hasher.finish();
281
282 let mut peer_id_bytes = [0u8; 32];
283 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
284 let port_bytes = addr.port().to_le_bytes();
285 peer_id_bytes[8..10].copy_from_slice(&port_bytes);
286
287 PeerId(peer_id_bytes)
288 }
289
290 pub async fn connect_to_peer(
292 &self,
293 peer_id: PeerId,
294 coordinator: SocketAddr,
295 ) -> Result<SocketAddr, NatTraversalError> {
296 info!(
297 "Initiating connection to peer {:?} via coordinator {}",
298 peer_id, coordinator
299 );
300
301 {
303 let mut stats = self.stats.lock().await;
304 stats.nat_traversal_attempts += 1;
305 }
306
307 self.nat_endpoint
309 .initiate_nat_traversal(peer_id, coordinator)?;
310
311 let start = Instant::now();
313 let timeout = self.config.connection_timeout;
314
315 while start.elapsed() < timeout {
316 let events = self.nat_endpoint.poll(Instant::now())?;
317
318 for event in events {
319 match event {
320 NatTraversalEvent::ConnectionEstablished {
321 peer_id: evt_peer,
322 remote_address,
323 } => {
324 if evt_peer == peer_id {
325 {
327 let mut peers = self.connected_peers.write().await;
328 peers.insert(peer_id, remote_address);
329 }
330
331 {
333 let mut stats = self.stats.lock().await;
334 stats.successful_connections += 1;
335 stats.active_connections += 1;
336 stats.nat_traversal_successes += 1;
337 }
338
339 info!(
340 "Successfully connected to peer {:?} at {}",
341 peer_id, remote_address
342 );
343
344 if self.config.auth_config.require_authentication {
346 match self.authenticate_as_initiator(&peer_id).await {
347 Ok(_) => {
348 info!("Authentication successful with peer {:?}", peer_id);
349 }
350 Err(e) => {
351 error!(
352 "Authentication failed with peer {:?}: {}",
353 peer_id, e
354 );
355 self.connected_peers.write().await.remove(&peer_id);
357 let mut stats = self.stats.lock().await;
359 stats.active_connections =
360 stats.active_connections.saturating_sub(1);
361 stats.failed_connections += 1;
362 return Err(NatTraversalError::ConfigError(format!(
363 "Authentication failed: {}",
364 e
365 )));
366 }
367 }
368 }
369
370 return Ok(remote_address);
371 }
372 }
373 NatTraversalEvent::TraversalFailed {
374 peer_id: evt_peer,
375 error,
376 fallback_available: _,
377 } => {
378 if evt_peer == peer_id {
379 {
381 let mut stats = self.stats.lock().await;
382 stats.failed_connections += 1;
383 }
384
385 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
386 return Err(error);
387 }
388 }
389 _ => {
390 debug!("Received event: {:?}", event);
391 }
392 }
393 }
394
395 tokio::time::sleep(Duration::from_millis(100)).await;
397 }
398
399 {
401 let mut stats = self.stats.lock().await;
402 stats.failed_connections += 1;
403 }
404
405 Err(NatTraversalError::Timeout)
406 }
407
408 pub async fn accept(
410 &self,
411 ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
412 info!("Waiting for incoming connection...");
413
414 match self.nat_endpoint.accept_connection().await {
416 Ok((peer_id, connection)) => {
417 let remote_addr = connection.remote_address();
418
419 {
421 let mut peers = self.connected_peers.write().await;
422 peers.insert(peer_id, remote_addr);
423 }
424
425 {
427 let mut stats = self.stats.lock().await;
428 stats.successful_connections += 1;
429 stats.active_connections += 1;
430 }
431
432 info!(
433 "Accepted connection from peer {:?} at {}",
434 peer_id, remote_addr
435 );
436
437 if self.config.auth_config.require_authentication {
439 let self_clone = self.clone();
441 let auth_peer_id = peer_id;
442 tokio::spawn(async move {
443 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
444 error!(
445 "Failed to handle authentication for peer {:?}: {}",
446 auth_peer_id, e
447 );
448 self_clone
450 .connected_peers
451 .write()
452 .await
453 .remove(&auth_peer_id);
454 let mut stats = self_clone.stats.lock().await;
455 stats.active_connections = stats.active_connections.saturating_sub(1);
456 }
457 });
458 }
459
460 Ok((remote_addr, peer_id))
461 }
462 Err(e) => {
463 {
465 let mut stats = self.stats.lock().await;
466 stats.failed_connections += 1;
467 }
468
469 error!("Failed to accept connection: {}", e);
470 Err(Box::new(e))
471 }
472 }
473 }
474
475 pub async fn send_to_peer(
477 &self,
478 peer_id: &PeerId,
479 data: &[u8],
480 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
481 let peers = self.connected_peers.read().await;
482
483 if let Some(remote_addr) = peers.get(peer_id) {
484 debug!(
485 "Sending {} bytes to peer {:?} at {}",
486 data.len(),
487 peer_id,
488 remote_addr
489 );
490
491 match self.nat_endpoint.get_connection(peer_id) {
493 Ok(Some(connection)) => {
494 let mut send_stream = connection
496 .open_uni()
497 .await
498 .map_err(|e| format!("Failed to open unidirectional stream: {}", e))?;
499
500 send_stream
502 .write_all(data)
503 .await
504 .map_err(|e| format!("Failed to write data: {}", e))?;
505
506 send_stream
508 .finish()
509 .map_err(|e| format!("Failed to finish stream: {}", e))?;
510
511 debug!(
512 "Successfully sent {} bytes to peer {:?}",
513 data.len(),
514 peer_id
515 );
516 Ok(())
517 }
518 Ok(None) => {
519 error!("No active connection found for peer {:?}", peer_id);
520 Err("No active connection".into())
521 }
522 Err(e) => {
523 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
524 Err(Box::new(e))
525 }
526 }
527 } else {
528 error!("Peer {:?} not connected", peer_id);
529 Err("Peer not connected".into())
530 }
531 }
532
533 pub async fn receive(
535 &self,
536 ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
537 debug!("Waiting to receive data from any connected peer...");
538
539 let peers = {
541 let peers_guard = self.connected_peers.read().await;
542 peers_guard.clone()
543 };
544
545 if peers.is_empty() {
546 return Err("No connected peers".into());
547 }
548
549 for (peer_id, _remote_addr) in peers.iter() {
553 match self.nat_endpoint.get_connection(peer_id) {
554 Ok(Some(connection)) => {
555 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
557 .await
558 {
559 Ok(Ok(mut recv_stream)) => {
560 debug!(
561 "Receiving data from unidirectional stream from peer {:?}",
562 peer_id
563 );
564
565 match recv_stream.read_to_end(1024 * 1024).await {
567 Ok(buffer) => {
569 if !buffer.is_empty() {
570 debug!(
571 "Received {} bytes from peer {:?}",
572 buffer.len(),
573 peer_id
574 );
575 return Ok((*peer_id, buffer));
576 }
577 }
578 Err(e) => {
579 debug!(
580 "Failed to read from stream for peer {:?}: {}",
581 peer_id, e
582 );
583 }
584 }
585 }
586 Ok(Err(e)) => {
587 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
588 }
589 Err(_) => {
590 }
592 }
593
594 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
596 .await
597 {
598 Ok(Ok((_send_stream, mut recv_stream))) => {
599 debug!(
600 "Receiving data from bidirectional stream from peer {:?}",
601 peer_id
602 );
603
604 match recv_stream.read_to_end(1024 * 1024).await {
606 Ok(buffer) => {
608 if !buffer.is_empty() {
609 debug!(
610 "Received {} bytes from peer {:?} via bidirectional stream",
611 buffer.len(),
612 peer_id
613 );
614 return Ok((*peer_id, buffer));
615 }
616 }
617 Err(e) => {
618 debug!(
619 "Failed to read from bidirectional stream for peer {:?}: {}",
620 peer_id, e
621 );
622 }
623 }
624 }
625 Ok(Err(e)) => {
626 debug!(
627 "Failed to accept bidirectional stream from peer {:?}: {}",
628 peer_id, e
629 );
630 }
631 Err(_) => {
632 }
634 }
635 }
636 Ok(None) => {
637 debug!("No active connection for peer {:?}", peer_id);
638 }
639 Err(e) => {
640 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
641 }
642 }
643 }
644
645 Err("No data available from any connected peer".into())
647 }
648
649 pub async fn get_stats(&self) -> NodeStats {
651 self.stats.lock().await.clone()
652 }
653
654 pub fn get_nat_endpoint(
656 &self,
657 ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
658 Ok(&*self.nat_endpoint)
659 }
660
661 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
663 let stats = Arc::clone(&self.stats);
664 let interval_duration = self.config.stats_interval;
665
666 tokio::spawn(async move {
667 let mut interval = tokio::time::interval(interval_duration);
668
669 loop {
670 interval.tick().await;
671
672 let stats_snapshot = stats.lock().await.clone();
673
674 info!(
675 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
676 stats_snapshot.active_connections,
677 stats_snapshot.successful_connections,
678 stats_snapshot.nat_traversal_successes,
679 stats_snapshot.nat_traversal_attempts
680 );
681 }
682 })
683 }
684
685 pub async fn get_nat_stats(
687 &self,
688 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
689 self.nat_endpoint.get_nat_stats()
690 }
691
692 pub async fn get_connection_metrics(
694 &self,
695 peer_id: &PeerId,
696 ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
697 match self.nat_endpoint.get_connection(peer_id) {
698 Ok(Some(connection)) => {
699 let rtt = connection.rtt();
701
702 let stats = connection.stats();
704
705 Ok(ConnectionMetrics {
706 bytes_sent: stats.udp_tx.bytes,
707 bytes_received: stats.udp_rx.bytes,
708 rtt: Some(rtt),
709 packet_loss: stats.path.lost_packets as f64
710 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
711 })
712 }
713 Ok(None) => Err("Connection not found".into()),
714 Err(e) => Err(format!("Failed to get connection: {}", e).into()),
715 }
716 }
717
718 pub fn peer_id(&self) -> PeerId {
720 self.peer_id
721 }
722
723 pub fn public_key_bytes(&self) -> [u8; 32] {
725 self.auth_manager.public_key_bytes()
726 }
727
728 async fn send_auth_message(
730 &self,
731 peer_id: &PeerId,
732 message: AuthMessage,
733 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
734 let data = AuthManager::serialize_message(&message)?;
735 self.send_to_peer(peer_id, &data).await
736 }
737
738 async fn authenticate_as_initiator(
740 &self,
741 peer_id: &PeerId,
742 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
743 info!("Starting authentication with peer {:?}", peer_id);
744
745 let auth_request = self.auth_manager.create_auth_request();
747 self.send_auth_message(peer_id, auth_request).await?;
748
749 let timeout_duration = self.config.auth_config.auth_timeout;
751 let start = Instant::now();
752
753 while start.elapsed() < timeout_duration {
754 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
755 Ok(Ok((recv_peer_id, data))) => {
756 if recv_peer_id == *peer_id {
757 match AuthManager::deserialize_message(&data) {
758 Ok(AuthMessage::Challenge { nonce, .. }) => {
759 let response =
761 self.auth_manager.create_challenge_response(nonce)?;
762 self.send_auth_message(peer_id, response).await?;
763 }
764 Ok(AuthMessage::AuthSuccess { .. }) => {
765 info!("Authentication successful with peer {:?}", peer_id);
766 return Ok(());
767 }
768 Ok(AuthMessage::AuthFailure { reason }) => {
769 return Err(format!("Authentication failed: {}", reason).into());
770 }
771 _ => continue,
772 }
773 }
774 }
775 _ => continue,
776 }
777 }
778
779 Err("Authentication timeout".into())
780 }
781
782 async fn handle_auth_message(
784 &self,
785 peer_id: PeerId,
786 message: AuthMessage,
787 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
788 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
789
790 match auth_protocol.handle_message(peer_id, message).await {
791 Ok(Some(response)) => {
792 self.send_auth_message(&peer_id, response).await?;
793 }
794 Ok(None) => {
795 }
797 Err(e) => {
798 error!("Authentication error: {}", e);
799 let failure = AuthMessage::AuthFailure {
800 reason: e.to_string(),
801 };
802 self.send_auth_message(&peer_id, failure).await?;
803 return Err(Box::new(e));
804 }
805 }
806
807 Ok(())
808 }
809
810 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
812 self.auth_manager.is_authenticated(peer_id).await
813 }
814
815 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
817 self.auth_manager.list_authenticated_peers().await
818 }
819
820 async fn handle_incoming_auth(
822 &self,
823 peer_id: PeerId,
824 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
825 info!("Handling incoming authentication from peer {:?}", peer_id);
826
827 let timeout_duration = self.config.auth_config.auth_timeout;
828 let start = Instant::now();
829
830 while start.elapsed() < timeout_duration {
831 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
832 Ok(Ok((recv_peer_id, data))) => {
833 if recv_peer_id == peer_id {
834 match AuthManager::deserialize_message(&data) {
835 Ok(auth_msg) => {
836 self.handle_auth_message(peer_id, auth_msg).await?;
837
838 if self.auth_manager.is_authenticated(&peer_id).await {
840 info!("Peer {:?} successfully authenticated", peer_id);
841 return Ok(());
842 }
843 }
844 Err(_) => {
845 continue;
847 }
848 }
849 }
850 }
851 _ => continue,
852 }
853 }
854
855 Err("Authentication timeout waiting for peer".into())
856 }
857}