1use std::{
14 collections::HashMap,
15 net::SocketAddr,
16 sync::{
17 Arc,
18 atomic::{AtomicBool, Ordering},
19 },
20 time::{Duration, Instant},
21};
22
23use tracing::{debug, error, info};
24
25use crate::{
26 auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
27 crypto::raw_public_keys::key_utils::{
28 derive_peer_id_from_public_key, generate_ed25519_keypair,
29 },
30 nat_traversal_api::{
31 EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
32 NatTraversalEvent, NatTraversalStatistics, PeerId,
33 },
34};
35
36#[derive(Clone, Debug)]
38pub struct QuicP2PNode {
39 nat_endpoint: Arc<NatTraversalEndpoint>,
41 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
43 stats: Arc<tokio::sync::Mutex<NodeStats>>,
45 config: QuicNodeConfig,
47 auth_manager: Arc<AuthManager>,
49 peer_id: PeerId,
51 shutdown: Arc<AtomicBool>,
53}
54
55#[derive(Debug, Clone)]
57pub struct QuicNodeConfig {
58 pub role: EndpointRole,
60 pub bootstrap_nodes: Vec<SocketAddr>,
62 pub enable_coordinator: bool,
64 pub max_connections: usize,
66 pub connection_timeout: Duration,
68 pub stats_interval: Duration,
70 pub auth_config: AuthConfig,
72 pub bind_addr: Option<SocketAddr>,
74}
75
76impl Default for QuicNodeConfig {
77 fn default() -> Self {
78 Self {
79 role: EndpointRole::Client,
80 bootstrap_nodes: Vec::new(),
81 enable_coordinator: false,
82 max_connections: 100,
83 connection_timeout: Duration::from_secs(30),
84 stats_interval: Duration::from_secs(30),
85 auth_config: AuthConfig::default(),
86 bind_addr: None,
87 }
88 }
89}
90
91#[derive(Debug, Clone)]
93pub struct ConnectionMetrics {
94 pub bytes_sent: u64,
96 pub bytes_received: u64,
98 pub rtt: Option<Duration>,
100 pub packet_loss: f64,
102}
103
104#[derive(Debug, Clone)]
106pub struct NodeStats {
107 pub active_connections: usize,
109 pub successful_connections: u64,
111 pub failed_connections: u64,
113 pub nat_traversal_attempts: u64,
115 pub nat_traversal_successes: u64,
117 pub start_time: Instant,
119}
120
121impl Default for NodeStats {
122 fn default() -> Self {
123 Self {
124 active_connections: 0,
125 successful_connections: 0,
126 failed_connections: 0,
127 nat_traversal_attempts: 0,
128 nat_traversal_successes: 0,
129 start_time: Instant::now(),
130 }
131 }
132}
133
134impl QuicP2PNode {
135 pub async fn new(
137 config: QuicNodeConfig,
138 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
139 let (secret_key, public_key) = generate_ed25519_keypair();
141 let peer_id = derive_peer_id_from_public_key(&public_key);
142
143 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
144
145 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
147
148 let nat_config = NatTraversalConfig {
150 role: config.role,
151 bootstrap_nodes: config.bootstrap_nodes.clone(),
152 max_candidates: 50,
153 coordination_timeout: Duration::from_secs(10),
154 enable_symmetric_nat: true,
155 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
157 max_concurrent_attempts: 5,
158 bind_addr: config.bind_addr,
159 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
161 };
162
163 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
165 start_time: Instant::now(),
166 ..Default::default()
167 }));
168 let stats_for_callback = Arc::clone(&stats_clone);
169
170 let event_callback = Box::new(move |event: NatTraversalEvent| {
171 let stats = stats_for_callback.clone();
172 tokio::spawn(async move {
173 let mut stats = stats.lock().await;
174 match event {
175 NatTraversalEvent::CoordinationRequested { .. } => {
176 stats.nat_traversal_attempts += 1;
177 }
178 NatTraversalEvent::ConnectionEstablished { .. } => {
179 stats.nat_traversal_successes += 1;
180 }
181 _ => {}
182 }
183 });
184 });
185
186 let nat_endpoint =
188 Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
189
190 let shutdown = Arc::new(AtomicBool::new(false));
192
193 let node = Self {
195 nat_endpoint,
196 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
197 stats: stats_clone,
198 config,
199 auth_manager,
200 peer_id,
201 shutdown: shutdown.clone(),
202 };
203
204 Ok(node)
205 }
206
207 pub fn get_config(&self) -> &QuicNodeConfig {
209 &self.config
210 }
211
212 pub async fn connect_to_bootstrap(
214 &self,
215 bootstrap_addr: SocketAddr,
216 ) -> Result<PeerId, NatTraversalError> {
217 info!("Connecting to bootstrap node at {}", bootstrap_addr);
218
219 let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
221 NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
222 })?;
223
224 match endpoint.connect(bootstrap_addr, "bootstrap-node") {
226 Ok(connecting) => {
227 match connecting.await {
228 Ok(connection) => {
229 let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
233
234 let handler_connection = connection.clone();
236
237 self.nat_endpoint.add_connection(peer_id, connection)?;
239
240 self.nat_endpoint
241 .spawn_connection_handler(peer_id, handler_connection)?;
242
243 self.connected_peers
245 .write()
246 .await
247 .insert(peer_id, bootstrap_addr);
248
249 {
251 let mut stats = self.stats.lock().await;
252 stats.active_connections += 1;
253 stats.successful_connections += 1;
254 }
255
256 if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
258 callback(NatTraversalEvent::ConnectionEstablished {
259 peer_id,
260 remote_address: bootstrap_addr,
261 });
262 }
263
264 info!(
265 "Successfully connected to bootstrap node {} with peer ID {:?}",
266 bootstrap_addr, peer_id
267 );
268 Ok(peer_id)
269 }
270 Err(e) => {
271 error!(
272 "Failed to establish connection to bootstrap node {}: {}",
273 bootstrap_addr, e
274 );
275 {
276 let mut stats = self.stats.lock().await;
277 stats.failed_connections += 1;
278 }
279 Err(NatTraversalError::NetworkError(format!(
280 "Connection failed: {e}"
281 )))
282 }
283 }
284 }
285 Err(e) => {
286 error!(
287 "Failed to initiate connection to bootstrap node {}: {}",
288 bootstrap_addr, e
289 );
290 {
291 let mut stats = self.stats.lock().await;
292 stats.failed_connections += 1;
293 }
294 Err(NatTraversalError::NetworkError(format!(
295 "Connect error: {e}"
296 )))
297 }
298 }
299 }
300
301 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
303 use std::collections::hash_map::DefaultHasher;
304 use std::hash::{Hash, Hasher};
305
306 let mut hasher = DefaultHasher::new();
307 addr.hash(&mut hasher);
308 let hash = hasher.finish();
309
310 let mut peer_id_bytes = [0u8; 32];
311 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
312 let port_bytes = addr.port().to_le_bytes();
313 peer_id_bytes[8..10].copy_from_slice(&port_bytes);
314
315 PeerId(peer_id_bytes)
316 }
317
318 pub async fn connect_to_peer(
320 &self,
321 peer_id: PeerId,
322 coordinator: SocketAddr,
323 ) -> Result<SocketAddr, NatTraversalError> {
324 info!(
325 "Initiating connection to peer {:?} via coordinator {}",
326 peer_id, coordinator
327 );
328
329 {
331 let mut stats = self.stats.lock().await;
332 stats.nat_traversal_attempts += 1;
333 }
334
335 self.nat_endpoint
337 .initiate_nat_traversal(peer_id, coordinator)?;
338
339 let start = Instant::now();
341 let timeout = self.config.connection_timeout;
342
343 while start.elapsed() < timeout {
344 let events = self.nat_endpoint.poll(Instant::now())?;
345
346 for event in events {
347 match event {
348 NatTraversalEvent::ConnectionEstablished {
349 peer_id: evt_peer,
350 remote_address,
351 } => {
352 if evt_peer == peer_id {
353 {
355 let mut peers = self.connected_peers.write().await;
356 peers.insert(peer_id, remote_address);
357 }
358
359 {
361 let mut stats = self.stats.lock().await;
362 stats.successful_connections += 1;
363 stats.active_connections += 1;
364 stats.nat_traversal_successes += 1;
365 }
366
367 info!(
368 "Successfully connected to peer {:?} at {}",
369 peer_id, remote_address
370 );
371
372 if self.config.auth_config.require_authentication {
374 match self.authenticate_as_initiator(&peer_id).await {
375 Ok(_) => {
376 info!("Authentication successful with peer {:?}", peer_id);
377 }
378 Err(e) => {
379 error!(
380 "Authentication failed with peer {:?}: {}",
381 peer_id, e
382 );
383 self.connected_peers.write().await.remove(&peer_id);
385 let mut stats = self.stats.lock().await;
387 stats.active_connections =
388 stats.active_connections.saturating_sub(1);
389 stats.failed_connections += 1;
390 return Err(NatTraversalError::ConfigError(format!(
391 "Authentication failed: {e}"
392 )));
393 }
394 }
395 }
396
397 return Ok(remote_address);
398 }
399 }
400 NatTraversalEvent::TraversalFailed {
401 peer_id: evt_peer,
402 error,
403 fallback_available: _,
404 } => {
405 if evt_peer == peer_id {
406 {
408 let mut stats = self.stats.lock().await;
409 stats.failed_connections += 1;
410 }
411
412 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
413 return Err(error);
414 }
415 }
416 _ => {
417 debug!("Received event: {:?}", event);
418 }
419 }
420 }
421
422 tokio::time::sleep(Duration::from_millis(100)).await;
424 }
425
426 {
428 let mut stats = self.stats.lock().await;
429 stats.failed_connections += 1;
430 }
431
432 Err(NatTraversalError::Timeout)
433 }
434
435 pub async fn accept(
437 &self,
438 ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
439 info!("Waiting for incoming connection...");
440
441 match self.nat_endpoint.accept_connection().await {
443 Ok((peer_id, connection)) => {
444 let remote_addr = connection.remote_address();
445
446 if let Err(e) = self
448 .nat_endpoint
449 .spawn_connection_handler(peer_id, connection)
450 {
451 error!(
452 "Failed to spawn connection handler for peer {:?}: {}",
453 peer_id, e
454 );
455 return Err(Box::new(e));
456 }
457
458 {
460 let mut peers = self.connected_peers.write().await;
461 peers.insert(peer_id, remote_addr);
462 }
463
464 {
466 let mut stats = self.stats.lock().await;
467 stats.successful_connections += 1;
468 stats.active_connections += 1;
469 }
470
471 info!(
472 "Accepted connection from peer {:?} at {}",
473 peer_id, remote_addr
474 );
475
476 if self.config.auth_config.require_authentication {
478 let self_clone = self.clone();
480 let auth_peer_id = peer_id;
481 tokio::spawn(async move {
482 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
483 error!(
484 "Failed to handle authentication for peer {:?}: {}",
485 auth_peer_id, e
486 );
487 self_clone
489 .connected_peers
490 .write()
491 .await
492 .remove(&auth_peer_id);
493 let mut stats = self_clone.stats.lock().await;
494 stats.active_connections = stats.active_connections.saturating_sub(1);
495 }
496 });
497 }
498
499 Ok((remote_addr, peer_id))
500 }
501 Err(e) => {
502 {
504 let mut stats = self.stats.lock().await;
505 stats.failed_connections += 1;
506 }
507
508 error!("Failed to accept connection: {}", e);
509 Err(Box::new(e))
510 }
511 }
512 }
513
514 pub async fn send_to_peer(
516 &self,
517 peer_id: &PeerId,
518 data: &[u8],
519 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
520 debug!(
521 "Attempting to send {} bytes to peer {:?}",
522 data.len(),
523 peer_id
524 );
525
526 let peers = self.connected_peers.read().await;
527
528 if let Some(remote_addr) = peers.get(peer_id) {
529 debug!("Found peer {:?} at {}", peer_id, remote_addr);
530
531 match self.nat_endpoint.get_connection(peer_id) {
533 Ok(Some(connection)) => {
534 let mut send_stream = connection
536 .open_uni()
537 .await
538 .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
539
540 send_stream
542 .write_all(data)
543 .await
544 .map_err(|e| format!("Failed to write data: {e}"))?;
545
546 send_stream
548 .finish()
549 .map_err(|e| format!("Failed to finish stream: {e}"))?;
550
551 debug!(
552 "Successfully sent {} bytes to peer {:?}",
553 data.len(),
554 peer_id
555 );
556 Ok(())
557 }
558 Ok(None) => {
559 error!("No active connection found for peer {:?}", peer_id);
560 Err("No active connection".into())
561 }
562 Err(e) => {
563 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
564 Err(Box::new(e))
565 }
566 }
567 } else {
568 error!("Peer {:?} not connected", peer_id);
569 Err("Peer not connected".into())
570 }
571 }
572
573 pub async fn receive(
575 &self,
576 ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
577 debug!("Waiting to receive data from any connected peer...");
578
579 let peers = {
581 let peers_guard = self.connected_peers.read().await;
582 peers_guard.clone()
583 };
584
585 if peers.is_empty() {
586 return Err("No connected peers".into());
587 }
588
589 for (peer_id, _remote_addr) in peers.iter() {
593 match self.nat_endpoint.get_connection(peer_id) {
594 Ok(Some(connection)) => {
595 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
597 .await
598 {
599 Ok(Ok(mut recv_stream)) => {
600 debug!(
601 "Receiving data from unidirectional stream from peer {:?}",
602 peer_id
603 );
604
605 match recv_stream.read_to_end(1024 * 1024).await {
607 Ok(buffer) => {
609 if !buffer.is_empty() {
610 debug!(
611 "Received {} bytes from peer {:?}",
612 buffer.len(),
613 peer_id
614 );
615 return Ok((*peer_id, buffer));
616 }
617 }
618 Err(e) => {
619 debug!(
620 "Failed to read from stream for peer {:?}: {}",
621 peer_id, e
622 );
623 }
624 }
625 }
626 Ok(Err(e)) => {
627 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
628 }
629 Err(_) => {
630 }
632 }
633
634 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
636 .await
637 {
638 Ok(Ok((_send_stream, mut recv_stream))) => {
639 debug!(
640 "Receiving data from bidirectional stream from peer {:?}",
641 peer_id
642 );
643
644 match recv_stream.read_to_end(1024 * 1024).await {
646 Ok(buffer) => {
648 if !buffer.is_empty() {
649 debug!(
650 "Received {} bytes from peer {:?} via bidirectional stream",
651 buffer.len(),
652 peer_id
653 );
654 return Ok((*peer_id, buffer));
655 }
656 }
657 Err(e) => {
658 debug!(
659 "Failed to read from bidirectional stream for peer {:?}: {}",
660 peer_id, e
661 );
662 }
663 }
664 }
665 Ok(Err(e)) => {
666 debug!(
667 "Failed to accept bidirectional stream from peer {:?}: {}",
668 peer_id, e
669 );
670 }
671 Err(_) => {
672 }
674 }
675 }
676 Ok(None) => {
677 debug!("No active connection for peer {:?}", peer_id);
678 }
679 Err(e) => {
680 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
681 }
682 }
683 }
684
685 Err("No data available from any connected peer".into())
687 }
688
689 pub async fn get_stats(&self) -> NodeStats {
691 self.stats.lock().await.clone()
692 }
693
694 pub fn get_nat_endpoint(
696 &self,
697 ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
698 Ok(&*self.nat_endpoint)
699 }
700
701 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
703 let stats = Arc::clone(&self.stats);
704 let interval_duration = self.config.stats_interval;
705
706 tokio::spawn(async move {
707 let mut interval = tokio::time::interval(interval_duration);
708
709 loop {
710 interval.tick().await;
711
712 let stats_snapshot = stats.lock().await.clone();
713
714 info!(
715 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
716 stats_snapshot.active_connections,
717 stats_snapshot.successful_connections,
718 stats_snapshot.nat_traversal_successes,
719 stats_snapshot.nat_traversal_attempts
720 );
721 }
722 })
723 }
724
725 pub async fn get_nat_stats(
727 &self,
728 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
729 self.nat_endpoint.get_nat_stats()
730 }
731
732 pub async fn get_connection_metrics(
736 &self,
737 peer_id: &PeerId,
738 ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
739 match self.nat_endpoint.get_connection(peer_id) {
740 Ok(Some(connection)) => {
741 let rtt = connection.rtt();
743
744 let stats = connection.stats();
746
747 Ok(ConnectionMetrics {
748 bytes_sent: stats.udp_tx.bytes,
749 bytes_received: stats.udp_rx.bytes,
750 rtt: Some(rtt),
751 packet_loss: stats.path.lost_packets as f64
752 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
753 })
754 }
755 Ok(None) => Err("Connection not found".into()),
756 Err(e) => Err(format!("Failed to get connection: {e}").into()),
757 }
758 }
759
760 pub fn peer_id(&self) -> PeerId {
762 self.peer_id
763 }
764
765 pub fn public_key_bytes(&self) -> [u8; 32] {
767 self.auth_manager.public_key_bytes()
768 }
769
770 async fn send_auth_message(
772 &self,
773 peer_id: &PeerId,
774 message: AuthMessage,
775 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
776 let data = AuthManager::serialize_message(&message)?;
777 self.send_to_peer(peer_id, &data).await
778 }
779
780 async fn authenticate_as_initiator(
782 &self,
783 peer_id: &PeerId,
784 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
785 info!("Starting authentication with peer {:?}", peer_id);
786
787 let auth_request = self.auth_manager.create_auth_request();
789 self.send_auth_message(peer_id, auth_request).await?;
790
791 let timeout_duration = self.config.auth_config.auth_timeout;
793 let start = Instant::now();
794
795 while start.elapsed() < timeout_duration {
796 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
797 Ok(Ok((recv_peer_id, data))) => {
798 if recv_peer_id == *peer_id {
799 match AuthManager::deserialize_message(&data) {
800 Ok(AuthMessage::Challenge { nonce, .. }) => {
801 let response =
803 self.auth_manager.create_challenge_response(nonce)?;
804 self.send_auth_message(peer_id, response).await?;
805 }
806 Ok(AuthMessage::AuthSuccess { .. }) => {
807 info!("Authentication successful with peer {:?}", peer_id);
808 return Ok(());
809 }
810 Ok(AuthMessage::AuthFailure { reason }) => {
811 return Err(format!("Authentication failed: {reason}").into());
812 }
813 _ => continue,
814 }
815 }
816 }
817 _ => continue,
818 }
819 }
820
821 Err("Authentication timeout".into())
822 }
823
824 async fn handle_auth_message(
826 &self,
827 peer_id: PeerId,
828 message: AuthMessage,
829 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
830 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
831
832 match auth_protocol.handle_message(peer_id, message).await {
833 Ok(Some(response)) => {
834 self.send_auth_message(&peer_id, response).await?;
835 }
836 Ok(None) => {
837 }
839 Err(e) => {
840 error!("Authentication error: {}", e);
841 let failure = AuthMessage::AuthFailure {
842 reason: e.to_string(),
843 };
844 self.send_auth_message(&peer_id, failure).await?;
845 return Err(Box::new(e));
846 }
847 }
848
849 Ok(())
850 }
851
852 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
854 self.auth_manager.is_authenticated(peer_id).await
855 }
856
857 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
859 self.auth_manager.list_authenticated_peers().await
860 }
861
862 async fn handle_incoming_auth(
864 &self,
865 peer_id: PeerId,
866 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
867 info!("Handling incoming authentication from peer {:?}", peer_id);
868
869 let timeout_duration = self.config.auth_config.auth_timeout;
870 let start = Instant::now();
871
872 while start.elapsed() < timeout_duration {
873 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
874 Ok(Ok((recv_peer_id, data))) => {
875 if recv_peer_id == peer_id {
876 match AuthManager::deserialize_message(&data) {
877 Ok(auth_msg) => {
878 self.handle_auth_message(peer_id, auth_msg).await?;
879
880 if self.auth_manager.is_authenticated(&peer_id).await {
882 info!("Peer {:?} successfully authenticated", peer_id);
883 return Ok(());
884 }
885 }
886 Err(_) => {
887 continue;
889 }
890 }
891 }
892 }
893 _ => continue,
894 }
895 }
896
897 Err("Authentication timeout waiting for peer".into())
898 }
899
900 pub fn get_metrics_collector(
902 &self,
903 ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
904 Ok(Arc::new(crate::logging::MetricsCollector::new()))
908 }
909
910 pub fn shutdown(&self) {
912 info!("Shutting down QuicP2PNode");
913 self.shutdown.store(true, Ordering::SeqCst);
914
915 if let Some(endpoint) = self.nat_endpoint.get_quinn_endpoint() {
917 endpoint.close(0u32.into(), b"node shutdown");
918 }
919 }
920}
921
922impl Drop for QuicP2PNode {
924 fn drop(&mut self) {
925 self.shutdown();
926 }
927}