1use std::{
7 collections::HashMap,
8 net::SocketAddr,
9 sync::Arc,
10 time::{Duration, Instant},
11};
12
13use tracing::{debug, error, info};
14
15use crate::{
16 auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
17 crypto::raw_public_keys::key_utils::{
18 derive_peer_id_from_public_key, generate_ed25519_keypair,
19 },
20 nat_traversal_api::{
21 EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
22 NatTraversalEvent, NatTraversalStatistics, PeerId,
23 },
24};
25
26#[derive(Clone)]
28pub struct QuicP2PNode {
29 nat_endpoint: Arc<NatTraversalEndpoint>,
31 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
33 stats: Arc<tokio::sync::Mutex<NodeStats>>,
35 config: QuicNodeConfig,
37 auth_manager: Arc<AuthManager>,
39 peer_id: PeerId,
41}
42
43#[derive(Debug, Clone)]
45pub struct QuicNodeConfig {
46 pub role: EndpointRole,
48 pub bootstrap_nodes: Vec<SocketAddr>,
50 pub enable_coordinator: bool,
52 pub max_connections: usize,
54 pub connection_timeout: Duration,
56 pub stats_interval: Duration,
58 pub auth_config: AuthConfig,
60 pub bind_addr: Option<SocketAddr>,
62}
63
64impl Default for QuicNodeConfig {
65 fn default() -> Self {
66 Self {
67 role: EndpointRole::Client,
68 bootstrap_nodes: Vec::new(),
69 enable_coordinator: false,
70 max_connections: 100,
71 connection_timeout: Duration::from_secs(30),
72 stats_interval: Duration::from_secs(30),
73 auth_config: AuthConfig::default(),
74 bind_addr: None,
75 }
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct ConnectionMetrics {
82 pub bytes_sent: u64,
84 pub bytes_received: u64,
86 pub rtt: Option<Duration>,
88 pub packet_loss: f64,
90}
91
92#[derive(Debug, Clone)]
94pub struct NodeStats {
95 pub active_connections: usize,
97 pub successful_connections: u64,
99 pub failed_connections: u64,
101 pub nat_traversal_attempts: u64,
103 pub nat_traversal_successes: u64,
105 pub start_time: Instant,
107}
108
109impl Default for NodeStats {
110 fn default() -> Self {
111 Self {
112 active_connections: 0,
113 successful_connections: 0,
114 failed_connections: 0,
115 nat_traversal_attempts: 0,
116 nat_traversal_successes: 0,
117 start_time: Instant::now(),
118 }
119 }
120}
121
122impl QuicP2PNode {
123 pub async fn new(
125 config: QuicNodeConfig,
126 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
127 let (secret_key, public_key) = generate_ed25519_keypair();
129 let peer_id = derive_peer_id_from_public_key(&public_key);
130
131 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
132
133 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
135
136 let nat_config = NatTraversalConfig {
138 role: config.role,
139 bootstrap_nodes: config.bootstrap_nodes.clone(),
140 max_candidates: 50,
141 coordination_timeout: Duration::from_secs(10),
142 enable_symmetric_nat: true,
143 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
145 max_concurrent_attempts: 5,
146 bind_addr: config.bind_addr,
147 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
149 };
150
151 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
153 start_time: Instant::now(),
154 ..Default::default()
155 }));
156 let stats_for_callback = Arc::clone(&stats_clone);
157
158 let event_callback = Box::new(move |event: NatTraversalEvent| {
159 let stats = stats_for_callback.clone();
160 tokio::spawn(async move {
161 let mut stats = stats.lock().await;
162 match event {
163 NatTraversalEvent::CoordinationRequested { .. } => {
164 stats.nat_traversal_attempts += 1;
165 }
166 NatTraversalEvent::ConnectionEstablished { .. } => {
167 stats.nat_traversal_successes += 1;
168 }
169 _ => {}
170 }
171 });
172 });
173
174 let nat_endpoint =
176 Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
177
178 Ok(Self {
179 nat_endpoint,
180 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
181 stats: stats_clone,
182 config,
183 auth_manager,
184 peer_id,
185 })
186 }
187
188 pub fn get_config(&self) -> &QuicNodeConfig {
190 &self.config
191 }
192
193 pub async fn connect_to_bootstrap(
195 &self,
196 bootstrap_addr: SocketAddr,
197 ) -> Result<PeerId, NatTraversalError> {
198 info!("Connecting to bootstrap node at {}", bootstrap_addr);
199
200 let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
202 NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
203 })?;
204
205 match endpoint.connect(bootstrap_addr, "bootstrap-node") {
207 Ok(connecting) => {
208 match connecting.await {
209 Ok(_connection) => {
210 let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
214
215 self.connected_peers
217 .write()
218 .await
219 .insert(peer_id, bootstrap_addr);
220
221 {
223 let mut stats = self.stats.lock().await;
224 stats.active_connections += 1;
225 stats.successful_connections += 1;
226 }
227
228 if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
230 callback(NatTraversalEvent::ConnectionEstablished {
231 peer_id,
232 remote_address: bootstrap_addr,
233 });
234 }
235
236 info!(
237 "Successfully connected to bootstrap node {} with peer ID {:?}",
238 bootstrap_addr, peer_id
239 );
240 Ok(peer_id)
241 }
242 Err(e) => {
243 error!(
244 "Failed to establish connection to bootstrap node {}: {}",
245 bootstrap_addr, e
246 );
247 {
248 let mut stats = self.stats.lock().await;
249 stats.failed_connections += 1;
250 }
251 Err(NatTraversalError::NetworkError(format!(
252 "Connection failed: {e}"
253 )))
254 }
255 }
256 }
257 Err(e) => {
258 error!(
259 "Failed to initiate connection to bootstrap node {}: {}",
260 bootstrap_addr, e
261 );
262 {
263 let mut stats = self.stats.lock().await;
264 stats.failed_connections += 1;
265 }
266 Err(NatTraversalError::NetworkError(format!(
267 "Connect error: {e}"
268 )))
269 }
270 }
271 }
272
273 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
275 use std::collections::hash_map::DefaultHasher;
276 use std::hash::{Hash, Hasher};
277
278 let mut hasher = DefaultHasher::new();
279 addr.hash(&mut hasher);
280 let hash = hasher.finish();
281
282 let mut peer_id_bytes = [0u8; 32];
283 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
284 let port_bytes = addr.port().to_le_bytes();
285 peer_id_bytes[8..10].copy_from_slice(&port_bytes);
286
287 PeerId(peer_id_bytes)
288 }
289
290 pub async fn connect_to_peer(
292 &self,
293 peer_id: PeerId,
294 coordinator: SocketAddr,
295 ) -> Result<SocketAddr, NatTraversalError> {
296 info!(
297 "Initiating connection to peer {:?} via coordinator {}",
298 peer_id, coordinator
299 );
300
301 {
303 let mut stats = self.stats.lock().await;
304 stats.nat_traversal_attempts += 1;
305 }
306
307 self.nat_endpoint
309 .initiate_nat_traversal(peer_id, coordinator)?;
310
311 let start = Instant::now();
313 let timeout = self.config.connection_timeout;
314
315 while start.elapsed() < timeout {
316 let events = self.nat_endpoint.poll(Instant::now())?;
317
318 for event in events {
319 match event {
320 NatTraversalEvent::ConnectionEstablished {
321 peer_id: evt_peer,
322 remote_address,
323 } => {
324 if evt_peer == peer_id {
325 {
327 let mut peers = self.connected_peers.write().await;
328 peers.insert(peer_id, remote_address);
329 }
330
331 {
333 let mut stats = self.stats.lock().await;
334 stats.successful_connections += 1;
335 stats.active_connections += 1;
336 stats.nat_traversal_successes += 1;
337 }
338
339 info!(
340 "Successfully connected to peer {:?} at {}",
341 peer_id, remote_address
342 );
343
344 if self.config.auth_config.require_authentication {
346 match self.authenticate_as_initiator(&peer_id).await {
347 Ok(_) => {
348 info!("Authentication successful with peer {:?}", peer_id);
349 }
350 Err(e) => {
351 error!(
352 "Authentication failed with peer {:?}: {}",
353 peer_id, e
354 );
355 self.connected_peers.write().await.remove(&peer_id);
357 let mut stats = self.stats.lock().await;
359 stats.active_connections =
360 stats.active_connections.saturating_sub(1);
361 stats.failed_connections += 1;
362 return Err(NatTraversalError::ConfigError(format!(
363 "Authentication failed: {e}"
364 )));
365 }
366 }
367 }
368
369 return Ok(remote_address);
370 }
371 }
372 NatTraversalEvent::TraversalFailed {
373 peer_id: evt_peer,
374 error,
375 fallback_available: _,
376 } => {
377 if evt_peer == peer_id {
378 {
380 let mut stats = self.stats.lock().await;
381 stats.failed_connections += 1;
382 }
383
384 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
385 return Err(error);
386 }
387 }
388 _ => {
389 debug!("Received event: {:?}", event);
390 }
391 }
392 }
393
394 tokio::time::sleep(Duration::from_millis(100)).await;
396 }
397
398 {
400 let mut stats = self.stats.lock().await;
401 stats.failed_connections += 1;
402 }
403
404 Err(NatTraversalError::Timeout)
405 }
406
407 pub async fn accept(
409 &self,
410 ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
411 info!("Waiting for incoming connection...");
412
413 match self.nat_endpoint.accept_connection().await {
415 Ok((peer_id, connection)) => {
416 let remote_addr = connection.remote_address();
417
418 {
420 let mut peers = self.connected_peers.write().await;
421 peers.insert(peer_id, remote_addr);
422 }
423
424 {
426 let mut stats = self.stats.lock().await;
427 stats.successful_connections += 1;
428 stats.active_connections += 1;
429 }
430
431 info!(
432 "Accepted connection from peer {:?} at {}",
433 peer_id, remote_addr
434 );
435
436 if self.config.auth_config.require_authentication {
438 let self_clone = self.clone();
440 let auth_peer_id = peer_id;
441 tokio::spawn(async move {
442 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
443 error!(
444 "Failed to handle authentication for peer {:?}: {}",
445 auth_peer_id, e
446 );
447 self_clone
449 .connected_peers
450 .write()
451 .await
452 .remove(&auth_peer_id);
453 let mut stats = self_clone.stats.lock().await;
454 stats.active_connections = stats.active_connections.saturating_sub(1);
455 }
456 });
457 }
458
459 Ok((remote_addr, peer_id))
460 }
461 Err(e) => {
462 {
464 let mut stats = self.stats.lock().await;
465 stats.failed_connections += 1;
466 }
467
468 error!("Failed to accept connection: {}", e);
469 Err(Box::new(e))
470 }
471 }
472 }
473
474 pub async fn send_to_peer(
476 &self,
477 peer_id: &PeerId,
478 data: &[u8],
479 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
480 let peers = self.connected_peers.read().await;
481
482 if let Some(remote_addr) = peers.get(peer_id) {
483 debug!(
484 "Sending {} bytes to peer {:?} at {}",
485 data.len(),
486 peer_id,
487 remote_addr
488 );
489
490 match self.nat_endpoint.get_connection(peer_id) {
492 Ok(Some(connection)) => {
493 let mut send_stream = connection
495 .open_uni()
496 .await
497 .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
498
499 send_stream
501 .write_all(data)
502 .await
503 .map_err(|e| format!("Failed to write data: {e}"))?;
504
505 send_stream
507 .finish()
508 .map_err(|e| format!("Failed to finish stream: {e}"))?;
509
510 debug!(
511 "Successfully sent {} bytes to peer {:?}",
512 data.len(),
513 peer_id
514 );
515 Ok(())
516 }
517 Ok(None) => {
518 error!("No active connection found for peer {:?}", peer_id);
519 Err("No active connection".into())
520 }
521 Err(e) => {
522 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
523 Err(Box::new(e))
524 }
525 }
526 } else {
527 error!("Peer {:?} not connected", peer_id);
528 Err("Peer not connected".into())
529 }
530 }
531
532 pub async fn receive(
534 &self,
535 ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
536 debug!("Waiting to receive data from any connected peer...");
537
538 let peers = {
540 let peers_guard = self.connected_peers.read().await;
541 peers_guard.clone()
542 };
543
544 if peers.is_empty() {
545 return Err("No connected peers".into());
546 }
547
548 for (peer_id, _remote_addr) in peers.iter() {
552 match self.nat_endpoint.get_connection(peer_id) {
553 Ok(Some(connection)) => {
554 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
556 .await
557 {
558 Ok(Ok(mut recv_stream)) => {
559 debug!(
560 "Receiving data from unidirectional stream from peer {:?}",
561 peer_id
562 );
563
564 match recv_stream.read_to_end(1024 * 1024).await {
566 Ok(buffer) => {
568 if !buffer.is_empty() {
569 debug!(
570 "Received {} bytes from peer {:?}",
571 buffer.len(),
572 peer_id
573 );
574 return Ok((*peer_id, buffer));
575 }
576 }
577 Err(e) => {
578 debug!(
579 "Failed to read from stream for peer {:?}: {}",
580 peer_id, e
581 );
582 }
583 }
584 }
585 Ok(Err(e)) => {
586 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
587 }
588 Err(_) => {
589 }
591 }
592
593 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
595 .await
596 {
597 Ok(Ok((_send_stream, mut recv_stream))) => {
598 debug!(
599 "Receiving data from bidirectional stream from peer {:?}",
600 peer_id
601 );
602
603 match recv_stream.read_to_end(1024 * 1024).await {
605 Ok(buffer) => {
607 if !buffer.is_empty() {
608 debug!(
609 "Received {} bytes from peer {:?} via bidirectional stream",
610 buffer.len(),
611 peer_id
612 );
613 return Ok((*peer_id, buffer));
614 }
615 }
616 Err(e) => {
617 debug!(
618 "Failed to read from bidirectional stream for peer {:?}: {}",
619 peer_id, e
620 );
621 }
622 }
623 }
624 Ok(Err(e)) => {
625 debug!(
626 "Failed to accept bidirectional stream from peer {:?}: {}",
627 peer_id, e
628 );
629 }
630 Err(_) => {
631 }
633 }
634 }
635 Ok(None) => {
636 debug!("No active connection for peer {:?}", peer_id);
637 }
638 Err(e) => {
639 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
640 }
641 }
642 }
643
644 Err("No data available from any connected peer".into())
646 }
647
648 pub async fn get_stats(&self) -> NodeStats {
650 self.stats.lock().await.clone()
651 }
652
653 pub fn get_nat_endpoint(
655 &self,
656 ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
657 Ok(&*self.nat_endpoint)
658 }
659
660 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
662 let stats = Arc::clone(&self.stats);
663 let interval_duration = self.config.stats_interval;
664
665 tokio::spawn(async move {
666 let mut interval = tokio::time::interval(interval_duration);
667
668 loop {
669 interval.tick().await;
670
671 let stats_snapshot = stats.lock().await.clone();
672
673 info!(
674 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
675 stats_snapshot.active_connections,
676 stats_snapshot.successful_connections,
677 stats_snapshot.nat_traversal_successes,
678 stats_snapshot.nat_traversal_attempts
679 );
680 }
681 })
682 }
683
684 pub async fn get_nat_stats(
686 &self,
687 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
688 self.nat_endpoint.get_nat_stats()
689 }
690
691 pub fn inject_observed_address(
693 &self,
694 observed_address: SocketAddr,
695 from_peer: PeerId,
696 ) -> Result<(), NatTraversalError> {
697 self.nat_endpoint
698 .inject_observed_address(observed_address, from_peer)
699 }
700
701 pub async fn get_connection_metrics(
703 &self,
704 peer_id: &PeerId,
705 ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
706 match self.nat_endpoint.get_connection(peer_id) {
707 Ok(Some(connection)) => {
708 let rtt = connection.rtt();
710
711 let stats = connection.stats();
713
714 Ok(ConnectionMetrics {
715 bytes_sent: stats.udp_tx.bytes,
716 bytes_received: stats.udp_rx.bytes,
717 rtt: Some(rtt),
718 packet_loss: stats.path.lost_packets as f64
719 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
720 })
721 }
722 Ok(None) => Err("Connection not found".into()),
723 Err(e) => Err(format!("Failed to get connection: {e}").into()),
724 }
725 }
726
727 pub fn peer_id(&self) -> PeerId {
729 self.peer_id
730 }
731
732 pub fn public_key_bytes(&self) -> [u8; 32] {
734 self.auth_manager.public_key_bytes()
735 }
736
737 async fn send_auth_message(
739 &self,
740 peer_id: &PeerId,
741 message: AuthMessage,
742 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
743 let data = AuthManager::serialize_message(&message)?;
744 self.send_to_peer(peer_id, &data).await
745 }
746
747 async fn authenticate_as_initiator(
749 &self,
750 peer_id: &PeerId,
751 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
752 info!("Starting authentication with peer {:?}", peer_id);
753
754 let auth_request = self.auth_manager.create_auth_request();
756 self.send_auth_message(peer_id, auth_request).await?;
757
758 let timeout_duration = self.config.auth_config.auth_timeout;
760 let start = Instant::now();
761
762 while start.elapsed() < timeout_duration {
763 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
764 Ok(Ok((recv_peer_id, data))) => {
765 if recv_peer_id == *peer_id {
766 match AuthManager::deserialize_message(&data) {
767 Ok(AuthMessage::Challenge { nonce, .. }) => {
768 let response =
770 self.auth_manager.create_challenge_response(nonce)?;
771 self.send_auth_message(peer_id, response).await?;
772 }
773 Ok(AuthMessage::AuthSuccess { .. }) => {
774 info!("Authentication successful with peer {:?}", peer_id);
775 return Ok(());
776 }
777 Ok(AuthMessage::AuthFailure { reason }) => {
778 return Err(format!("Authentication failed: {reason}").into());
779 }
780 _ => continue,
781 }
782 }
783 }
784 _ => continue,
785 }
786 }
787
788 Err("Authentication timeout".into())
789 }
790
791 async fn handle_auth_message(
793 &self,
794 peer_id: PeerId,
795 message: AuthMessage,
796 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
797 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
798
799 match auth_protocol.handle_message(peer_id, message).await {
800 Ok(Some(response)) => {
801 self.send_auth_message(&peer_id, response).await?;
802 }
803 Ok(None) => {
804 }
806 Err(e) => {
807 error!("Authentication error: {}", e);
808 let failure = AuthMessage::AuthFailure {
809 reason: e.to_string(),
810 };
811 self.send_auth_message(&peer_id, failure).await?;
812 return Err(Box::new(e));
813 }
814 }
815
816 Ok(())
817 }
818
819 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
821 self.auth_manager.is_authenticated(peer_id).await
822 }
823
824 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
826 self.auth_manager.list_authenticated_peers().await
827 }
828
829 async fn handle_incoming_auth(
831 &self,
832 peer_id: PeerId,
833 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
834 info!("Handling incoming authentication from peer {:?}", peer_id);
835
836 let timeout_duration = self.config.auth_config.auth_timeout;
837 let start = Instant::now();
838
839 while start.elapsed() < timeout_duration {
840 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
841 Ok(Ok((recv_peer_id, data))) => {
842 if recv_peer_id == peer_id {
843 match AuthManager::deserialize_message(&data) {
844 Ok(auth_msg) => {
845 self.handle_auth_message(peer_id, auth_msg).await?;
846
847 if self.auth_manager.is_authenticated(&peer_id).await {
849 info!("Peer {:?} successfully authenticated", peer_id);
850 return Ok(());
851 }
852 }
853 Err(_) => {
854 continue;
856 }
857 }
858 }
859 }
860 _ => continue,
861 }
862 }
863
864 Err("Authentication timeout waiting for peer".into())
865 }
866}