1use std::{
14 collections::HashMap,
15 net::SocketAddr,
16 sync::Arc,
17 time::{Duration, Instant},
18};
19
20use tracing::{debug, error, info};
21
22use crate::{
23 auth::{AuthConfig, AuthManager, AuthMessage, AuthProtocol},
24 crypto::raw_public_keys::key_utils::{
25 derive_peer_id_from_public_key, generate_ed25519_keypair,
26 },
27 nat_traversal_api::{
28 EndpointRole, NatTraversalConfig, NatTraversalEndpoint, NatTraversalError,
29 NatTraversalEvent, NatTraversalStatistics, PeerId,
30 },
31};
32
33#[derive(Clone, Debug)]
35pub struct QuicP2PNode {
36 nat_endpoint: Arc<NatTraversalEndpoint>,
38 connected_peers: Arc<tokio::sync::RwLock<HashMap<PeerId, SocketAddr>>>,
40 stats: Arc<tokio::sync::Mutex<NodeStats>>,
42 config: QuicNodeConfig,
44 auth_manager: Arc<AuthManager>,
46 peer_id: PeerId,
48}
49
50#[derive(Debug, Clone)]
52pub struct QuicNodeConfig {
53 pub role: EndpointRole,
55 pub bootstrap_nodes: Vec<SocketAddr>,
57 pub enable_coordinator: bool,
59 pub max_connections: usize,
61 pub connection_timeout: Duration,
63 pub stats_interval: Duration,
65 pub auth_config: AuthConfig,
67 pub bind_addr: Option<SocketAddr>,
69}
70
71impl Default for QuicNodeConfig {
72 fn default() -> Self {
73 Self {
74 role: EndpointRole::Client,
75 bootstrap_nodes: Vec::new(),
76 enable_coordinator: false,
77 max_connections: 100,
78 connection_timeout: Duration::from_secs(30),
79 stats_interval: Duration::from_secs(30),
80 auth_config: AuthConfig::default(),
81 bind_addr: None,
82 }
83 }
84}
85
/// Point-in-time transport measurements for a single peer connection.
#[derive(Debug, Clone)]
pub struct ConnectionMetrics {
    /// Bytes transmitted on the connection.
    pub bytes_sent: u64,
    /// Bytes received on the connection.
    pub bytes_received: u64,
    /// Round-trip time, when one is available.
    pub rtt: Option<Duration>,
    /// Packet loss expressed as a fraction.
    pub packet_loss: f64,
}
98
/// Running counters describing a node's connection activity.
#[derive(Debug, Clone)]
pub struct NodeStats {
    /// Connections currently counted as live.
    pub active_connections: usize,
    /// Total connections that were established or accepted.
    pub successful_connections: u64,
    /// Total connection attempts that ended in failure.
    pub failed_connections: u64,
    /// Total NAT traversal attempts recorded.
    pub nat_traversal_attempts: u64,
    /// Total NAT traversals recorded as successful.
    pub nat_traversal_successes: u64,
    /// Instant at which these statistics were created.
    pub start_time: Instant,
}
115
116impl Default for NodeStats {
117 fn default() -> Self {
118 Self {
119 active_connections: 0,
120 successful_connections: 0,
121 failed_connections: 0,
122 nat_traversal_attempts: 0,
123 nat_traversal_successes: 0,
124 start_time: Instant::now(),
125 }
126 }
127}
128
129impl QuicP2PNode {
130 pub async fn new(
132 config: QuicNodeConfig,
133 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
134 let (secret_key, public_key) = generate_ed25519_keypair();
136 let peer_id = derive_peer_id_from_public_key(&public_key);
137
138 info!("Creating QUIC P2P node with peer ID: {:?}", peer_id);
139
140 let auth_manager = Arc::new(AuthManager::new(secret_key, config.auth_config.clone()));
142
143 let nat_config = NatTraversalConfig {
145 role: config.role,
146 bootstrap_nodes: config.bootstrap_nodes.clone(),
147 max_candidates: 50,
148 coordination_timeout: Duration::from_secs(10),
149 enable_symmetric_nat: true,
150 enable_relay_fallback: !matches!(config.role, EndpointRole::Bootstrap),
152 max_concurrent_attempts: 5,
153 bind_addr: config.bind_addr,
154 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
156 };
157
158 let stats_clone = Arc::new(tokio::sync::Mutex::new(NodeStats {
160 start_time: Instant::now(),
161 ..Default::default()
162 }));
163 let stats_for_callback = Arc::clone(&stats_clone);
164
165 let event_callback = Box::new(move |event: NatTraversalEvent| {
166 let stats = stats_for_callback.clone();
167 tokio::spawn(async move {
168 let mut stats = stats.lock().await;
169 match event {
170 NatTraversalEvent::CoordinationRequested { .. } => {
171 stats.nat_traversal_attempts += 1;
172 }
173 NatTraversalEvent::ConnectionEstablished { .. } => {
174 stats.nat_traversal_successes += 1;
175 }
176 _ => {}
177 }
178 });
179 });
180
181 let nat_endpoint =
183 Arc::new(NatTraversalEndpoint::new(nat_config, Some(event_callback)).await?);
184
185 Ok(Self {
186 nat_endpoint,
187 connected_peers: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
188 stats: stats_clone,
189 config,
190 auth_manager,
191 peer_id,
192 })
193 }
194
195 pub fn get_config(&self) -> &QuicNodeConfig {
197 &self.config
198 }
199
200 pub async fn connect_to_bootstrap(
202 &self,
203 bootstrap_addr: SocketAddr,
204 ) -> Result<PeerId, NatTraversalError> {
205 info!("Connecting to bootstrap node at {}", bootstrap_addr);
206
207 match self
208 .nat_endpoint
209 .connect_to_address(bootstrap_addr, "bootstrap-node", None)
210 .await
211 {
212 Ok((peer_id, _connection)) => {
213 self.connected_peers
214 .write()
215 .await
216 .insert(peer_id, bootstrap_addr);
217
218 {
219 let mut stats = self.stats.lock().await;
220 stats.active_connections += 1;
221 stats.successful_connections += 1;
222 }
223
224 info!(
225 "Successfully connected to bootstrap node {} with peer ID {:?}",
226 bootstrap_addr, peer_id
227 );
228 Ok(peer_id)
229 }
230 Err(e) => {
231 error!(
232 "Failed to connect to bootstrap node {}: {}",
233 bootstrap_addr, e
234 );
235 {
236 let mut stats = self.stats.lock().await;
237 stats.failed_connections += 1;
238 }
239 Err(e)
240 }
241 }
242 }
243
244 #[allow(dead_code)]
246 fn derive_peer_id_from_address(&self, addr: SocketAddr) -> PeerId {
247 use std::collections::hash_map::DefaultHasher;
248 use std::hash::{Hash, Hasher};
249
250 let mut hasher = DefaultHasher::new();
251 addr.hash(&mut hasher);
252 let hash = hasher.finish();
253
254 let mut peer_id_bytes = [0u8; 32];
255 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
256 let port_bytes = addr.port().to_le_bytes();
257 peer_id_bytes[8..10].copy_from_slice(&port_bytes);
258
259 PeerId(peer_id_bytes)
260 }
261
262 pub async fn connect_to_peer(
264 &self,
265 peer_id: PeerId,
266 coordinator: SocketAddr,
267 ) -> Result<SocketAddr, NatTraversalError> {
268 info!(
269 "Initiating connection to peer {:?} via coordinator {}",
270 peer_id, coordinator
271 );
272
273 {
275 let mut stats = self.stats.lock().await;
276 stats.nat_traversal_attempts += 1;
277 }
278
279 self.nat_endpoint
281 .initiate_nat_traversal(peer_id, coordinator)?;
282
283 let start = Instant::now();
285 let timeout = self.config.connection_timeout;
286
287 while start.elapsed() < timeout {
288 let events = self.nat_endpoint.poll(Instant::now())?;
289
290 for event in events {
291 match event {
292 NatTraversalEvent::ConnectionEstablished {
293 peer_id: evt_peer,
294 remote_address,
295 } => {
296 if evt_peer == peer_id {
297 {
299 let mut peers = self.connected_peers.write().await;
300 peers.insert(peer_id, remote_address);
301 }
302
303 {
305 let mut stats = self.stats.lock().await;
306 stats.successful_connections += 1;
307 stats.active_connections += 1;
308 stats.nat_traversal_successes += 1;
309 }
310
311 info!(
312 "Successfully connected to peer {:?} at {}",
313 peer_id, remote_address
314 );
315
316 if self.config.auth_config.require_authentication {
318 match self.authenticate_as_initiator(&peer_id).await {
319 Ok(_) => {
320 info!("Authentication successful with peer {:?}", peer_id);
321 }
322 Err(e) => {
323 error!(
324 "Authentication failed with peer {:?}: {}",
325 peer_id, e
326 );
327 self.connected_peers.write().await.remove(&peer_id);
329 let mut stats = self.stats.lock().await;
331 stats.active_connections =
332 stats.active_connections.saturating_sub(1);
333 stats.failed_connections += 1;
334 return Err(NatTraversalError::ConfigError(format!(
335 "Authentication failed: {e}"
336 )));
337 }
338 }
339 }
340
341 return Ok(remote_address);
342 }
343 }
344 NatTraversalEvent::TraversalFailed {
345 peer_id: evt_peer,
346 error,
347 fallback_available: _,
348 } => {
349 if evt_peer == peer_id {
350 {
352 let mut stats = self.stats.lock().await;
353 stats.failed_connections += 1;
354 }
355
356 error!("NAT traversal failed for peer {:?}: {}", peer_id, error);
357 return Err(error);
358 }
359 }
360 _ => {
361 debug!("Received event: {:?}", event);
362 }
363 }
364 }
365
366 tokio::time::sleep(Duration::from_millis(100)).await;
368 }
369
370 {
372 let mut stats = self.stats.lock().await;
373 stats.failed_connections += 1;
374 }
375
376 Err(NatTraversalError::Timeout)
377 }
378
379 pub async fn accept(
381 &self,
382 ) -> Result<(SocketAddr, PeerId), Box<dyn std::error::Error + Send + Sync>> {
383 info!("Waiting for incoming connection...");
384
385 match self.nat_endpoint.accept_connection().await {
387 Ok((peer_id, connection)) => {
388 let remote_addr = connection.remote_address();
389
390 {
392 let mut peers = self.connected_peers.write().await;
393 peers.insert(peer_id, remote_addr);
394 }
395
396 {
398 let mut stats = self.stats.lock().await;
399 stats.successful_connections += 1;
400 stats.active_connections += 1;
401 }
402
403 info!(
404 "Accepted connection from peer {:?} at {}",
405 peer_id, remote_addr
406 );
407
408 if self.config.auth_config.require_authentication {
410 let self_clone = self.clone();
412 let auth_peer_id = peer_id;
413 tokio::spawn(async move {
414 if let Err(e) = self_clone.handle_incoming_auth(auth_peer_id).await {
415 error!(
416 "Failed to handle authentication for peer {:?}: {}",
417 auth_peer_id, e
418 );
419 self_clone
421 .connected_peers
422 .write()
423 .await
424 .remove(&auth_peer_id);
425 let mut stats = self_clone.stats.lock().await;
426 stats.active_connections = stats.active_connections.saturating_sub(1);
427 }
428 });
429 }
430
431 Ok((remote_addr, peer_id))
432 }
433 Err(e) => {
434 {
436 let mut stats = self.stats.lock().await;
437 stats.failed_connections += 1;
438 }
439
440 error!("Failed to accept connection: {}", e);
441 Err(Box::new(e))
442 }
443 }
444 }
445
446 pub async fn send_to_peer(
448 &self,
449 peer_id: &PeerId,
450 data: &[u8],
451 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
452 let peers = self.connected_peers.read().await;
453
454 if let Some(remote_addr) = peers.get(peer_id) {
455 debug!(
456 "Sending {} bytes to peer {:?} at {}",
457 data.len(),
458 peer_id,
459 remote_addr
460 );
461
462 match self.nat_endpoint.get_connection(peer_id) {
464 Ok(Some(connection)) => {
465 let mut send_stream = connection
467 .open_uni()
468 .await
469 .map_err(|e| format!("Failed to open unidirectional stream: {e}"))?;
470
471 send_stream
473 .write_all(data)
474 .await
475 .map_err(|e| format!("Failed to write data: {e}"))?;
476
477 send_stream
479 .finish()
480 .map_err(|e| format!("Failed to finish stream: {e}"))?;
481
482 debug!(
483 "Successfully sent {} bytes to peer {:?}",
484 data.len(),
485 peer_id
486 );
487 Ok(())
488 }
489 Ok(None) => {
490 error!("No active connection found for peer {:?}", peer_id);
491 Err("No active connection".into())
492 }
493 Err(e) => {
494 error!("Failed to get connection for peer {:?}: {}", peer_id, e);
495 Err(Box::new(e))
496 }
497 }
498 } else {
499 error!("Peer {:?} not connected", peer_id);
500 Err("Peer not connected".into())
501 }
502 }
503
504 pub async fn receive(
506 &self,
507 ) -> Result<(PeerId, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
508 debug!("Waiting to receive data from any connected peer...");
509
510 let peers = {
512 let peers_guard = self.connected_peers.read().await;
513 peers_guard.clone()
514 };
515
516 if peers.is_empty() {
517 return Err("No connected peers".into());
518 }
519
520 for (peer_id, _remote_addr) in peers.iter() {
524 match self.nat_endpoint.get_connection(peer_id) {
525 Ok(Some(connection)) => {
526 match tokio::time::timeout(Duration::from_millis(100), connection.accept_uni())
528 .await
529 {
530 Ok(Ok(mut recv_stream)) => {
531 debug!(
532 "Receiving data from unidirectional stream from peer {:?}",
533 peer_id
534 );
535
536 match recv_stream.read_to_end(1024 * 1024).await {
538 Ok(buffer) => {
540 if !buffer.is_empty() {
541 debug!(
542 "Received {} bytes from peer {:?}",
543 buffer.len(),
544 peer_id
545 );
546 return Ok((*peer_id, buffer));
547 }
548 }
549 Err(e) => {
550 debug!(
551 "Failed to read from stream for peer {:?}: {}",
552 peer_id, e
553 );
554 }
555 }
556 }
557 Ok(Err(e)) => {
558 debug!("Failed to accept uni stream from peer {:?}: {}", peer_id, e);
559 }
560 Err(_) => {
561 }
563 }
564
565 match tokio::time::timeout(Duration::from_millis(100), connection.accept_bi())
567 .await
568 {
569 Ok(Ok((_send_stream, mut recv_stream))) => {
570 debug!(
571 "Receiving data from bidirectional stream from peer {:?}",
572 peer_id
573 );
574
575 match recv_stream.read_to_end(1024 * 1024).await {
577 Ok(buffer) => {
579 if !buffer.is_empty() {
580 debug!(
581 "Received {} bytes from peer {:?} via bidirectional stream",
582 buffer.len(),
583 peer_id
584 );
585 return Ok((*peer_id, buffer));
586 }
587 }
588 Err(e) => {
589 debug!(
590 "Failed to read from bidirectional stream for peer {:?}: {}",
591 peer_id, e
592 );
593 }
594 }
595 }
596 Ok(Err(e)) => {
597 debug!(
598 "Failed to accept bidirectional stream from peer {:?}: {}",
599 peer_id, e
600 );
601 }
602 Err(_) => {
603 }
605 }
606 }
607 Ok(None) => {
608 debug!("No active connection for peer {:?}", peer_id);
609 }
610 Err(e) => {
611 debug!("Failed to get connection for peer {:?}: {}", peer_id, e);
612 }
613 }
614 }
615
616 Err("No data available from any connected peer".into())
618 }
619
620 pub async fn get_stats(&self) -> NodeStats {
622 self.stats.lock().await.clone()
623 }
624
625 pub fn get_nat_endpoint(
627 &self,
628 ) -> Result<&NatTraversalEndpoint, Box<dyn std::error::Error + Send + Sync>> {
629 Ok(&*self.nat_endpoint)
630 }
631
632 pub fn start_stats_task(&self) -> tokio::task::JoinHandle<()> {
634 let stats = Arc::clone(&self.stats);
635 let interval_duration = self.config.stats_interval;
636
637 tokio::spawn(async move {
638 let mut interval = tokio::time::interval(interval_duration);
639
640 loop {
641 interval.tick().await;
642
643 let stats_snapshot = stats.lock().await.clone();
644
645 info!(
646 "Node statistics - Connections: {}/{}, NAT traversal: {}/{}",
647 stats_snapshot.active_connections,
648 stats_snapshot.successful_connections,
649 stats_snapshot.nat_traversal_successes,
650 stats_snapshot.nat_traversal_attempts
651 );
652 }
653 })
654 }
655
656 pub async fn get_nat_stats(
658 &self,
659 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
660 self.nat_endpoint.get_nat_stats()
661 }
662
663 pub async fn get_connection_metrics(
667 &self,
668 peer_id: &PeerId,
669 ) -> Result<ConnectionMetrics, Box<dyn std::error::Error + Send + Sync>> {
670 match self.nat_endpoint.get_connection(peer_id) {
671 Ok(Some(connection)) => {
672 let rtt = connection.rtt();
674
675 let stats = connection.stats();
677
678 Ok(ConnectionMetrics {
679 bytes_sent: stats.udp_tx.bytes,
680 bytes_received: stats.udp_rx.bytes,
681 rtt: Some(rtt),
682 packet_loss: stats.path.lost_packets as f64
683 / (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
684 })
685 }
686 Ok(None) => Err("Connection not found".into()),
687 Err(e) => Err(format!("Failed to get connection: {e}").into()),
688 }
689 }
690
691 pub fn peer_id(&self) -> PeerId {
693 self.peer_id
694 }
695
696 pub fn public_key_bytes(&self) -> [u8; 32] {
698 self.auth_manager.public_key_bytes()
699 }
700
701 pub async fn relabel_peer(
703 &self,
704 old_peer_id: PeerId,
705 new_peer_id: PeerId,
706 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
707 if old_peer_id == new_peer_id {
708 return Ok(());
709 }
710
711 self.nat_endpoint
712 .relabel_connection(old_peer_id, new_peer_id)?;
713
714 let mut peers = self.connected_peers.write().await;
715 if let Some(addr) = peers.remove(&old_peer_id) {
716 peers.insert(new_peer_id, addr);
717 }
718
719 Ok(())
720 }
721
722 async fn send_auth_message(
724 &self,
725 peer_id: &PeerId,
726 message: AuthMessage,
727 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
728 let data = AuthManager::serialize_message(&message)?;
729 self.send_to_peer(peer_id, &data).await
730 }
731
732 async fn authenticate_as_initiator(
734 &self,
735 peer_id: &PeerId,
736 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
737 info!("Starting authentication with peer {:?}", peer_id);
738
739 let auth_request = self.auth_manager.create_auth_request();
741 self.send_auth_message(peer_id, auth_request).await?;
742
743 let timeout_duration = self.config.auth_config.auth_timeout;
745 let start = Instant::now();
746
747 while start.elapsed() < timeout_duration {
748 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
749 Ok(Ok((recv_peer_id, data))) => {
750 if recv_peer_id == *peer_id {
751 match AuthManager::deserialize_message(&data) {
752 Ok(AuthMessage::Challenge { nonce, .. }) => {
753 let response =
755 self.auth_manager.create_challenge_response(nonce)?;
756 self.send_auth_message(peer_id, response).await?;
757 }
758 Ok(AuthMessage::AuthSuccess { .. }) => {
759 info!("Authentication successful with peer {:?}", peer_id);
760 return Ok(());
761 }
762 Ok(AuthMessage::AuthFailure { reason }) => {
763 return Err(format!("Authentication failed: {reason}").into());
764 }
765 _ => continue,
766 }
767 }
768 }
769 _ => continue,
770 }
771 }
772
773 Err("Authentication timeout".into())
774 }
775
776 async fn handle_auth_message(
778 &self,
779 peer_id: PeerId,
780 message: AuthMessage,
781 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
782 let auth_protocol = AuthProtocol::new(Arc::clone(&self.auth_manager));
783
784 match auth_protocol.handle_message(peer_id, message).await {
785 Ok(Some(response)) => {
786 self.send_auth_message(&peer_id, response).await?;
787 }
788 Ok(None) => {
789 }
791 Err(e) => {
792 error!("Authentication error: {}", e);
793 let failure = AuthMessage::AuthFailure {
794 reason: e.to_string(),
795 };
796 self.send_auth_message(&peer_id, failure).await?;
797 return Err(Box::new(e));
798 }
799 }
800
801 Ok(())
802 }
803
804 pub async fn is_peer_authenticated(&self, peer_id: &PeerId) -> bool {
806 self.auth_manager.is_authenticated(peer_id).await
807 }
808
809 pub async fn list_authenticated_peers(&self) -> Vec<PeerId> {
811 self.auth_manager.list_authenticated_peers().await
812 }
813
814 async fn handle_incoming_auth(
816 &self,
817 peer_id: PeerId,
818 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
819 info!("Handling incoming authentication from peer {:?}", peer_id);
820
821 let timeout_duration = self.config.auth_config.auth_timeout;
822 let start = Instant::now();
823
824 while start.elapsed() < timeout_duration {
825 match tokio::time::timeout(Duration::from_secs(1), self.receive()).await {
826 Ok(Ok((recv_peer_id, data))) => {
827 if recv_peer_id == peer_id {
828 match AuthManager::deserialize_message(&data) {
829 Ok(auth_msg) => {
830 self.handle_auth_message(peer_id, auth_msg).await?;
831
832 if self.auth_manager.is_authenticated(&peer_id).await {
834 info!("Peer {:?} successfully authenticated", peer_id);
835 return Ok(());
836 }
837 }
838 Err(_) => {
839 continue;
841 }
842 }
843 }
844 }
845 _ => continue,
846 }
847 }
848
849 Err("Authentication timeout waiting for peer".into())
850 }
851
852 pub fn get_metrics_collector(
854 &self,
855 ) -> Result<Arc<crate::logging::MetricsCollector>, &'static str> {
856 Ok(Arc::new(crate::logging::MetricsCollector::new()))
860 }
861}