1use std::{
14 collections::HashMap,
15 net::SocketAddr,
16 sync::{
17 atomic::{AtomicBool, Ordering},
18 Arc,
19 },
20 time::{Duration, Instant},
21};
22
23use tracing::{debug, error, info};
24
25use crate::{
26 auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
27 crypto::raw_public_keys::key_utils::{
28 derive_peer_id_from_public_key, generate_ed25519_keypair,
29 },
30 nat_traversal_api::{
31 EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
32 NatTraversalEvent, NatTraversalStatistics, PeerId,
33 },
34};
35
36#[derive(Clone, Debug)]
38pub struct QuicP2PNode {
39 nat_endpoint: Arc<NatTraversalEndpoint>,
41 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
43 stats: Arc<tokio::sync::Mutex<NodeStats>>,
45 config: QuicNodeConfig,
47 auth_manager: Arc<AuthManager>,
49 peer_id: PeerId,
51 shutdown: Arc<AtomicBool>,
53}
54
55#[derive(Debug, Clone)]
57pub struct QuicNodeConfig {
58 pub role: EndpointRole,
60 pub bootstrap_nodes: Vec<SocketAddr>,
62 pub enable_coordinator: bool,
64 pub max_connections: usize,
66 pub connection_timeout: Duration,
68 pub stats_interval: Duration,
70 pub auth_config: AuthConfig,
72 pub bind_addr: Option<SocketAddr>,
74}
75
76impl Default for QuicNodeConfig {
77 fn default() -> Self {
78 Self {
79 role: EndpointRole::Client,
80 bootstrap_nodes: Vec::new(),
81 enable_coordinator: false,
82 max_connections: 100,
83 connection_timeout: Duration::from_secs(30),
84 stats_interval: Duration::from_secs(30),
85 auth_config: AuthConfig::default(),
86 bind_addr: None,
87 }
88 }
89}
90
/// Point-in-time transport metrics for a single peer connection.
#[derive(Clone, Debug)]
pub struct ConnectionMetrics {
    /// Total bytes transmitted on the connection.
    pub bytes_sent: u64,
    /// Total bytes received on the connection.
    pub bytes_received: u64,
    /// Round-trip time estimate, when one is available.
    pub rtt: Option<Duration>,
    /// Share of packets reported lost, expressed as a ratio rather than a percentage.
    pub packet_loss: f64,
}
103
/// Counters describing a node's connection activity since it started.
#[derive(Clone, Debug)]
pub struct NodeStats {
    /// Connections currently considered open.
    pub active_connections: usize,
    /// Connections that were established successfully.
    pub successful_connections: u64,
    /// Connection attempts that ended in failure.
    pub failed_connections: u64,
    /// NAT-traversal attempts that were started.
    pub nat_traversal_attempts: u64,
    /// NAT-traversal attempts that produced a connection.
    pub nat_traversal_successes: u64,
    /// Moment these statistics began accumulating.
    pub start_time: Instant,
}
120
121impl Default for NodeStats {
122 fn default() -> Self {
123 Self {
124 active_connections: 0,
125 successful_connections: 0,
126 failed_connections: 0,
127 nat_traversal_attempts: 0,
128 nat_traversal_successes: 0,
129 start_time: Instant::now(),
130 }
131 }
132}
133
134impl QuicP2PNode {
135 pub async fn new(
137 config: QuicNodeConfig,
138 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
139 let (secret_key, public_key) = generate_ed25519_keypair();
141 let peer_id = derive_peer_id_from_public_key(&public_key);
142
143 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
144
145 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
147
148 let nat_config = NatTraversalConfig {
150 role: config.role,
151 bootstrap_nodes: config.bootstrap_nodes.clone(),
152 max_candidates: 50,
153 coordination_timeout: Duration::from_secs(10),
154 enable_symmetric_nat: true,
155 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
157 max_concurrent_attempts: 5,
158 bind_addr: config.bind_addr,
159 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
161 };
162
163 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
165 start_time: Instant::now(),
166 ..Default::default()
167 }));
168 let stats_for_callback = Arc::clone(&stats_clone);
169
170 let event_callback = Box::new(move |event: NatTraversalEvent| {
171 let stats = stats_for_callback.clone();
172 tokio::spawn(async move {
173 let mut stats = stats.lock().await;
174 match event {
175 NatTraversalEvent::CoordinationRequested { .. } => {
176 stats.nat_traversal_attempts += 1;
177 }
178 NatTraversalEvent::ConnectionEstablished { .. } => {
179 stats.nat_traversal_successes += 1;
180 }
181 _ => {}
182 }
183 });
184 });
185
186 let nat_endpoint =
188 Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
189
190 let shutdown = Arc::new(AtomicBool::new(false));
192
193 let node = Self {
195 nat_endpoint,
196 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
197 stats: stats_clone,
198 config,
199 auth_manager,
200 peer_id,
201 shutdown: shutdown.clone(),
202 };
203
204 Ok(node)
205 }
206
207 pub fn get_config(&self) -> &QuicNodeConfig {
209 &self.config
210 }
211
212 pub async fn connect_to_bootstrap(
214 &self,
215 bootstrap_addr: SocketAddr,
216 ) -> Result<PeerId, NatTraversalError> {
217 info!("Connecting to bootstrap node at {}", bootstrap_addr);
218
219 let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
221 NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
222 })?;
223
224 match endpoint.connect(bootstrap_addr, "bootstrap-node") {
226 Ok(connecting) => {
227 match connecting.await {
228 Ok(connection) => {
229 let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
233
234 let handler_connection = connection.clone();
236
237 self.nat_endpoint.add_connection(peer_id, connection)?;
239
240 self.nat_endpoint
241 .spawn_connection_handler(peer_id, handler_connection)?;
242
243 self.connected_peers
245 .write()
246 .await
247 .insert(peer_id, bootstrap_addr);
248
249 {
251 let mut stats = self.stats.lock().await;
252 stats.active_connections += 1;
253 stats.successful_connections += 1;
254 }
255
256 if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
258 callback(NatTraversalEvent::ConnectionEstablished {
259 peer_id,
260 remote_address: bootstrap_addr,
261 });
262 }
263
264 info!(
265 "Successfully connected to bootstrap node {} with peer ID {:?}",
266 bootstrap_addr, peer_id
267 );
268 Ok(peer_id)
269 }
270 Err(e) => {
271 error!(
272 "Failed to establish connection to bootstrap node {}: {}",
273 bootstrap_addr, e
274 );
275 {
276 let mut stats = self.stats.lock().await;
277 stats.failed_connections += 1;
278 }
279 Err(NatTraversalError::NetworkError(format!(
280 "Connection failed: {e}"
281 )))
282 }
283 }
284 }
285 Err(e) => {
286 error!(
287 "Failed to initiate connection to bootstrap node {}: {}",
288 bootstrap_addr, e
289 );
290 {
291 let mut stats = self.stats.lock().await;
292 stats.failed_connections += 1;
293 }
294 Err(NatTraversalError::NetworkError(format!(
295 "Connect error: {e}"
296 )))
297 }
298 }
299 }
300
301 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
303 use std::collections::hash_map::DefaultHasher;
304 use std::hash::{Hash, Hasher};
305
306 let mut hasher = DefaultHasher::new();
307 addr.hash(&mut hasher);
308 let hash = hasher.finish();
309
310 let mut peer_id_bytes = [0u8; 32];
311 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
312 let port_bytes = addr.port().to_le_bytes();
313 peer_id_bytes[8..10].copy_from_slice(&port_bytes);
314
315 PeerId(peer_id_bytes)
316 }
317
318 pub async fn connect_to_peer(
320 &self,
321 peer_id: PeerId,
322 coordinator: SocketAddr,
323 ) -> Result<SocketAddr, NatTraversalError> {
324 info!(
325 "Initiating connection to peer {:?} via coordinator {}",
326 peer_id, coordinator
327 );
328
329 {
331 let mut stats = self.stats.lock().await;
332 stats.nat_traversal_attempts += 1;
333 }
334
335 self.nat_endpoint
337 .initiate_nat_traversal(peer_id, coordinator)?;
338
339 let start = Instant::now();
341 let timeout = self.config.connection_timeout;
342
343 while start.elapsed() < timeout {
344 let events = self.nat_endpoint.poll(Instant::now())?;
345
346 for event in events {
347 match event {
348 NatTraversalEvent::ConnectionEstablished {
349 peer_id: evt_peer,
350 remote_address,
351 } => {
352 if evt_peer == peer_id {
353 {
355 let mut peers = self.connected_peers.write().await;
356 peers.insert(peer_id, remote_address);
357 }
358
359 {
361 let mut stats = self.stats.lock().await;
362 stats.successful_connections += 1;
363 stats.active_connections += 1;
364 stats.nat_traversal_successes += 1;
365 }
366
367 info!(
368 "Successfully connected to peer {:?} at {}",
369 peer_id, remote_address
370 );
371
372 if self.config.auth_config.require_authentication {
374 match self.authenticate_as_initiator(&peer_id).await {
375 Ok(_) => {
376 info!("Authentication successful with peer {:?}", peer_id);
377 }
378 Err(e) => {
379 error!(
380 "Authentication failed with peer {:?}: {}",
381 peer_id, e
382 );
383 self.connected_peers.write().await.remove(&peer_id);
385 let mut stats = self.stats.lock().await;
387 stats.active_connections =
388 stats.active_connections.saturating_sub(1);
389 stats.failed_connections += 1;
390 return Err(NatTraversalError::ConfigError(format!(
391 "Authentication failed: {e}"
392 )));
393 }
394 }
395 }
396
397 return Ok(remote_address);
398 }
399 }
400 NatTraversalEvent::TraversalFailed {
401 peer_id: evt_peer,
402 error,
403 fallback_available: _,
404 } => {
405 if evt_peer == peer_id {
406 {
408 let mut stats = self.stats.lock().await;
409 stats.failed_connections += 1;
410 }
411
412 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
413 return Err(error);
414 }
415 }
416 _ => {
417 debug!("Received event: {:?}", event);
418 }
419 }
420 }
421
422 tokio::time::sleep(Duration::from_millis(100)).await;
424 }
425
426 {
428 let mut stats = self.stats.lock().await;
429 stats.failed_connections += 1;
430 }
431
432 Err(NatTraversalError::Timeout)
433 }
434
435 pub async fn accept(
437 &self,
438 ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
439 info!("Waiting for incoming connection...");
440
441 match self.nat_endpoint.accept_connection().await {
443 Ok((peer_id, connection)) => {
444 let remote_addr = connection.remote_address();
445
446 if let Err(e) = self.nat_endpoint.spawn_connection_handler(peer_id, connection) {
448 error!(
449 "Failed to spawn connection handler for peer {:?}: {}",
450 peer_id, e
451 );
452 return Err(Box::new(e));
453 }
454
455 {
457 let mut peers = self.connected_peers.write().await;
458 peers.insert(peer_id, remote_addr);
459 }
460
461 {
463 let mut stats = self.stats.lock().await;
464 stats.successful_connections += 1;
465 stats.active_connections += 1;
466 }
467
468 info!(
469 "Accepted connection from peer {:?} at {}",
470 peer_id, remote_addr
471 );
472
473 if self.config.auth_config.require_authentication {
475 let self_clone = self.clone();
477 let auth_peer_id = peer_id;
478 tokio::spawn(async move {
479 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
480 error!(
481 "Failed to handle authentication for peer {:?}: {}",
482 auth_peer_id, e
483 );
484 self_clone
486 .connected_peers
487 .write()
488 .await
489 .remove(&auth_peer_id);
490 let mut stats = self_clone.stats.lock().await;
491 stats.active_connections = stats.active_connections.saturating_sub(1);
492 }
493 });
494 }
495
496 Ok((remote_addr, peer_id))
497 }
498 Err(e) => {
499 {
501 let mut stats = self.stats.lock().await;
502 stats.failed_connections += 1;
503 }
504
505 error!("Failed to accept connection: {}", e);
506 Err(Box::new(e))
507 }
508 }
509 }
510
511 pub async fn send_to_peer(
513 &self,
514 peer_id: &PeerId,
515 data: &[u8],
516 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
517 debug!("Attempting to send {} bytes to peer {:?}", data.len(), peer_id);
518
519 let peers = self.connected_peers.read().await;
520
521 if let Some(remote_addr) = peers.get(peer_id) {
522 debug!("Found peer {:?} at {}", peer_id, remote_addr);
523
524 match self.nat_endpoint.get_connection(peer_id) {
526 Ok(Some(connection)) => {
527 let mut send_stream = connection
529 .open_uni()
530 .await
531 .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
532
533 send_stream
535 .write_all(data)
536 .await
537 .map_err(|e| format!("Failed to write data: {e}"))?;
538
539 send_stream
541 .finish()
542 .map_err(|e| format!("Failed to finish stream: {e}"))?;
543
544 debug!(
545 "Successfully sent {} bytes to peer {:?}",
546 data.len(),
547 peer_id
548 );
549 Ok(())
550 }
551 Ok(None) => {
552 error!("No active connection found for peer {:?}", peer_id);
553 Err("No active connection".into())
554 }
555 Err(e) => {
556 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
557 Err(Box::new(e))
558 }
559 }
560 } else {
561 error!("Peer {:?} not connected", peer_id);
562 Err("Peer not connected".into())
563 }
564 }
565
566 pub async fn receive(
568 &self,
569 ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
570 debug!("Waiting to receive data from any connected peer...");
571
572 let peers = {
574 let peers_guard = self.connected_peers.read().await;
575 peers_guard.clone()
576 };
577
578 if peers.is_empty() {
579 return Err("No connected peers".into());
580 }
581
582 for (peer_id, _remote_addr) in peers.iter() {
586 match self.nat_endpoint.get_connection(peer_id) {
587 Ok(Some(connection)) => {
588 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
590 .await
591 {
592 Ok(Ok(mut recv_stream)) => {
593 debug!(
594 "Receiving data from unidirectional stream from peer {:?}",
595 peer_id
596 );
597
598 match recv_stream.read_to_end(1024 * 1024).await {
600 Ok(buffer) => {
602 if !buffer.is_empty() {
603 debug!(
604 "Received {} bytes from peer {:?}",
605 buffer.len(),
606 peer_id
607 );
608 return Ok((*peer_id, buffer));
609 }
610 }
611 Err(e) => {
612 debug!(
613 "Failed to read from stream for peer {:?}: {}",
614 peer_id, e
615 );
616 }
617 }
618 }
619 Ok(Err(e)) => {
620 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
621 }
622 Err(_) => {
623 }
625 }
626
627 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
629 .await
630 {
631 Ok(Ok((_send_stream, mut recv_stream))) => {
632 debug!(
633 "Receiving data from bidirectional stream from peer {:?}",
634 peer_id
635 );
636
637 match recv_stream.read_to_end(1024 * 1024).await {
639 Ok(buffer) => {
641 if !buffer.is_empty() {
642 debug!(
643 "Received {} bytes from peer {:?} via bidirectional stream",
644 buffer.len(),
645 peer_id
646 );
647 return Ok((*peer_id, buffer));
648 }
649 }
650 Err(e) => {
651 debug!(
652 "Failed to read from bidirectional stream for peer {:?}: {}",
653 peer_id, e
654 );
655 }
656 }
657 }
658 Ok(Err(e)) => {
659 debug!(
660 "Failed to accept bidirectional stream from peer {:?}: {}",
661 peer_id, e
662 );
663 }
664 Err(_) => {
665 }
667 }
668 }
669 Ok(None) => {
670 debug!("No active connection for peer {:?}", peer_id);
671 }
672 Err(e) => {
673 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
674 }
675 }
676 }
677
678 Err("No data available from any connected peer".into())
680 }
681
682 pub async fn get_stats(&self) -> NodeStats {
684 self.stats.lock().await.clone()
685 }
686
687 pub fn get_nat_endpoint(
689 &self,
690 ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
691 Ok(&*self.nat_endpoint)
692 }
693
694 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
696 let stats = Arc::clone(&self.stats);
697 let interval_duration = self.config.stats_interval;
698
699 tokio::spawn(async move {
700 let mut interval = tokio::time::interval(interval_duration);
701
702 loop {
703 interval.tick().await;
704
705 let stats_snapshot = stats.lock().await.clone();
706
707 info!(
708 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
709 stats_snapshot.active_connections,
710 stats_snapshot.successful_connections,
711 stats_snapshot.nat_traversal_successes,
712 stats_snapshot.nat_traversal_attempts
713 );
714 }
715 })
716 }
717
718 pub async fn get_nat_stats(
720 &self,
721 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
722 self.nat_endpoint.get_nat_stats()
723 }
724
725 pub async fn get_connection_metrics(
729 &self,
730 peer_id: &PeerId,
731 ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
732 match self.nat_endpoint.get_connection(peer_id) {
733 Ok(Some(connection)) => {
734 let rtt = connection.rtt();
736
737 let stats = connection.stats();
739
740 Ok(ConnectionMetrics {
741 bytes_sent: stats.udp_tx.bytes,
742 bytes_received: stats.udp_rx.bytes,
743 rtt: Some(rtt),
744 packet_loss: stats.path.lost_packets as f64
745 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
746 })
747 }
748 Ok(None) => Err("Connection not found".into()),
749 Err(e) => Err(format!("Failed to get connection: {e}").into()),
750 }
751 }
752
753 pub fn peer_id(&self) -> PeerId {
755 self.peer_id
756 }
757
758 pub fn public_key_bytes(&self) -> [u8; 32] {
760 self.auth_manager.public_key_bytes()
761 }
762
763 async fn send_auth_message(
765 &self,
766 peer_id: &PeerId,
767 message: AuthMessage,
768 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
769 let data = AuthManager::serialize_message(&message)?;
770 self.send_to_peer(peer_id, &data).await
771 }
772
773 async fn authenticate_as_initiator(
775 &self,
776 peer_id: &PeerId,
777 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
778 info!("Starting authentication with peer {:?}", peer_id);
779
780 let auth_request = self.auth_manager.create_auth_request();
782 self.send_auth_message(peer_id, auth_request).await?;
783
784 let timeout_duration = self.config.auth_config.auth_timeout;
786 let start = Instant::now();
787
788 while start.elapsed() < timeout_duration {
789 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
790 Ok(Ok((recv_peer_id, data))) => {
791 if recv_peer_id == *peer_id {
792 match AuthManager::deserialize_message(&data) {
793 Ok(AuthMessage::Challenge { nonce, .. }) => {
794 let response =
796 self.auth_manager.create_challenge_response(nonce)?;
797 self.send_auth_message(peer_id, response).await?;
798 }
799 Ok(AuthMessage::AuthSuccess { .. }) => {
800 info!("Authentication successful with peer {:?}", peer_id);
801 return Ok(());
802 }
803 Ok(AuthMessage::AuthFailure { reason }) => {
804 return Err(format!("Authentication failed: {reason}").into());
805 }
806 _ => continue,
807 }
808 }
809 }
810 _ => continue,
811 }
812 }
813
814 Err("Authentication timeout".into())
815 }
816
817 async fn handle_auth_message(
819 &self,
820 peer_id: PeerId,
821 message: AuthMessage,
822 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
823 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
824
825 match auth_protocol.handle_message(peer_id, message).await {
826 Ok(Some(response)) => {
827 self.send_auth_message(&peer_id, response).await?;
828 }
829 Ok(None) => {
830 }
832 Err(e) => {
833 error!("Authentication error: {}", e);
834 let failure = AuthMessage::AuthFailure {
835 reason: e.to_string(),
836 };
837 self.send_auth_message(&peer_id, failure).await?;
838 return Err(Box::new(e));
839 }
840 }
841
842 Ok(())
843 }
844
845 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
847 self.auth_manager.is_authenticated(peer_id).await
848 }
849
850 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
852 self.auth_manager.list_authenticated_peers().await
853 }
854
855 async fn handle_incoming_auth(
857 &self,
858 peer_id: PeerId,
859 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
860 info!("Handling incoming authentication from peer {:?}", peer_id);
861
862 let timeout_duration = self.config.auth_config.auth_timeout;
863 let start = Instant::now();
864
865 while start.elapsed() < timeout_duration {
866 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
867 Ok(Ok((recv_peer_id, data))) => {
868 if recv_peer_id == peer_id {
869 match AuthManager::deserialize_message(&data) {
870 Ok(auth_msg) => {
871 self.handle_auth_message(peer_id, auth_msg).await?;
872
873 if self.auth_manager.is_authenticated(&peer_id).await {
875 info!("Peer {:?} successfully authenticated", peer_id);
876 return Ok(());
877 }
878 }
879 Err(_) => {
880 continue;
882 }
883 }
884 }
885 }
886 _ => continue,
887 }
888 }
889
890 Err("Authentication timeout waiting for peer".into())
891 }
892
893 pub fn get_metrics_collector(
895 &self,
896 ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
897 Ok(Arc::new(crate::logging::MetricsCollector::new()))
901 }
902
903 pub fn shutdown(&self) {
905 info!("Shutting down QuicP2PNode");
906 self.shutdown.store(true, Ordering::SeqCst);
907
908 if let Some(endpoint) = self.nat_endpoint.get_quinn_endpoint() {
910 endpoint.close(0u32.into(), b"node shutdown");
911 }
912 }
913}
914
915impl Drop for QuicP2PNode {
917 fn drop(&mut self) {
918 self.shutdown();
919 }
920}