1use std::{
7 collections::HashMap,
8 net::SocketAddr,
9 sync::Arc,
10 time::{Duration, Instant},
11};
12
13use tracing::{debug, error, info};
14
15use crate::{
16 auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
17 crypto::raw_public_keys::key_utils::{
18 derive_peer_id_from_public_key, generate_ed25519_keypair,
19 },
20 nat_traversal_api::{
21 EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
22 NatTraversalEvent, NatTraversalStatistics, PeerId,
23 },
24};
25
26#[derive(Clone, Debug)]
28pub struct QuicP2PNode {
29 nat_endpoint: Arc<NatTraversalEndpoint>,
31 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
33 stats: Arc<tokio::sync::Mutex<NodeStats>>,
35 config: QuicNodeConfig,
37 auth_manager: Arc<AuthManager>,
39 peer_id: PeerId,
41}
42
43#[derive(Debug, Clone)]
45pub struct QuicNodeConfig {
46 pub role: EndpointRole,
48 pub bootstrap_nodes: Vec<SocketAddr>,
50 pub enable_coordinator: bool,
52 pub max_connections: usize,
54 pub connection_timeout: Duration,
56 pub stats_interval: Duration,
58 pub auth_config: AuthConfig,
60 pub bind_addr: Option<SocketAddr>,
62}
63
64impl Default for QuicNodeConfig {
65 fn default() -> Self {
66 Self {
67 role: EndpointRole::Client,
68 bootstrap_nodes: Vec::new(),
69 enable_coordinator: false,
70 max_connections: 100,
71 connection_timeout: Duration::from_secs(30),
72 stats_interval: Duration::from_secs(30),
73 auth_config: AuthConfig::default(),
74 bind_addr: None,
75 }
76 }
77}
78
/// Point-in-time transport metrics for a single peer connection.
#[derive(Clone, Debug)]
pub struct ConnectionMetrics {
    /// Bytes sent on the connection.
    pub bytes_sent: u64,
    /// Bytes received on the connection.
    pub bytes_received: u64,
    /// Round-trip time of the connection, when known.
    pub rtt: Option<Duration>,
    /// Fraction of packets lost, as a ratio rather than a percentage.
    pub packet_loss: f64,
}
90
/// Counters describing a node's connection activity.
#[derive(Clone, Debug)]
pub struct NodeStats {
    /// Connections currently counted as active.
    pub active_connections: usize,
    /// Total connections that were established successfully.
    pub successful_connections: u64,
    /// Total connection attempts that failed.
    pub failed_connections: u64,
    /// Total NAT traversal attempts.
    pub nat_traversal_attempts: u64,
    /// Total NAT traversals that succeeded.
    pub nat_traversal_successes: u64,
    /// Instant at which these statistics were created.
    pub start_time: Instant,
}
106
107impl Default for NodeStats {
108 fn default() -> Self {
109 Self {
110 active_connections: 0,
111 successful_connections: 0,
112 failed_connections: 0,
113 nat_traversal_attempts: 0,
114 nat_traversal_successes: 0,
115 start_time: Instant::now(),
116 }
117 }
118}
119
120impl QuicP2PNode {
121 pub async fn new(
123 config: QuicNodeConfig,
124 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
125 let (secret_key, public_key) = generate_ed25519_keypair();
127 let peer_id = derive_peer_id_from_public_key(&public_key);
128
129 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
130
131 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
133
134 let nat_config = NatTraversalConfig {
136 role: config.role,
137 bootstrap_nodes: config.bootstrap_nodes.clone(),
138 max_candidates: 50,
139 coordination_timeout: Duration::from_secs(10),
140 enable_symmetric_nat: true,
141 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
143 max_concurrent_attempts: 5,
144 bind_addr: config.bind_addr,
145 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
147 };
148
149 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
151 start_time: Instant::now(),
152 ..Default::default()
153 }));
154 let stats_for_callback = Arc::clone(&stats_clone);
155
156 let event_callback = Box::new(move |event: NatTraversalEvent| {
157 let stats = stats_for_callback.clone();
158 tokio::spawn(async move {
159 let mut stats = stats.lock().await;
160 match event {
161 NatTraversalEvent::CoordinationRequested { .. } => {
162 stats.nat_traversal_attempts += 1;
163 }
164 NatTraversalEvent::ConnectionEstablished { .. } => {
165 stats.nat_traversal_successes += 1;
166 }
167 _ => {}
168 }
169 });
170 });
171
172 let nat_endpoint =
174 Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
175
176 Ok(Self {
177 nat_endpoint,
178 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
179 stats: stats_clone,
180 config,
181 auth_manager,
182 peer_id,
183 })
184 }
185
186 pub fn get_config(&self) -> &QuicNodeConfig {
188 &self.config
189 }
190
191 pub async fn connect_to_bootstrap(
193 &self,
194 bootstrap_addr: SocketAddr,
195 ) -> Result<PeerId, NatTraversalError> {
196 info!("Connecting to bootstrap node at {}", bootstrap_addr);
197
198 let endpoint = self.nat_endpoint.get_quinn_endpoint().ok_or_else(|| {
200 NatTraversalError::ConfigError("Quinn endpoint not available".to_string())
201 })?;
202
203 match endpoint.connect(bootstrap_addr, "bootstrap-node") {
205 Ok(connecting) => {
206 match connecting.await {
207 Ok(_connection) => {
208 let peer_id = self.derive_peer_id_from_address(bootstrap_addr);
212
213 self.connected_peers
215 .write()
216 .await
217 .insert(peer_id, bootstrap_addr);
218
219 {
221 let mut stats = self.stats.lock().await;
222 stats.active_connections += 1;
223 stats.successful_connections += 1;
224 }
225
226 if let Some(ref callback) = self.nat_endpoint.get_event_callback() {
228 callback(NatTraversalEvent::ConnectionEstablished {
229 peer_id,
230 remote_address: bootstrap_addr,
231 });
232 }
233
234 info!(
235 "Successfully connected to bootstrap node {} with peer ID {:?}",
236 bootstrap_addr, peer_id
237 );
238 Ok(peer_id)
239 }
240 Err(e) => {
241 error!(
242 "Failed to establish connection to bootstrap node {}: {}",
243 bootstrap_addr, e
244 );
245 {
246 let mut stats = self.stats.lock().await;
247 stats.failed_connections += 1;
248 }
249 Err(NatTraversalError::NetworkError(format!(
250 "Connection failed: {e}"
251 )))
252 }
253 }
254 }
255 Err(e) => {
256 error!(
257 "Failed to initiate connection to bootstrap node {}: {}",
258 bootstrap_addr, e
259 );
260 {
261 let mut stats = self.stats.lock().await;
262 stats.failed_connections += 1;
263 }
264 Err(NatTraversalError::NetworkError(format!(
265 "Connect error: {e}"
266 )))
267 }
268 }
269 }
270
271 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
273 use std::collections::hash_map::DefaultHasher;
274 use std::hash::{Hash, Hasher};
275
276 let mut hasher = DefaultHasher::new();
277 addr.hash(&mut hasher);
278 let hash = hasher.finish();
279
280 let mut peer_id_bytes = [0u8; 32];
281 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
282 let port_bytes = addr.port().to_le_bytes();
283 peer_id_bytes[8..10].copy_from_slice(&port_bytes);
284
285 PeerId(peer_id_bytes)
286 }
287
288 pub async fn connect_to_peer(
290 &self,
291 peer_id: PeerId,
292 coordinator: SocketAddr,
293 ) -> Result<SocketAddr, NatTraversalError> {
294 info!(
295 "Initiating connection to peer {:?} via coordinator {}",
296 peer_id, coordinator
297 );
298
299 {
301 let mut stats = self.stats.lock().await;
302 stats.nat_traversal_attempts += 1;
303 }
304
305 self.nat_endpoint
307 .initiate_nat_traversal(peer_id, coordinator)?;
308
309 let start = Instant::now();
311 let timeout = self.config.connection_timeout;
312
313 while start.elapsed() < timeout {
314 let events = self.nat_endpoint.poll(Instant::now())?;
315
316 for event in events {
317 match event {
318 NatTraversalEvent::ConnectionEstablished {
319 peer_id: evt_peer,
320 remote_address,
321 } => {
322 if evt_peer == peer_id {
323 {
325 let mut peers = self.connected_peers.write().await;
326 peers.insert(peer_id, remote_address);
327 }
328
329 {
331 let mut stats = self.stats.lock().await;
332 stats.successful_connections += 1;
333 stats.active_connections += 1;
334 stats.nat_traversal_successes += 1;
335 }
336
337 info!(
338 "Successfully connected to peer {:?} at {}",
339 peer_id, remote_address
340 );
341
342 if self.config.auth_config.require_authentication {
344 match self.authenticate_as_initiator(&peer_id).await {
345 Ok(_) => {
346 info!("Authentication successful with peer {:?}", peer_id);
347 }
348 Err(e) => {
349 error!(
350 "Authentication failed with peer {:?}: {}",
351 peer_id, e
352 );
353 self.connected_peers.write().await.remove(&peer_id);
355 let mut stats = self.stats.lock().await;
357 stats.active_connections =
358 stats.active_connections.saturating_sub(1);
359 stats.failed_connections += 1;
360 return Err(NatTraversalError::ConfigError(format!(
361 "Authentication failed: {e}"
362 )));
363 }
364 }
365 }
366
367 return Ok(remote_address);
368 }
369 }
370 NatTraversalEvent::TraversalFailed {
371 peer_id: evt_peer,
372 error,
373 fallback_available: _,
374 } => {
375 if evt_peer == peer_id {
376 {
378 let mut stats = self.stats.lock().await;
379 stats.failed_connections += 1;
380 }
381
382 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
383 return Err(error);
384 }
385 }
386 _ => {
387 debug!("Received event: {:?}", event);
388 }
389 }
390 }
391
392 tokio::time::sleep(Duration::from_millis(100)).await;
394 }
395
396 {
398 let mut stats = self.stats.lock().await;
399 stats.failed_connections += 1;
400 }
401
402 Err(NatTraversalError::Timeout)
403 }
404
405 pub async fn accept(
407 &self,
408 ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
409 info!("Waiting for incoming connection...");
410
411 match self.nat_endpoint.accept_connection().await {
413 Ok((peer_id, connection)) => {
414 let remote_addr = connection.remote_address();
415
416 {
418 let mut peers = self.connected_peers.write().await;
419 peers.insert(peer_id, remote_addr);
420 }
421
422 {
424 let mut stats = self.stats.lock().await;
425 stats.successful_connections += 1;
426 stats.active_connections += 1;
427 }
428
429 info!(
430 "Accepted connection from peer {:?} at {}",
431 peer_id, remote_addr
432 );
433
434 if self.config.auth_config.require_authentication {
436 let self_clone = self.clone();
438 let auth_peer_id = peer_id;
439 tokio::spawn(async move {
440 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
441 error!(
442 "Failed to handle authentication for peer {:?}: {}",
443 auth_peer_id, e
444 );
445 self_clone
447 .connected_peers
448 .write()
449 .await
450 .remove(&auth_peer_id);
451 let mut stats = self_clone.stats.lock().await;
452 stats.active_connections = stats.active_connections.saturating_sub(1);
453 }
454 });
455 }
456
457 Ok((remote_addr, peer_id))
458 }
459 Err(e) => {
460 {
462 let mut stats = self.stats.lock().await;
463 stats.failed_connections += 1;
464 }
465
466 error!("Failed to accept connection: {}", e);
467 Err(Box::new(e))
468 }
469 }
470 }
471
472 pub async fn send_to_peer(
474 &self,
475 peer_id: &PeerId,
476 data: &[u8],
477 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
478 let peers = self.connected_peers.read().await;
479
480 if let Some(remote_addr) = peers.get(peer_id) {
481 debug!(
482 "Sending {} bytes to peer {:?} at {}",
483 data.len(),
484 peer_id,
485 remote_addr
486 );
487
488 match self.nat_endpoint.get_connection(peer_id) {
490 Ok(Some(connection)) => {
491 let mut send_stream = connection
493 .open_uni()
494 .await
495 .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
496
497 send_stream
499 .write_all(data)
500 .await
501 .map_err(|e| format!("Failed to write data: {e}"))?;
502
503 send_stream
505 .finish()
506 .map_err(|e| format!("Failed to finish stream: {e}"))?;
507
508 debug!(
509 "Successfully sent {} bytes to peer {:?}",
510 data.len(),
511 peer_id
512 );
513 Ok(())
514 }
515 Ok(None) => {
516 error!("No active connection found for peer {:?}", peer_id);
517 Err("No active connection".into())
518 }
519 Err(e) => {
520 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
521 Err(Box::new(e))
522 }
523 }
524 } else {
525 error!("Peer {:?} not connected", peer_id);
526 Err("Peer not connected".into())
527 }
528 }
529
530 pub async fn receive(
532 &self,
533 ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
534 debug!("Waiting to receive data from any connected peer...");
535
536 let peers = {
538 let peers_guard = self.connected_peers.read().await;
539 peers_guard.clone()
540 };
541
542 if peers.is_empty() {
543 return Err("No connected peers".into());
544 }
545
546 for (peer_id, _remote_addr) in peers.iter() {
550 match self.nat_endpoint.get_connection(peer_id) {
551 Ok(Some(connection)) => {
552 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
554 .await
555 {
556 Ok(Ok(mut recv_stream)) => {
557 debug!(
558 "Receiving data from unidirectional stream from peer {:?}",
559 peer_id
560 );
561
562 match recv_stream.read_to_end(1024 * 1024).await {
564 Ok(buffer) => {
566 if !buffer.is_empty() {
567 debug!(
568 "Received {} bytes from peer {:?}",
569 buffer.len(),
570 peer_id
571 );
572 return Ok((*peer_id, buffer));
573 }
574 }
575 Err(e) => {
576 debug!(
577 "Failed to read from stream for peer {:?}: {}",
578 peer_id, e
579 );
580 }
581 }
582 }
583 Ok(Err(e)) => {
584 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
585 }
586 Err(_) => {
587 }
589 }
590
591 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
593 .await
594 {
595 Ok(Ok((_send_stream, mut recv_stream))) => {
596 debug!(
597 "Receiving data from bidirectional stream from peer {:?}",
598 peer_id
599 );
600
601 match recv_stream.read_to_end(1024 * 1024).await {
603 Ok(buffer) => {
605 if !buffer.is_empty() {
606 debug!(
607 "Received {} bytes from peer {:?} via bidirectional stream",
608 buffer.len(),
609 peer_id
610 );
611 return Ok((*peer_id, buffer));
612 }
613 }
614 Err(e) => {
615 debug!(
616 "Failed to read from bidirectional stream for peer {:?}: {}",
617 peer_id, e
618 );
619 }
620 }
621 }
622 Ok(Err(e)) => {
623 debug!(
624 "Failed to accept bidirectional stream from peer {:?}: {}",
625 peer_id, e
626 );
627 }
628 Err(_) => {
629 }
631 }
632 }
633 Ok(None) => {
634 debug!("No active connection for peer {:?}", peer_id);
635 }
636 Err(e) => {
637 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
638 }
639 }
640 }
641
642 Err("No data available from any connected peer".into())
644 }
645
646 pub async fn get_stats(&self) -> NodeStats {
648 self.stats.lock().await.clone()
649 }
650
651 pub fn get_nat_endpoint(
653 &self,
654 ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
655 Ok(&*self.nat_endpoint)
656 }
657
658 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
660 let stats = Arc::clone(&self.stats);
661 let interval_duration = self.config.stats_interval;
662
663 tokio::spawn(async move {
664 let mut interval = tokio::time::interval(interval_duration);
665
666 loop {
667 interval.tick().await;
668
669 let stats_snapshot = stats.lock().await.clone();
670
671 info!(
672 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
673 stats_snapshot.active_connections,
674 stats_snapshot.successful_connections,
675 stats_snapshot.nat_traversal_successes,
676 stats_snapshot.nat_traversal_attempts
677 );
678 }
679 })
680 }
681
682 pub async fn get_nat_stats(
684 &self,
685 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
686 self.nat_endpoint.get_nat_stats()
687 }
688
689 pub fn inject_observed_address(
691 &self,
692 observed_address: SocketAddr,
693 from_peer: PeerId,
694 ) -> Result<(), NatTraversalError> {
695 self.nat_endpoint
696 .inject_observed_address(observed_address, from_peer)
697 }
698
699 pub async fn get_connection_metrics(
701 &self,
702 peer_id: &PeerId,
703 ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
704 match self.nat_endpoint.get_connection(peer_id) {
705 Ok(Some(connection)) => {
706 let rtt = connection.rtt();
708
709 let stats = connection.stats();
711
712 Ok(ConnectionMetrics {
713 bytes_sent: stats.udp_tx.bytes,
714 bytes_received: stats.udp_rx.bytes,
715 rtt: Some(rtt),
716 packet_loss: stats.path.lost_packets as f64
717 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
718 })
719 }
720 Ok(None) => Err("Connection not found".into()),
721 Err(e) => Err(format!("Failed to get connection: {e}").into()),
722 }
723 }
724
725 pub fn peer_id(&self) -> PeerId {
727 self.peer_id
728 }
729
730 pub fn public_key_bytes(&self) -> [u8; 32] {
732 self.auth_manager.public_key_bytes()
733 }
734
735 async fn send_auth_message(
737 &self,
738 peer_id: &PeerId,
739 message: AuthMessage,
740 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
741 let data = AuthManager::serialize_message(&message)?;
742 self.send_to_peer(peer_id, &data).await
743 }
744
745 async fn authenticate_as_initiator(
747 &self,
748 peer_id: &PeerId,
749 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
750 info!("Starting authentication with peer {:?}", peer_id);
751
752 let auth_request = self.auth_manager.create_auth_request();
754 self.send_auth_message(peer_id, auth_request).await?;
755
756 let timeout_duration = self.config.auth_config.auth_timeout;
758 let start = Instant::now();
759
760 while start.elapsed() < timeout_duration {
761 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
762 Ok(Ok((recv_peer_id, data))) => {
763 if recv_peer_id == *peer_id {
764 match AuthManager::deserialize_message(&data) {
765 Ok(AuthMessage::Challenge { nonce, .. }) => {
766 let response =
768 self.auth_manager.create_challenge_response(nonce)?;
769 self.send_auth_message(peer_id, response).await?;
770 }
771 Ok(AuthMessage::AuthSuccess { .. }) => {
772 info!("Authentication successful with peer {:?}", peer_id);
773 return Ok(());
774 }
775 Ok(AuthMessage::AuthFailure { reason }) => {
776 return Err(format!("Authentication failed: {reason}").into());
777 }
778 _ => continue,
779 }
780 }
781 }
782 _ => continue,
783 }
784 }
785
786 Err("Authentication timeout".into())
787 }
788
789 async fn handle_auth_message(
791 &self,
792 peer_id: PeerId,
793 message: AuthMessage,
794 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
795 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
796
797 match auth_protocol.handle_message(peer_id, message).await {
798 Ok(Some(response)) => {
799 self.send_auth_message(&peer_id, response).await?;
800 }
801 Ok(None) => {
802 }
804 Err(e) => {
805 error!("Authentication error: {}", e);
806 let failure = AuthMessage::AuthFailure {
807 reason: e.to_string(),
808 };
809 self.send_auth_message(&peer_id, failure).await?;
810 return Err(Box::new(e));
811 }
812 }
813
814 Ok(())
815 }
816
817 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
819 self.auth_manager.is_authenticated(peer_id).await
820 }
821
822 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
824 self.auth_manager.list_authenticated_peers().await
825 }
826
827 async fn handle_incoming_auth(
829 &self,
830 peer_id: PeerId,
831 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
832 info!("Handling incoming authentication from peer {:?}", peer_id);
833
834 let timeout_duration = self.config.auth_config.auth_timeout;
835 let start = Instant::now();
836
837 while start.elapsed() < timeout_duration {
838 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
839 Ok(Ok((recv_peer_id, data))) => {
840 if recv_peer_id == peer_id {
841 match AuthManager::deserialize_message(&data) {
842 Ok(auth_msg) => {
843 self.handle_auth_message(peer_id, auth_msg).await?;
844
845 if self.auth_manager.is_authenticated(&peer_id).await {
847 info!("Peer {:?} successfully authenticated", peer_id);
848 return Ok(());
849 }
850 }
851 Err(_) => {
852 continue;
854 }
855 }
856 }
857 }
858 _ => continue,
859 }
860 }
861
862 Err("Authentication timeout waiting for peer".into())
863 }
864
865 pub fn get_metrics_collector(&self) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
867 Ok(Arc::new(crate::logging::MetricsCollector::new()))
871 }
872}