1use std::{
14 collections::HashMap,
15 net::SocketAddr,
16 sync::Arc,
17 time::{Duration, Instant},
18};
19
20use tracing::{debug, error, info};
21
22use crate::{
23 auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
24 crypto::raw_public_keys::key_utils::{
25 derive_peer_id_from_public_key, generate_ed25519_keypair,
26 },
27 nat_traversal_api::{
28 EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
29 NatTraversalEvent, NatTraversalStatistics, PeerId,
30 },
31};
32
33#[derive(Clone, Debug)]
35pub struct QuicP2PNode {
36 nat_endpoint: Arc<NatTraversalEndpoint>,
38 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
40 stats: Arc<tokio::sync::Mutex<NodeStats>>,
42 config: QuicNodeConfig,
44 auth_manager: Arc<AuthManager>,
46 peer_id: PeerId,
48}
49
50#[derive(Debug, Clone)]
52pub struct QuicNodeConfig {
53 pub role: EndpointRole,
55 pub bootstrap_nodes: Vec<SocketAddr>,
57 pub enable_coordinator: bool,
59 pub max_connections: usize,
61 pub connection_timeout: Duration,
63 pub stats_interval: Duration,
65 pub auth_config: AuthConfig,
67 pub bind_addr: Option<SocketAddr>,
69}
70
71impl Default for QuicNodeConfig {
72 fn default() -> Self {
73 Self {
74 role: EndpointRole::Client,
75 bootstrap_nodes: Vec::new(),
76 enable_coordinator: false,
77 max_connections: 100,
78 connection_timeout: Duration::from_secs(30),
79 stats_interval: Duration::from_secs(30),
80 auth_config: AuthConfig::default(),
81 bind_addr: None,
82 }
83 }
84}
85
/// Per-connection transport figures as returned by
/// `QuicP2PNode::get_connection_metrics`.
#[derive(Clone, Debug)]
pub struct ConnectionMetrics {
    /// Bytes sent on the connection.
    pub bytes_sent: u64,
    /// Bytes received on the connection.
    pub bytes_received: u64,
    /// Round-trip time, when one is available.
    pub rtt: Option<Duration>,
    /// Packet loss expressed as a fraction.
    pub packet_loss: f64,
}
98
/// Counters a node keeps about its connections and NAT traversal attempts.
#[derive(Clone, Debug)]
pub struct NodeStats {
    /// Connections currently counted as open.
    pub active_connections: usize,
    /// Connections established so far.
    pub successful_connections: u64,
    /// Connection attempts that ended in failure.
    pub failed_connections: u64,
    /// NAT traversal attempts recorded so far.
    pub nat_traversal_attempts: u64,
    /// NAT traversal attempts that succeeded.
    pub nat_traversal_successes: u64,
    /// Instant at which these statistics were created.
    pub start_time: Instant,
}
115
116impl Default for NodeStats {
117 fn default() -> Self {
118 Self {
119 active_connections: 0,
120 successful_connections: 0,
121 failed_connections: 0,
122 nat_traversal_attempts: 0,
123 nat_traversal_successes: 0,
124 start_time: Instant::now(),
125 }
126 }
127}
128
129impl QuicP2PNode {
130 pub async fn new(
132 config: QuicNodeConfig,
133 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
134 let (secret_key, public_key) = generate_ed25519_keypair();
136 let peer_id = derive_peer_id_from_public_key(&public_key);
137
138 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
139
140 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
142
143 let nat_config = NatTraversalConfig {
145 role: config.role,
146 bootstrap_nodes: config.bootstrap_nodes.clone(),
147 max_candidates: 50,
148 coordination_timeout: Duration::from_secs(10),
149 enable_symmetric_nat: true,
150 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
152 max_concurrent_attempts: 5,
153 bind_addr: config.bind_addr,
154 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
156 };
157
158 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
160 start_time: Instant::now(),
161 ..Default::default()
162 }));
163 let stats_for_callback = Arc::clone(&stats_clone);
164
165 let event_callback = Box::new(move |event: NatTraversalEvent| {
166 let stats = stats_for_callback.clone();
167 tokio::spawn(async move {
168 let mut stats = stats.lock().await;
169 match event {
170 NatTraversalEvent::CoordinationRequested { .. } => {
171 stats.nat_traversal_attempts += 1;
172 }
173 NatTraversalEvent::ConnectionEstablished { .. } => {
174 stats.nat_traversal_successes += 1;
175 }
176 _ => {}
177 }
178 });
179 });
180
181 let nat_endpoint =
183 Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
184
185 Ok(Self {
186 nat_endpoint,
187 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
188 stats: stats_clone,
189 config,
190 auth_manager,
191 peer_id,
192 })
193 }
194
195 pub fn get_config(&self) -> &QuicNodeConfig {
197 &self.config
198 }
199
200 pub async fn connect_to_bootstrap(
202 &self,
203 bootstrap_addr: SocketAddr,
204 ) -> Result<PeerId, NatTraversalError> {
205 info!("Connecting to bootstrap node at {}", bootstrap_addr);
206
207 let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
209 NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
210 })?;
211
212 match endpoint.connect(bootstrap_addr, "bootstrap-node") {
214 Ok(connecting) => {
215 match connecting.await {
216 Ok(_connection) => {
217 let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
221
222 self.connected_peers
224 .write()
225 .await
226 .insert(peer_id, bootstrap_addr);
227
228 {
230 let mut stats = self.stats.lock().await;
231 stats.active_connections += 1;
232 stats.successful_connections += 1;
233 }
234
235 if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
237 callback(NatTraversalEvent::ConnectionEstablished {
238 peer_id,
239 remote_address: bootstrap_addr,
240 });
241 }
242
243 info!(
244 "Successfully connected to bootstrap node {} with peer ID {:?}",
245 bootstrap_addr, peer_id
246 );
247 Ok(peer_id)
248 }
249 Err(e) => {
250 error!(
251 "Failed to establish connection to bootstrap node {}: {}",
252 bootstrap_addr, e
253 );
254 {
255 let mut stats = self.stats.lock().await;
256 stats.failed_connections += 1;
257 }
258 Err(NatTraversalError::NetworkError(format!(
259 "Connection failed: {e}"
260 )))
261 }
262 }
263 }
264 Err(e) => {
265 error!(
266 "Failed to initiate connection to bootstrap node {}: {}",
267 bootstrap_addr, e
268 );
269 {
270 let mut stats = self.stats.lock().await;
271 stats.failed_connections += 1;
272 }
273 Err(NatTraversalError::NetworkError(format!(
274 "Connect error: {e}"
275 )))
276 }
277 }
278 }
279
280 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
282 use std::collections::hash_map::DefaultHasher;
283 use std::hash::{Hash, Hasher};
284
285 let mut hasher = DefaultHasher::new();
286 addr.hash(&mut hasher);
287 let hash = hasher.finish();
288
289 let mut peer_id_bytes = [0u8; 32];
290 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
291 let port_bytes = addr.port().to_le_bytes();
292 peer_id_bytes[8..10].copy_from_slice(&port_bytes);
293
294 PeerId(peer_id_bytes)
295 }
296
297 pub async fn connect_to_peer(
299 &self,
300 peer_id: PeerId,
301 coordinator: SocketAddr,
302 ) -> Result<SocketAddr, NatTraversalError> {
303 info!(
304 "Initiating connection to peer {:?} via coordinator {}",
305 peer_id, coordinator
306 );
307
308 {
310 let mut stats = self.stats.lock().await;
311 stats.nat_traversal_attempts += 1;
312 }
313
314 self.nat_endpoint
316 .initiate_nat_traversal(peer_id, coordinator)?;
317
318 let start = Instant::now();
320 let timeout = self.config.connection_timeout;
321
322 while start.elapsed() < timeout {
323 let events = self.nat_endpoint.poll(Instant::now())?;
324
325 for event in events {
326 match event {
327 NatTraversalEvent::ConnectionEstablished {
328 peer_id: evt_peer,
329 remote_address,
330 } => {
331 if evt_peer == peer_id {
332 {
334 let mut peers = self.connected_peers.write().await;
335 peers.insert(peer_id, remote_address);
336 }
337
338 {
340 let mut stats = self.stats.lock().await;
341 stats.successful_connections += 1;
342 stats.active_connections += 1;
343 stats.nat_traversal_successes += 1;
344 }
345
346 info!(
347 "Successfully connected to peer {:?} at {}",
348 peer_id, remote_address
349 );
350
351 if self.config.auth_config.require_authentication {
353 match self.authenticate_as_initiator(&peer_id).await {
354 Ok(_) => {
355 info!("Authentication successful with peer {:?}", peer_id);
356 }
357 Err(e) => {
358 error!(
359 "Authentication failed with peer {:?}: {}",
360 peer_id, e
361 );
362 self.connected_peers.write().await.remove(&peer_id);
364 let mut stats = self.stats.lock().await;
366 stats.active_connections =
367 stats.active_connections.saturating_sub(1);
368 stats.failed_connections += 1;
369 return Err(NatTraversalError::ConfigError(format!(
370 "Authentication failed: {e}"
371 )));
372 }
373 }
374 }
375
376 return Ok(remote_address);
377 }
378 }
379 NatTraversalEvent::TraversalFailed {
380 peer_id: evt_peer,
381 error,
382 fallback_available: _,
383 } => {
384 if evt_peer == peer_id {
385 {
387 let mut stats = self.stats.lock().await;
388 stats.failed_connections += 1;
389 }
390
391 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
392 return Err(error);
393 }
394 }
395 _ => {
396 debug!("Received event: {:?}", event);
397 }
398 }
399 }
400
401 tokio::time::sleep(Duration::from_millis(100)).await;
403 }
404
405 {
407 let mut stats = self.stats.lock().await;
408 stats.failed_connections += 1;
409 }
410
411 Err(NatTraversalError::Timeout)
412 }
413
414 pub async fn accept(
416 &self,
417 ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
418 info!("Waiting for incoming connection...");
419
420 match self.nat_endpoint.accept_connection().await {
422 Ok((peer_id, connection)) => {
423 let remote_addr = connection.remote_address();
424
425 {
427 let mut peers = self.connected_peers.write().await;
428 peers.insert(peer_id, remote_addr);
429 }
430
431 {
433 let mut stats = self.stats.lock().await;
434 stats.successful_connections += 1;
435 stats.active_connections += 1;
436 }
437
438 info!(
439 "Accepted connection from peer {:?} at {}",
440 peer_id, remote_addr
441 );
442
443 if self.config.auth_config.require_authentication {
445 let self_clone = self.clone();
447 let auth_peer_id = peer_id;
448 tokio::spawn(async move {
449 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
450 error!(
451 "Failed to handle authentication for peer {:?}: {}",
452 auth_peer_id, e
453 );
454 self_clone
456 .connected_peers
457 .write()
458 .await
459 .remove(&auth_peer_id);
460 let mut stats = self_clone.stats.lock().await;
461 stats.active_connections = stats.active_connections.saturating_sub(1);
462 }
463 });
464 }
465
466 Ok((remote_addr, peer_id))
467 }
468 Err(e) => {
469 {
471 let mut stats = self.stats.lock().await;
472 stats.failed_connections += 1;
473 }
474
475 error!("Failed to accept connection: {}", e);
476 Err(Box::new(e))
477 }
478 }
479 }
480
481 pub async fn send_to_peer(
483 &self,
484 peer_id: &PeerId,
485 data: &[u8],
486 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
487 let peers = self.connected_peers.read().await;
488
489 if let Some(remote_addr) = peers.get(peer_id) {
490 debug!(
491 "Sending {} bytes to peer {:?} at {}",
492 data.len(),
493 peer_id,
494 remote_addr
495 );
496
497 match self.nat_endpoint.get_connection(peer_id) {
499 Ok(Some(connection)) => {
500 let mut send_stream = connection
502 .open_uni()
503 .await
504 .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
505
506 send_stream
508 .write_all(data)
509 .await
510 .map_err(|e| format!("Failed to write data: {e}"))?;
511
512 send_stream
514 .finish()
515 .map_err(|e| format!("Failed to finish stream: {e}"))?;
516
517 debug!(
518 "Successfully sent {} bytes to peer {:?}",
519 data.len(),
520 peer_id
521 );
522 Ok(())
523 }
524 Ok(None) => {
525 error!("No active connection found for peer {:?}", peer_id);
526 Err("No active connection".into())
527 }
528 Err(e) => {
529 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
530 Err(Box::new(e))
531 }
532 }
533 } else {
534 error!("Peer {:?} not connected", peer_id);
535 Err("Peer not connected".into())
536 }
537 }
538
539 pub async fn receive(
541 &self,
542 ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
543 debug!("Waiting to receive data from any connected peer...");
544
545 let peers = {
547 let peers_guard = self.connected_peers.read().await;
548 peers_guard.clone()
549 };
550
551 if peers.is_empty() {
552 return Err("No connected peers".into());
553 }
554
555 for (peer_id, _remote_addr) in peers.iter() {
559 match self.nat_endpoint.get_connection(peer_id) {
560 Ok(Some(connection)) => {
561 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
563 .await
564 {
565 Ok(Ok(mut recv_stream)) => {
566 debug!(
567 "Receiving data from unidirectional stream from peer {:?}",
568 peer_id
569 );
570
571 match recv_stream.read_to_end(1024 * 1024).await {
573 Ok(buffer) => {
575 if !buffer.is_empty() {
576 debug!(
577 "Received {} bytes from peer {:?}",
578 buffer.len(),
579 peer_id
580 );
581 return Ok((*peer_id, buffer));
582 }
583 }
584 Err(e) => {
585 debug!(
586 "Failed to read from stream for peer {:?}: {}",
587 peer_id, e
588 );
589 }
590 }
591 }
592 Ok(Err(e)) => {
593 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
594 }
595 Err(_) => {
596 }
598 }
599
600 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
602 .await
603 {
604 Ok(Ok((_send_stream, mut recv_stream))) => {
605 debug!(
606 "Receiving data from bidirectional stream from peer {:?}",
607 peer_id
608 );
609
610 match recv_stream.read_to_end(1024 * 1024).await {
612 Ok(buffer) => {
614 if !buffer.is_empty() {
615 debug!(
616 "Received {} bytes from peer {:?} via bidirectional stream",
617 buffer.len(),
618 peer_id
619 );
620 return Ok((*peer_id, buffer));
621 }
622 }
623 Err(e) => {
624 debug!(
625 "Failed to read from bidirectional stream for peer {:?}: {}",
626 peer_id, e
627 );
628 }
629 }
630 }
631 Ok(Err(e)) => {
632 debug!(
633 "Failed to accept bidirectional stream from peer {:?}: {}",
634 peer_id, e
635 );
636 }
637 Err(_) => {
638 }
640 }
641 }
642 Ok(None) => {
643 debug!("No active connection for peer {:?}", peer_id);
644 }
645 Err(e) => {
646 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
647 }
648 }
649 }
650
651 Err("No data available from any connected peer".into())
653 }
654
655 pub async fn get_stats(&self) -> NodeStats {
657 self.stats.lock().await.clone()
658 }
659
660 pub fn get_nat_endpoint(
662 &self,
663 ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
664 Ok(&*self.nat_endpoint)
665 }
666
667 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
669 let stats = Arc::clone(&self.stats);
670 let interval_duration = self.config.stats_interval;
671
672 tokio::spawn(async move {
673 let mut interval = tokio::time::interval(interval_duration);
674
675 loop {
676 interval.tick().await;
677
678 let stats_snapshot = stats.lock().await.clone();
679
680 info!(
681 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
682 stats_snapshot.active_connections,
683 stats_snapshot.successful_connections,
684 stats_snapshot.nat_traversal_successes,
685 stats_snapshot.nat_traversal_attempts
686 );
687 }
688 })
689 }
690
691 pub async fn get_nat_stats(
693 &self,
694 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
695 self.nat_endpoint.get_nat_stats()
696 }
697
698 pub async fn get_connection_metrics(
702 &self,
703 peer_id: &PeerId,
704 ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
705 match self.nat_endpoint.get_connection(peer_id) {
706 Ok(Some(connection)) => {
707 let rtt = connection.rtt();
709
710 let stats = connection.stats();
712
713 Ok(ConnectionMetrics {
714 bytes_sent: stats.udp_tx.bytes,
715 bytes_received: stats.udp_rx.bytes,
716 rtt: Some(rtt),
717 packet_loss: stats.path.lost_packets as f64
718 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
719 })
720 }
721 Ok(None) => Err("Connection not found".into()),
722 Err(e) => Err(format!("Failed to get connection: {e}").into()),
723 }
724 }
725
726 pub fn peer_id(&self) -> PeerId {
728 self.peer_id
729 }
730
731 pub fn public_key_bytes(&self) -> [u8; 32] {
733 self.auth_manager.public_key_bytes()
734 }
735
736 async fn send_auth_message(
738 &self,
739 peer_id: &PeerId,
740 message: AuthMessage,
741 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
742 let data = AuthManager::serialize_message(&message)?;
743 self.send_to_peer(peer_id, &data).await
744 }
745
746 async fn authenticate_as_initiator(
748 &self,
749 peer_id: &PeerId,
750 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
751 info!("Starting authentication with peer {:?}", peer_id);
752
753 let auth_request = self.auth_manager.create_auth_request();
755 self.send_auth_message(peer_id, auth_request).await?;
756
757 let timeout_duration = self.config.auth_config.auth_timeout;
759 let start = Instant::now();
760
761 while start.elapsed() < timeout_duration {
762 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
763 Ok(Ok((recv_peer_id, data))) => {
764 if recv_peer_id == *peer_id {
765 match AuthManager::deserialize_message(&data) {
766 Ok(AuthMessage::Challenge { nonce, .. }) => {
767 let response =
769 self.auth_manager.create_challenge_response(nonce)?;
770 self.send_auth_message(peer_id, response).await?;
771 }
772 Ok(AuthMessage::AuthSuccess { .. }) => {
773 info!("Authentication successful with peer {:?}", peer_id);
774 return Ok(());
775 }
776 Ok(AuthMessage::AuthFailure { reason }) => {
777 return Err(format!("Authentication failed: {reason}").into());
778 }
779 _ => continue,
780 }
781 }
782 }
783 _ => continue,
784 }
785 }
786
787 Err("Authentication timeout".into())
788 }
789
790 async fn handle_auth_message(
792 &self,
793 peer_id: PeerId,
794 message: AuthMessage,
795 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
796 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
797
798 match auth_protocol.handle_message(peer_id, message).await {
799 Ok(Some(response)) => {
800 self.send_auth_message(&peer_id, response).await?;
801 }
802 Ok(None) => {
803 }
805 Err(e) => {
806 error!("Authentication error: {}", e);
807 let failure = AuthMessage::AuthFailure {
808 reason: e.to_string(),
809 };
810 self.send_auth_message(&peer_id, failure).await?;
811 return Err(Box::new(e));
812 }
813 }
814
815 Ok(())
816 }
817
818 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
820 self.auth_manager.is_authenticated(peer_id).await
821 }
822
823 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
825 self.auth_manager.list_authenticated_peers().await
826 }
827
828 async fn handle_incoming_auth(
830 &self,
831 peer_id: PeerId,
832 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
833 info!("Handling incoming authentication from peer {:?}", peer_id);
834
835 let timeout_duration = self.config.auth_config.auth_timeout;
836 let start = Instant::now();
837
838 while start.elapsed() < timeout_duration {
839 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
840 Ok(Ok((recv_peer_id, data))) => {
841 if recv_peer_id == peer_id {
842 match AuthManager::deserialize_message(&data) {
843 Ok(auth_msg) => {
844 self.handle_auth_message(peer_id, auth_msg).await?;
845
846 if self.auth_manager.is_authenticated(&peer_id).await {
848 info!("Peer {:?} successfully authenticated", peer_id);
849 return Ok(());
850 }
851 }
852 Err(_) => {
853 continue;
855 }
856 }
857 }
858 }
859 _ => continue,
860 }
861 }
862
863 Err("Authentication timeout waiting for peer".into())
864 }
865
866 pub fn get_metrics_collector(
868 &self,
869 ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
870 Ok(Arc::new(crate::logging::MetricsCollector::new()))
874 }
875}