1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::{RwLock, broadcast};
37use tokio::time::Instant;
38use tracing::{debug, info, warn};
39
/// Configuration for a P2P node.
///
/// Instances are produced by [`NodeConfig::new`], [`NodeConfig::from_config`],
/// [`NodeConfig::with_listen_addr`] or the [`Default`] implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Explicit peer ID. When `None`, `P2PNode::new` generates a random
    /// `peer_<8 chars>` identifier.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on. `P2PNode::new` uses the first IPv6 and the
    /// first IPv4 entry; when the list is empty it falls back to wildcard
    /// addresses on the port of `listen_addr`.
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address; its port is reused for the wildcard addresses
    /// the constructors put into `listen_addrs`.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers as resolved socket addresses.
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers in string form, copied from
    /// `Config::network.bootstrap_nodes` by the constructors.
    pub bootstrap_peers_str: Vec<String>,

    /// Whether IPv6 is enabled. Consulted by `P2PNode::new` only when
    /// `listen_addrs` is empty.
    pub enable_ipv6: bool,

    /// Connection timeout (constructors build it from a seconds value).
    pub connection_timeout: Duration,

    /// Keep-alive interval (constructors build it from a seconds value).
    pub keep_alive_interval: Duration,

    /// Maximum number of connections; `P2PNode::health_check` compares the
    /// peer count against it when no resource manager is configured.
    pub max_connections: usize,

    /// Limit for incoming connections, taken from
    /// `Config::security.connection_limit` by the constructors.
    pub max_incoming_connections: usize,

    /// DHT tuning parameters.
    pub dht_config: DHTConfig,

    /// Transport security settings.
    pub security_config: SecurityConfig,

    /// When `Some`, `P2PNode::new` creates a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// When `Some`, `P2PNode::new` passes it to
    /// `BootstrapManager::with_config`; otherwise `BootstrapManager::new` is used.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
}
86
/// DHT tuning parameters carried inside [`NodeConfig`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// `k` parameter; `P2PNode::new` maps it to both `replication_factor`
    /// and `bucket_size` of `crate::dht::DHTConfig`.
    pub k_value: usize,

    /// Mapped to `alpha` of `crate::dht::DHTConfig`
    /// (presumably lookup parallelism — confirm against the DHT module).
    pub alpha_value: usize,

    /// Time-to-live for stored records.
    pub record_ttl: Duration,

    /// Mapped to both `bucket_refresh_interval` and `republish_interval`.
    pub refresh_interval: Duration,
}
102
/// Security settings carried inside [`NodeConfig`].
///
/// NOTE(review): none of these fields is read in the part of the file
/// reviewed here; their effect is determined elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Flag for Noise support (defaults to `true`).
    pub enable_noise: bool,

    /// Flag for TLS support (defaults to `true`).
    pub enable_tls: bool,

    /// Trust level applied to peers (defaults to [`TrustLevel::Basic`]).
    pub trust_level: TrustLevel,
}
115
/// Trust level selectable in [`SecurityConfig`].
///
/// NOTE(review): the precise semantics of each level are not visible in
/// this part of the file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No trust requirements.
    None,
    /// Basic trust; the value used by `SecurityConfig::default`.
    Basic,
    /// Full trust.
    Full,
}
126
127impl NodeConfig {
128 pub fn new() -> Result<Self> {
134 let config = Config::default();
136
137 let listen_addr = config.listen_socket_addr()?;
139
140 let mut listen_addrs = vec![];
142
143 if config.network.ipv6_enabled {
145 let ipv6_addr = std::net::SocketAddr::new(
146 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
147 listen_addr.port(),
148 );
149 listen_addrs.push(ipv6_addr);
150 }
151
152 let ipv4_addr = std::net::SocketAddr::new(
154 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
155 listen_addr.port(),
156 );
157 listen_addrs.push(ipv4_addr);
158
159 Ok(Self {
160 peer_id: None,
161 listen_addrs,
162 listen_addr,
163 bootstrap_peers: Vec::new(),
164 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
165 enable_ipv6: config.network.ipv6_enabled,
166
167 connection_timeout: Duration::from_secs(config.network.connection_timeout),
168 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
169 max_connections: config.network.max_connections,
170 max_incoming_connections: config.security.connection_limit as usize,
171 dht_config: DHTConfig::default(),
172 security_config: SecurityConfig::default(),
173 production_config: None,
174 bootstrap_cache_config: None,
175 })
177 }
178}
179
180impl Default for NodeConfig {
181 fn default() -> Self {
182 let config = Config::default();
184
185 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
187 std::net::SocketAddr::new(
188 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
189 9000,
190 )
191 });
192
193 Self {
194 peer_id: None,
195 listen_addrs: vec![
196 std::net::SocketAddr::new(
197 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
198 listen_addr.port(),
199 ),
200 std::net::SocketAddr::new(
201 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
202 listen_addr.port(),
203 ),
204 ],
205 listen_addr,
206 bootstrap_peers: Vec::new(),
207 bootstrap_peers_str: Vec::new(),
208 enable_ipv6: config.network.ipv6_enabled,
209
210 connection_timeout: Duration::from_secs(config.network.connection_timeout),
211 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
212 max_connections: config.network.max_connections,
213 max_incoming_connections: config.security.connection_limit as usize,
214 dht_config: DHTConfig::default(),
215 security_config: SecurityConfig::default(),
216 production_config: None, bootstrap_cache_config: None,
218 }
220 }
221}
222
223impl NodeConfig {
224 pub fn from_config(config: &Config) -> Result<Self> {
226 let listen_addr = config.listen_socket_addr()?;
227 let bootstrap_addrs = config.bootstrap_addrs()?;
228
229 let mut node_config = Self {
230 peer_id: None,
231 listen_addrs: vec![listen_addr],
232 listen_addr,
233 bootstrap_peers: bootstrap_addrs
234 .iter()
235 .map(|addr| addr.socket_addr())
236 .collect(),
237 bootstrap_peers_str: config
238 .network
239 .bootstrap_nodes
240 .iter()
241 .map(|addr| addr.to_string())
242 .collect(),
243 enable_ipv6: config.network.ipv6_enabled,
244
245 connection_timeout: Duration::from_secs(config.network.connection_timeout),
246 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
247 max_connections: config.network.max_connections,
248 max_incoming_connections: config.security.connection_limit as usize,
249 dht_config: DHTConfig {
250 k_value: 20,
251 alpha_value: 3,
252 record_ttl: Duration::from_secs(3600),
253 refresh_interval: Duration::from_secs(900),
254 },
255 security_config: SecurityConfig {
256 enable_noise: true,
257 enable_tls: true,
258 trust_level: TrustLevel::Basic,
259 },
260 production_config: Some(ProductionConfig {
261 max_connections: config.network.max_connections,
262 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
265 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266 health_check_interval: Duration::from_secs(30),
267 metrics_interval: Duration::from_secs(60),
268 enable_performance_tracking: true,
269 enable_auto_cleanup: true,
270 shutdown_timeout: Duration::from_secs(30),
271 rate_limits: crate::production::RateLimitConfig::default(),
272 }),
273 bootstrap_cache_config: None,
274 };
279
280 if config.network.ipv6_enabled {
282 node_config.listen_addrs.push(std::net::SocketAddr::new(
283 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
284 listen_addr.port(),
285 ));
286 }
287
288 Ok(node_config)
289 }
290
291 pub fn with_listen_addr(addr: &str) -> Result<Self> {
293 let listen_addr: std::net::SocketAddr = addr
294 .parse()
295 .map_err(|e: std::net::AddrParseError| {
296 NetworkError::InvalidAddress(e.to_string().into())
297 })
298 .map_err(P2PError::Network)?;
299 let cfg = NodeConfig {
300 listen_addr,
301 listen_addrs: vec![listen_addr],
302 ..Default::default()
303 };
304 Ok(cfg)
305 }
306}
307
308impl Default for DHTConfig {
309 fn default() -> Self {
310 Self {
311 k_value: 20,
312 alpha_value: 5,
313 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
316 }
317}
318
319impl Default for SecurityConfig {
320 fn default() -> Self {
321 Self {
322 enable_noise: true,
323 enable_tls: true,
324 trust_level: TrustLevel::Basic,
325 }
326 }
327}
328
/// Bookkeeping record for a peer tracked in the node's peer table.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Identifier of the peer (also the key in the peer table).
    pub peer_id: PeerId,

    /// Addresses associated with the peer, in string form.
    pub addresses: Vec<String>,

    /// Instant at which the record was created (`connect_peer` sets it to now).
    pub connected_at: Instant,

    /// Instant of the last observed activity (initially equal to `connected_at`).
    pub last_seen: Instant,

    /// Current connection state.
    pub status: ConnectionStatus,

    /// Protocol identifiers recorded for the peer
    /// (`connect_peer` stores `"p2p-foundation/1.0"`).
    pub protocols: Vec<String>,

    /// Heartbeat counter; starts at 0 in `connect_peer`.
    pub heartbeat_count: u64,
}
353
/// Connection state stored in [`PeerInfo::status`].
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection attempt in progress.
    Connecting,
    /// Connection established; the state `connect_peer` records.
    Connected,
    /// Disconnect in progress.
    Disconnecting,
    /// Connection closed; set by `disconnect_peer` on the removed record.
    Disconnected,
    /// Connection failed; the payload is an error description.
    Failed(String),
}
368
/// Low-level network events.
///
/// NOTE(review): nothing in the part of the file reviewed here emits this
/// type; `P2PNode` broadcasts [`P2PEvent`] instead.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A peer connected.
    PeerConnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Addresses of the peer, in string form.
        addresses: Vec<String>,
    },

    /// A peer disconnected.
    PeerDisconnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Reason for the disconnect.
        reason: String,
    },

    /// A message arrived from a peer.
    MessageReceived {
        /// Identifier of the sending peer.
        peer_id: PeerId,
        /// Protocol the message belongs to.
        protocol: String,
        /// Raw message payload.
        data: Vec<u8>,
    },

    /// A connection attempt failed.
    ConnectionFailed {
        /// Peer identifier, if known.
        peer_id: Option<PeerId>,
        /// Address that was dialled.
        address: String,
        /// Error description.
        error: String,
    },

    /// A DHT record was stored.
    DHTRecordStored {
        /// Record key.
        key: Vec<u8>,
        /// Record value.
        value: Vec<u8>,
    },

    /// A DHT lookup completed.
    DHTRecordRetrieved {
        /// Record key.
        key: Vec<u8>,
        /// Retrieved value, or `None` if no value was found.
        value: Option<Vec<u8>>,
    },
}
424
/// Events broadcast by [`P2PNode`] to subscribers obtained through
/// `subscribe_events` / `events`.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// A message was received from the network, or published locally via
    /// `P2PNode::publish`.
    Message {
        /// Topic (the `protocol` field of the wire message).
        topic: String,
        /// Sender. For received messages this is the `from` field of the
        /// wire message; for local publishes, the node's own peer ID.
        source: PeerId,
        /// Message payload.
        data: Vec<u8>,
    },
    /// A peer connected (inbound accept or successful `connect_peer`).
    PeerConnected(PeerId),
    /// A peer was removed via `disconnect_peer`.
    PeerDisconnected(PeerId),
}
445
/// A P2P network node: owns the transport, the peer table, the optional
/// DHT / resource manager / bootstrap cache, and the event broadcast channel.
pub struct P2PNode {
    /// Configuration the node was created with.
    config: NodeConfig,

    /// This node's peer ID.
    peer_id: PeerId,

    /// Table of known peers, shared with background tasks.
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Sender side of the event broadcast channel.
    event_tx: broadcast::Sender<P2PEvent>,

    /// Addresses the node is bound to; filled in by `start_network_listeners`.
    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,

    /// Creation time, used by `uptime`.
    start_time: Instant,

    /// Whether the node is running; toggled by `start` and `stop`.
    running: RwLock<bool>,

    /// DHT instance, if enabled.
    dht: Option<Arc<RwLock<DHT>>>,

    /// Resource manager; present only when a `ProductionConfig` was supplied.
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager; `None` if its initialisation failed.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Dual-stack (IPv6 + IPv4) transport node.
    dual_node: Arc<DualStackNetworkNode>,

    /// Rate limiter for inbound connections.
    #[allow(dead_code)]
    rate_limiter: Arc<RateLimiter>,
}
493
494impl P2PNode {
495 pub fn new_for_tests() -> Result<Self> {
497 let (event_tx, _) = broadcast::channel(16);
498 Ok(Self {
499 config: NodeConfig::default(),
500 peer_id: "test_peer".to_string(),
501 peers: Arc::new(RwLock::new(HashMap::new())),
502 event_tx,
503 listen_addrs: RwLock::new(Vec::new()),
504 start_time: Instant::now(),
505 running: RwLock::new(false),
506 dht: None,
507 resource_manager: None,
508 bootstrap_manager: None,
509 dual_node: {
510 let v6: Option<std::net::SocketAddr> = "[::1]:0"
512 .parse()
513 .ok()
514 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
515 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
516 let handle = tokio::runtime::Handle::current();
517 let dual_attempt = handle.block_on(
518 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
519 );
520 let dual = match dual_attempt {
521 Ok(d) => d,
522 Err(_e1) => {
523 let fallback = handle.block_on(
525 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
526 None,
527 "127.0.0.1:0".parse().ok(),
528 ),
529 );
530 match fallback {
531 Ok(d) => d,
532 Err(e2) => {
533 return Err(P2PError::Network(NetworkError::BindError(
534 format!("Failed to create dual-stack network node: {}", e2)
535 .into(),
536 )));
537 }
538 }
539 }
540 };
541 Arc::new(dual)
542 },
543 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
544 max_requests: 100,
545 burst_size: 100,
546 window: std::time::Duration::from_secs(1),
547 ..Default::default()
548 })),
549 })
550 }
551 pub async fn new(config: NodeConfig) -> Result<Self> {
553 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
554 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
556 });
557
558 let (event_tx, _) = broadcast::channel(1000);
559
560 {
563 use blake3::Hasher;
564 let mut hasher = Hasher::new();
565 hasher.update(peer_id.as_bytes());
566 let digest = hasher.finalize();
567 let mut nid = [0u8; 32];
568 nid.copy_from_slice(digest.as_bytes());
569 let twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
570 crate::identity::node_identity::NodeId::from_bytes(nid),
571 ));
572 let _ = crate::api::set_dht_instance(twdht);
573 }
574
575 let dht = if true {
577 let _dht_config = crate::dht::DHTConfig {
579 replication_factor: config.dht_config.k_value,
580 bucket_size: config.dht_config.k_value,
581 alpha: config.dht_config.alpha_value,
582 record_ttl: config.dht_config.record_ttl,
583 bucket_refresh_interval: config.dht_config.refresh_interval,
584 republish_interval: config.dht_config.refresh_interval,
585 max_distance: 160, };
587 let peer_bytes = peer_id.as_bytes();
589 let mut node_id_bytes = [0u8; 32];
590 let len = peer_bytes.len().min(32);
591 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
592 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
593 let dht_instance = DHT::new(node_id).map_err(|e| {
594 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
595 e.to_string().into(),
596 ))
597 })?;
598 Some(Arc::new(RwLock::new(dht_instance)))
599 } else {
600 None
601 };
602
603 let resource_manager = config
607 .production_config
608 .clone()
609 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
610
611 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
613 match BootstrapManager::with_config(cache_config.clone()).await {
614 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
615 Err(e) => {
616 warn!(
617 "Failed to initialize bootstrap manager: {}, continuing without cache",
618 e
619 );
620 None
621 }
622 }
623 } else {
624 match BootstrapManager::new().await {
625 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
626 Err(e) => {
627 warn!(
628 "Failed to initialize bootstrap manager: {}, continuing without cache",
629 e
630 );
631 None
632 }
633 }
634 };
635
636 let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
638 let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
639 let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
640 (v6_addr, v4_addr)
641 } else {
642 let v4_addr = Some(std::net::SocketAddr::new(
644 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
645 config.listen_addr.port(),
646 ));
647 let v6_addr = if config.enable_ipv6 {
648 Some(std::net::SocketAddr::new(
649 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
650 config.listen_addr.port(),
651 ))
652 } else {
653 None
654 };
655 (v6_addr, v4_addr)
656 };
657
658 let dual_node = Arc::new(
659 DualStackNetworkNode::new(v6_opt, v4_opt)
660 .await
661 .map_err(|e| {
662 P2PError::Transport(crate::error::TransportError::SetupFailed(
663 format!("Failed to create dual-stack network nodes: {}", e).into(),
664 ))
665 })?,
666 );
667
668 let rate_limiter = Arc::new(RateLimiter::new(
670 crate::validation::RateLimitConfig::default(),
671 ));
672
673 let node = Self {
674 config,
675 peer_id,
676 peers: Arc::new(RwLock::new(HashMap::new())),
677 event_tx,
678 listen_addrs: RwLock::new(Vec::new()),
679 start_time: Instant::now(),
680 running: RwLock::new(false),
681 dht,
682 resource_manager,
683 bootstrap_manager,
684 dual_node,
685 rate_limiter,
686 };
687 info!("Created P2P node with peer ID: {}", node.peer_id);
688
689 Ok(node)
690 }
691
    /// Returns a fresh [`NodeBuilder`] (shorthand for `NodeBuilder::new()`).
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
696
    /// Returns this node's peer ID.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
701
702 pub fn local_addr(&self) -> Option<String> {
703 self.listen_addrs
704 .try_read()
705 .ok()
706 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
707 }
708
    /// Records interest in `topic`.
    ///
    /// Currently this only logs the subscription and always returns `Ok(())`;
    /// no per-topic state is kept in the code shown here.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        info!("Subscribed to topic: {}", topic);
        Ok(())
    }
715
716 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
717 info!(
718 "Publishing message to topic: {} ({} bytes)",
719 topic,
720 data.len()
721 );
722
723 let peer_list: Vec<PeerId> = {
725 let peers_guard = self.peers.read().await;
726 peers_guard.keys().cloned().collect()
727 };
728
729 if peer_list.is_empty() {
730 debug!("No peers connected, message will only be sent to local subscribers");
731 } else {
732 let mut send_count = 0;
734 for peer_id in &peer_list {
735 match self.send_message(peer_id, topic, data.to_vec()).await {
736 Ok(_) => {
737 send_count += 1;
738 debug!("Sent message to peer: {}", peer_id);
739 }
740 Err(e) => {
741 warn!("Failed to send message to peer {}: {}", peer_id, e);
742 }
743 }
744 }
745 info!(
746 "Published message to {}/{} connected peers",
747 send_count,
748 peer_list.len()
749 );
750 }
751
752 let event = P2PEvent::Message {
754 topic: topic.to_string(),
755 source: self.peer_id.clone(),
756 data: data.to_vec(),
757 };
758 let _ = self.event_tx.send(event);
759
760 Ok(())
761 }
762
    /// Returns the configuration this node was created with.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
767
768 pub async fn start(&self) -> Result<()> {
770 info!("Starting P2P node...");
771
772 if let Some(ref resource_manager) = self.resource_manager {
774 resource_manager.start().await.map_err(|e| {
775 P2PError::Network(crate::error::NetworkError::ProtocolError(
776 format!("Failed to start resource manager: {e}").into(),
777 ))
778 })?;
779 info!("Production resource manager started");
780 }
781
782 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
784 let mut manager = bootstrap_manager.write().await;
785 manager.start_background_tasks().await.map_err(|e| {
786 P2PError::Network(crate::error::NetworkError::ProtocolError(
787 format!("Failed to start bootstrap manager: {e}").into(),
788 ))
789 })?;
790 info!("Bootstrap cache manager started");
791 }
792
793 *self.running.write().await = true;
795
796 self.start_network_listeners().await?;
798
799 let listen_addrs = self.listen_addrs.read().await;
801 info!("P2P node started on addresses: {:?}", *listen_addrs);
802
803 self.start_message_receiving_system().await?;
807
808 self.connect_bootstrap_peers().await?;
810
811 Ok(())
812 }
813
    /// Publishes the transport's bound addresses and spawns the accept loop.
    ///
    /// The addresses reported by the dual-stack node are copied into
    /// `self.listen_addrs`. A detached task then accepts connections forever:
    /// for each one it emits `P2PEvent::PeerConnected` and registers the peer
    /// in the peer table. Accept errors are logged, followed by a 200 ms pause.
    ///
    /// Always returns `Ok(())`.
    async fn start_network_listeners(&self) -> Result<()> {
        info!("Starting dual-stack listeners (ant-quic)...");
        let addrs = self.dual_node.local_addrs();
        {
            // Scoped so the write guard is released before the task is spawned.
            let mut la = self.listen_addrs.write().await;
            *la = addrs.clone();
        }

        let event_tx = self.event_tx.clone();
        let peers = self.peers.clone();
        let rate_limiter = self.rate_limiter.clone();
        let dual = self.dual_node.clone();
        // NOTE(review): the loop never checks `self.running`, so the task
        // keeps accepting after `stop()` — confirm this is intended.
        tokio::spawn(async move {
            loop {
                match dual.accept_any().await {
                    Ok((ant_peer_id, remote_sock)) => {
                        let peer_id =
                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
                        let remote_addr = NetworkAddress::from(remote_sock);
                        // NOTE(review): the rate-limit result is discarded, so a
                        // peer over the limit is still registered below. The
                        // (unused) `start_connection_acceptor` skips such peers.
                        let _ = rate_limiter.check_ip(&remote_sock.ip());
                        // The event is emitted before the peer is in the table.
                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
                        register_new_peer(&peers, &peer_id, &remote_addr).await;
                    }
                    Err(e) => {
                        warn!("Accept failed: {}", e);
                        // Pause so a persistent error does not become a busy loop.
                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                    }
                }
            }
        });

        info!("Dual-stack listeners active on: {:?}", addrs);
        Ok(())
    }
852
    /// Placeholder for per-address listener start-up.
    ///
    /// Logs a warning and always fails with
    /// `P2PError::Transport(TransportError::SetupFailed(..))` naming `addr`.
    #[allow(dead_code)]
    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
        warn!("QUIC transport temporarily disabled during ant-quic migration");
        Err(crate::P2PError::Transport(
            crate::error::TransportError::SetupFailed(
                format!(
                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
                )
                .into(),
            ),
        ))
    }
905
    /// Spawns a detached accept loop on a generic `Transport`.
    ///
    /// For every accepted connection the loop derives a peer ID from the
    /// remote address, applies the rate limiter (rejected connections are
    /// skipped), emits `P2PEvent::PeerConnected`, registers the peer, and
    /// hands the connection to `spawn_connection_handler`. Accept errors are
    /// logged, followed by a one-second pause.
    ///
    /// `addr` and `transport_type` are used for logging only.
    /// Always returns `Ok(())` once the task is spawned.
    #[allow(dead_code)]
    async fn start_connection_acceptor(
        &self,
        transport: Arc<dyn crate::transport::Transport>,
        addr: std::net::SocketAddr,
        transport_type: crate::transport::TransportType,
    ) -> Result<()> {
        info!(
            "Starting connection acceptor for {:?} on {}",
            transport_type, addr
        );

        // Clone shared handles so the spawned task owns them.
        let event_tx = self.event_tx.clone();
        let _peer_id = self.peer_id.clone();
        let peers = Arc::clone(&self.peers);
        let rate_limiter = Arc::clone(&self.rate_limiter);

        tokio::spawn(async move {
            loop {
                match transport.accept().await {
                    Ok(connection) => {
                        let remote_addr = connection.remote_addr();
                        // Synthetic peer ID: remote address with ':' replaced by '_'.
                        let connection_peer_id =
                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));

                        let socket_addr = remote_addr.socket_addr();
                        // Skip connections rejected by the rate limiter.
                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
                            continue;
                        }

                        info!(
                            "Accepted {:?} connection from {} (peer: {})",
                            transport_type, remote_addr, connection_peer_id
                        );

                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));

                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;

                        spawn_connection_handler(
                            connection,
                            connection_peer_id,
                            event_tx.clone(),
                            Arc::clone(&peers),
                        );
                    }
                    Err(e) => {
                        warn!(
                            "Failed to accept {:?} connection on {}: {}",
                            transport_type, addr, e
                        );

                        // Pause so a persistent error does not become a busy loop.
                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
                    }
                }
            }
        });

        info!(
            "Connection acceptor background task started for {:?} on {}",
            transport_type, addr
        );
        Ok(())
    }
981
982 async fn start_message_receiving_system(&self) -> Result<()> {
984 info!("Starting message receiving system");
985 let dual = self.dual_node.clone();
986 let event_tx = self.event_tx.clone();
987
988 tokio::spawn(async move {
989 loop {
990 match dual.receive_any().await {
991 Ok((_peer_id, bytes)) => {
992 #[allow(clippy::collapsible_if)]
994 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
995 if let (Some(protocol), Some(data), Some(from)) = (
996 value.get("protocol").and_then(|v| v.as_str()),
997 value.get("data").and_then(|v| v.as_array()),
998 value.get("from").and_then(|v| v.as_str()),
999 ) {
1000 let payload: Vec<u8> = data
1001 .iter()
1002 .filter_map(|v| v.as_u64().map(|n| n as u8))
1003 .collect();
1004 let _ = event_tx.send(P2PEvent::Message {
1005 topic: protocol.to_string(),
1006 source: from.to_string(),
1007 data: payload,
1008 });
1009 }
1010 }
1011 }
1012 Err(e) => {
1013 warn!("Receive error: {}", e);
1014 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1015 }
1016 }
1017 }
1018 });
1019
1020 Ok(())
1021 }
1022
1023 #[allow(dead_code)]
1025 async fn handle_received_message(
1026 &self,
1027 message_data: Vec<u8>,
1028 peer_id: &PeerId,
1029 _protocol: &str,
1030 event_tx: &broadcast::Sender<P2PEvent>,
1031 ) -> Result<()> {
1032 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1036 Ok(message) => {
1037 if let (Some(protocol), Some(data), Some(from)) = (
1038 message.get("protocol").and_then(|v| v.as_str()),
1039 message.get("data").and_then(|v| v.as_array()),
1040 message.get("from").and_then(|v| v.as_str()),
1041 ) {
1042 let data_bytes: Vec<u8> = data
1044 .iter()
1045 .filter_map(|v| v.as_u64().map(|n| n as u8))
1046 .collect();
1047
1048 let event = P2PEvent::Message {
1050 topic: protocol.to_string(),
1051 source: from.to_string(),
1052 data: data_bytes,
1053 };
1054
1055 let _ = event_tx.send(event);
1056 debug!("Generated message event from peer: {}", peer_id);
1057 }
1058 }
1059 Err(e) => {
1060 warn!("Failed to parse received message from {}: {}", peer_id, e);
1061 }
1062 }
1063
1064 Ok(())
1065 }
1066
1067 pub async fn run(&self) -> Result<()> {
1073 if !*self.running.read().await {
1074 self.start().await?;
1075 }
1076
1077 info!("P2P node running...");
1078
1079 loop {
1081 if !*self.running.read().await {
1082 break;
1083 }
1084
1085 self.periodic_tasks().await?;
1087
1088 tokio::time::sleep(Duration::from_millis(100)).await;
1090 }
1091
1092 info!("P2P node stopped");
1093 Ok(())
1094 }
1095
1096 pub async fn stop(&self) -> Result<()> {
1098 info!("Stopping P2P node...");
1099
1100 *self.running.write().await = false;
1102
1103 self.disconnect_all_peers().await?;
1105
1106 if let Some(ref resource_manager) = self.resource_manager {
1108 resource_manager.shutdown().await.map_err(|e| {
1109 P2PError::Network(crate::error::NetworkError::ProtocolError(
1110 format!("Failed to shutdown resource manager: {e}").into(),
1111 ))
1112 })?;
1113 info!("Production resource manager stopped");
1114 }
1115
1116 info!("P2P node stopped");
1117 Ok(())
1118 }
1119
    /// Alias for [`P2PNode::stop`].
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1124
    /// Returns the current value of the `running` flag.
    pub async fn is_running(&self) -> bool {
        *self.running.read().await
    }
1129
    /// Returns a copy of the addresses the node is listening on
    /// (empty until the listeners have been started).
    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
        self.listen_addrs.read().await.clone()
    }
1134
    /// Returns the IDs of all peers currently in the peer table
    /// (in the map's iteration order, which is unspecified).
    pub async fn connected_peers(&self) -> Vec<PeerId> {
        self.peers.read().await.keys().cloned().collect()
    }
1139
    /// Returns the number of peers currently in the peer table.
    pub async fn peer_count(&self) -> usize {
        self.peers.read().await.len()
    }
1144
    /// Returns a copy of the record for `peer_id`, or `None` if the peer is
    /// not in the peer table.
    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
        self.peers.read().await.get(peer_id).cloned()
    }
1149
1150 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1152 info!("Connecting to peer at: {}", address);
1153
1154 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1156 Some(resource_manager.acquire_connection().await?)
1157 } else {
1158 None
1159 };
1160
1161 let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1163 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1164 format!("{}: {}", address, e).into(),
1165 ))
1166 })?;
1167
1168 let peer_id = {
1170 match self
1171 .dual_node
1172 .connect_happy_eyeballs(&[_socket_addr])
1173 .await
1174 .map(|p| crate::transport::ant_quic_adapter::ant_peer_id_to_string(&p))
1175 {
1176 Ok(connected_peer_id) => {
1177 info!("Successfully connected to peer: {}", connected_peer_id);
1178 connected_peer_id
1179 }
1180 Err(e) => {
1181 warn!("Failed to connect to peer at {}: {}", address, e);
1182
1183 let demo_peer_id =
1186 format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1187 warn!(
1188 "Using demo peer ID: {} (transport connection failed)",
1189 demo_peer_id
1190 );
1191 demo_peer_id
1192 }
1193 }
1194 };
1195
1196 let peer_info = PeerInfo {
1198 peer_id: peer_id.clone(),
1199 addresses: vec![address.to_string()],
1200 connected_at: Instant::now(),
1201 last_seen: Instant::now(),
1202 status: ConnectionStatus::Connected,
1203 protocols: vec!["p2p-foundation/1.0".to_string()],
1204 heartbeat_count: 0,
1205 };
1206
1207 self.peers.write().await.insert(peer_id.clone(), peer_info);
1209
1210 if let Some(ref resource_manager) = self.resource_manager {
1212 resource_manager.record_bandwidth(0, 0); }
1214
1215 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1217
1218 info!("Connected to peer: {}", peer_id);
1219 Ok(peer_id)
1220 }
1221
1222 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1224 info!("Disconnecting from peer: {}", peer_id);
1225
1226 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1227 peer_info.status = ConnectionStatus::Disconnected;
1228
1229 let _ = self
1231 .event_tx
1232 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1233
1234 info!("Disconnected from peer: {}", peer_id);
1235 }
1236
1237 Ok(())
1238 }
1239
1240 pub async fn send_message(
1242 &self,
1243 peer_id: &PeerId,
1244 protocol: &str,
1245 data: Vec<u8>,
1246 ) -> Result<()> {
1247 debug!(
1248 "Sending message to peer {} on protocol {}",
1249 peer_id, protocol
1250 );
1251
1252 if let Some(ref resource_manager) = self.resource_manager
1254 && !resource_manager
1255 .check_rate_limit(peer_id, "message")
1256 .await?
1257 {
1258 return Err(P2PError::ResourceExhausted(
1259 format!("Rate limit exceeded for peer {}", peer_id).into(),
1260 ));
1261 }
1262
1263 if !self.peers.read().await.contains_key(peer_id) {
1265 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1266 peer_id.to_string().into(),
1267 )));
1268 }
1269
1270 if let Some(ref resource_manager) = self.resource_manager {
1274 resource_manager.record_bandwidth(data.len() as u64, 0);
1275 }
1276
1277 let _message_data = self.create_protocol_message(protocol, data)?;
1279
1280 self.dual_node
1282 .send_to_peer_string(peer_id, &_message_data)
1283 .await
1284 .map_err(|e| {
1285 P2PError::Transport(crate::error::TransportError::StreamError(
1286 e.to_string().into(),
1287 ))
1288 })
1289 }
1290
1291 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1293 use serde_json::json;
1294
1295 let timestamp = std::time::SystemTime::now()
1296 .duration_since(std::time::UNIX_EPOCH)
1297 .map_err(|e| {
1298 P2PError::Network(NetworkError::ProtocolError(
1299 format!("System time error: {}", e).into(),
1300 ))
1301 })?
1302 .as_secs();
1303
1304 let message = json!({
1306 "protocol": protocol,
1307 "data": data,
1308 "from": self.peer_id,
1309 "timestamp": timestamp
1310 });
1311
1312 serde_json::to_vec(&message).map_err(|e| {
1313 P2PError::Transport(crate::error::TransportError::StreamError(
1314 format!("Failed to serialize message: {e}").into(),
1315 ))
1316 })
1317 }
1318
1319 }
1321
1322#[allow(dead_code)]
1324fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1325 use serde_json::json;
1326
1327 let timestamp = std::time::SystemTime::now()
1328 .duration_since(std::time::UNIX_EPOCH)
1329 .map_err(|e| {
1330 P2PError::Network(NetworkError::ProtocolError(
1331 format!("System time error: {}", e).into(),
1332 ))
1333 })?
1334 .as_secs();
1335
1336 let message = json!({
1338 "protocol": protocol,
1339 "data": data,
1340 "timestamp": timestamp
1341 });
1342
1343 serde_json::to_vec(&message).map_err(|e| {
1344 P2PError::Transport(crate::error::TransportError::StreamError(
1345 format!("Failed to serialize message: {e}").into(),
1346 ))
1347 })
1348}
1349
1350impl P2PNode {
    /// Returns a new receiver for the node's event broadcast channel.
    /// The receiver only sees events sent after this call.
    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
        self.event_tx.subscribe()
    }
1355
    /// Alias for [`P2PNode::subscribe_events`].
    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
        self.subscribe_events()
    }
1360
    /// Returns the time elapsed since the node was constructed
    /// (not since `start` was called).
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
1365
1366 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1376 if let Some(ref resource_manager) = self.resource_manager {
1377 Ok(resource_manager.get_metrics().await)
1378 } else {
1379 Err(P2PError::Network(
1380 crate::error::NetworkError::ProtocolError(
1381 "Production resource manager not enabled".to_string().into(),
1382 ),
1383 ))
1384 }
1385 }
1386
1387 pub async fn health_check(&self) -> Result<()> {
1389 if let Some(ref resource_manager) = self.resource_manager {
1390 resource_manager.health_check().await
1391 } else {
1392 let peer_count = self.peer_count().await;
1394 if peer_count > self.config.max_connections {
1395 Err(P2PError::Network(
1396 crate::error::NetworkError::ProtocolError(
1397 format!("Too many connections: {peer_count}").into(),
1398 ),
1399 ))
1400 } else {
1401 Ok(())
1402 }
1403 }
1404 }
1405
    /// Returns the production configuration, if one was supplied.
    pub fn production_config(&self) -> Option<&ProductionConfig> {
        self.config.production_config.as_ref()
    }
1410
    /// Returns `true` when the node has a resource manager.
    pub fn is_production_mode(&self) -> bool {
        self.resource_manager.is_some()
    }
1415
    /// Returns the shared DHT handle, or `None` when the DHT is disabled.
    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
        self.dht.as_ref()
    }
1420
1421 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1423 if let Some(ref dht) = self.dht {
1424 let mut dht_instance = dht.write().await;
1425 let dht_key = crate::dht::DhtKey::from_bytes(key);
1426 dht_instance
1427 .store(&dht_key, value.clone())
1428 .await
1429 .map_err(|e| {
1430 P2PError::Dht(crate::error::DhtError::StoreFailed(
1431 format!("{:?}: {e}", key).into(),
1432 ))
1433 })?;
1434
1435 Ok(())
1436 } else {
1437 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1438 "DHT not enabled".to_string().into(),
1439 )))
1440 }
1441 }
1442
1443 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1445 if let Some(ref dht) = self.dht {
1446 let dht_instance = dht.read().await;
1447 let dht_key = crate::dht::DhtKey::from_bytes(key);
1448 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1449 P2PError::Dht(crate::error::DhtError::StoreFailed(
1450 format!("Retrieve failed: {e}").into(),
1451 ))
1452 })?;
1453
1454 Ok(record_result)
1455 } else {
1456 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1457 "DHT not enabled".to_string().into(),
1458 )))
1459 }
1460 }
1461
1462 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1464 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1465 let mut manager = bootstrap_manager.write().await;
1466 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1467 .iter()
1468 .filter_map(|addr| addr.parse().ok())
1469 .collect();
1470 let contact = ContactEntry::new(peer_id, socket_addresses);
1471 manager.add_contact(contact).await.map_err(|e| {
1472 P2PError::Network(crate::error::NetworkError::ProtocolError(
1473 format!("Failed to add peer to bootstrap cache: {e}").into(),
1474 ))
1475 })?;
1476 }
1477 Ok(())
1478 }
1479
1480 pub async fn update_peer_metrics(
1482 &self,
1483 peer_id: &PeerId,
1484 success: bool,
1485 latency_ms: Option<u64>,
1486 _error: Option<String>,
1487 ) -> Result<()> {
1488 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1489 let mut manager = bootstrap_manager.write().await;
1490
1491 let metrics = QualityMetrics {
1493 success_rate: if success { 1.0 } else { 0.0 },
1494 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1495 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1497 last_successful_connection: if success {
1498 chrono::Utc::now()
1499 } else {
1500 chrono::Utc::now() - chrono::Duration::hours(1)
1501 },
1502 uptime_score: 0.5,
1503 };
1504
1505 manager
1506 .update_contact_metrics(peer_id, metrics)
1507 .await
1508 .map_err(|e| {
1509 P2PError::Network(crate::error::NetworkError::ProtocolError(
1510 format!("Failed to update peer metrics: {e}").into(),
1511 ))
1512 })?;
1513 }
1514 Ok(())
1515 }
1516
1517 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1519 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1520 let manager = bootstrap_manager.read().await;
1521 let stats = manager.get_stats().await.map_err(|e| {
1522 P2PError::Network(crate::error::NetworkError::ProtocolError(
1523 format!("Failed to get bootstrap stats: {e}").into(),
1524 ))
1525 })?;
1526 Ok(Some(stats))
1527 } else {
1528 Ok(None)
1529 }
1530 }
1531
1532 pub async fn cached_peer_count(&self) -> usize {
1534 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1535 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1536 {
1537 return stats.total_contacts;
1538 }
1539 0
1540 }
1541
1542 async fn connect_bootstrap_peers(&self) -> Result<()> {
1544 let mut bootstrap_contacts = Vec::new();
1545 let mut used_cache = false;
1546
1547 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1549 let manager = bootstrap_manager.read().await;
1550 match manager.get_bootstrap_peers(20).await {
1551 Ok(contacts) => {
1553 if !contacts.is_empty() {
1554 info!("Using {} cached bootstrap peers", contacts.len());
1555 bootstrap_contacts = contacts;
1556 used_cache = true;
1557 }
1558 }
1559 Err(e) => {
1560 warn!("Failed to get cached bootstrap peers: {}", e);
1561 }
1562 }
1563 }
1564
1565 if bootstrap_contacts.is_empty() {
1567 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1568 &self.config.bootstrap_peers_str
1569 } else {
1570 &self
1572 .config
1573 .bootstrap_peers
1574 .iter()
1575 .map(|addr| addr.to_string())
1576 .collect::<Vec<_>>()
1577 };
1578
1579 if bootstrap_peers.is_empty() {
1580 info!("No bootstrap peers configured and no cached peers available");
1581 return Ok(());
1582 }
1583
1584 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1585
1586 for addr in bootstrap_peers {
1587 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1588 let contact = ContactEntry::new(
1589 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1590 vec![socket_addr],
1591 );
1592 bootstrap_contacts.push(contact);
1593 } else {
1594 warn!("Invalid bootstrap address format: {}", addr);
1595 }
1596 }
1597 }
1598
1599 let mut successful_connections = 0;
1601 for contact in bootstrap_contacts {
1602 for addr in &contact.addresses {
1603 match self.connect_peer(&addr.to_string()).await {
1604 Ok(peer_id) => {
1605 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1606 successful_connections += 1;
1607
1608 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1610 let mut manager = bootstrap_manager.write().await;
1611 let mut updated_contact = contact.clone();
1612 updated_contact.peer_id = peer_id.clone();
1613 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
1616 warn!("Failed to update bootstrap cache: {}", e);
1617 }
1618 }
1619 break; }
1621 Err(e) => {
1622 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1623
1624 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1626 let mut manager = bootstrap_manager.write().await;
1627 let mut updated_contact = contact.clone();
1628 updated_contact.update_connection_result(
1629 false,
1630 None,
1631 Some(e.to_string()),
1632 );
1633
1634 if let Err(e) = manager.add_contact(updated_contact).await {
1635 warn!("Failed to update bootstrap cache: {}", e);
1636 }
1637 }
1638 }
1639 }
1640 }
1641 }
1642
1643 if successful_connections == 0 {
1644 if !used_cache {
1645 warn!("Failed to connect to any bootstrap peers");
1646 }
1647 return Err(P2PError::Network(NetworkError::ConnectionFailed {
1648 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
1650 }));
1651 }
1652 info!(
1653 "Successfully connected to {} bootstrap peers",
1654 successful_connections
1655 );
1656
1657 Ok(())
1658 }
1659
1660 async fn disconnect_all_peers(&self) -> Result<()> {
1662 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1663
1664 for peer_id in peer_ids {
1665 self.disconnect_peer(&peer_id).await?;
1666 }
1667
1668 Ok(())
1669 }
1670
1671 async fn periodic_tasks(&self) -> Result<()> {
1673 Ok(())
1679 }
1680}
1681
/// Abstraction over something that can send protocol messages to peers.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to `peer_id`, tagged with the given `protocol`.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the peer ID of the local node.
    fn local_peer_id(&self) -> &PeerId;
}
1691
/// Cloneable sender that hands outgoing messages to an unbounded channel.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Peer ID of the local node.
    peer_id: PeerId,
    /// Channel carrying `(target peer, protocol, payload)` tuples.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
1699
1700impl P2PNetworkSender {
1701 pub fn new(
1702 peer_id: PeerId,
1703 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1704 ) -> Self {
1705 Self { peer_id, send_tx }
1706 }
1707}
1708
1709#[async_trait::async_trait]
1711impl NetworkSender for P2PNetworkSender {
1712 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1714 self.send_tx
1715 .send((peer_id.clone(), protocol.to_string(), data))
1716 .map_err(|_| {
1717 P2PError::Network(crate::error::NetworkError::ProtocolError(
1718 "Failed to send message via channel".to_string().into(),
1719 ))
1720 })?;
1721 Ok(())
1722 }
1723
1724 fn local_peer_id(&self) -> &PeerId {
1726 &self.peer_id
1727 }
1728}
1729
/// Builder that accumulates a [`NodeConfig`] and turns it into a node.
pub struct NodeBuilder {
    /// Configuration being assembled by the builder methods.
    config: NodeConfig,
}
1734
1735impl Default for NodeBuilder {
1736 fn default() -> Self {
1737 Self::new()
1738 }
1739}
1740
1741impl NodeBuilder {
1742 pub fn new() -> Self {
1744 Self {
1745 config: NodeConfig::default(),
1746 }
1747 }
1748
1749 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1751 self.config.peer_id = Some(peer_id);
1752 self
1753 }
1754
1755 pub fn listen_on(mut self, addr: &str) -> Self {
1757 if let Ok(multiaddr) = addr.parse() {
1758 self.config.listen_addrs.push(multiaddr);
1759 }
1760 self
1761 }
1762
1763 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1765 if let Ok(multiaddr) = addr.parse() {
1766 self.config.bootstrap_peers.push(multiaddr);
1767 }
1768 self.config.bootstrap_peers_str.push(addr.to_string());
1769 self
1770 }
1771
1772 pub fn with_ipv6(mut self, enable: bool) -> Self {
1774 self.config.enable_ipv6 = enable;
1775 self
1776 }
1777
1778 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1782 self.config.connection_timeout = timeout;
1783 self
1784 }
1785
1786 pub fn with_max_connections(mut self, max: usize) -> Self {
1788 self.config.max_connections = max;
1789 self
1790 }
1791
1792 pub fn with_production_mode(mut self) -> Self {
1794 self.config.production_config = Some(ProductionConfig::default());
1795 self
1796 }
1797
1798 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1800 self.config.production_config = Some(production_config);
1801 self
1802 }
1803
1804 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1806 self.config.dht_config = dht_config;
1807 self
1808 }
1809
1810 pub fn with_default_dht(mut self) -> Self {
1812 self.config.dht_config = DHTConfig::default();
1813 self
1814 }
1815
1816 pub async fn build(self) -> Result<P2PNode> {
1818 P2PNode::new(self.config).await
1819 }
1820}
1821
1822#[allow(dead_code)] async fn handle_received_message_standalone(
1825 message_data: Vec<u8>,
1826 peer_id: &PeerId,
1827 _protocol: &str,
1828 event_tx: &broadcast::Sender<P2PEvent>,
1829) -> Result<()> {
1830 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1832 Ok(message) => {
1833 if let (Some(protocol), Some(data), Some(from)) = (
1834 message.get("protocol").and_then(|v| v.as_str()),
1835 message.get("data").and_then(|v| v.as_array()),
1836 message.get("from").and_then(|v| v.as_str()),
1837 ) {
1838 let data_bytes: Vec<u8> = data
1840 .iter()
1841 .filter_map(|v| v.as_u64().map(|n| n as u8))
1842 .collect();
1843
1844 let event = P2PEvent::Message {
1846 topic: protocol.to_string(),
1847 source: from.to_string(),
1848 data: data_bytes,
1849 };
1850
1851 let _ = event_tx.send(event);
1852 debug!("Generated message event from peer: {}", peer_id);
1853 }
1854 }
1855 Err(e) => {
1856 warn!("Failed to parse received message from {}: {}", peer_id, e);
1857 }
1858 }
1859
1860 Ok(())
1861}
1862
1863#[allow(dead_code)]
1867fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
1868 match create_protocol_message_static(protocol, data) {
1869 Ok(msg) => Some(msg),
1870 Err(e) => {
1871 warn!("Failed to create protocol message: {}", e);
1872 None
1873 }
1874 }
1875}
1876
1877#[allow(dead_code)]
1879async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
1880 match result {
1881 Ok(_) => {
1882 debug!("Message sent to peer {} via transport layer", peer_id);
1883 }
1884 Err(e) => {
1885 warn!("Failed to send message to peer {}: {}", peer_id, e);
1886 }
1887 }
1888}
1889
1890#[allow(dead_code)] fn check_rate_limit(
1893 rate_limiter: &RateLimiter,
1894 socket_addr: &std::net::SocketAddr,
1895 remote_addr: &NetworkAddress,
1896) -> Result<()> {
1897 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
1898 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
1899 e
1900 })
1901}
1902
1903#[allow(dead_code)] async fn register_new_peer(
1906 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1907 peer_id: &PeerId,
1908 remote_addr: &NetworkAddress,
1909) {
1910 let mut peers_guard = peers.write().await;
1911 let peer_info = PeerInfo {
1912 peer_id: peer_id.clone(),
1913 addresses: vec![remote_addr.to_string()],
1914 connected_at: tokio::time::Instant::now(),
1915 last_seen: tokio::time::Instant::now(),
1916 status: ConnectionStatus::Connected,
1917 protocols: vec!["p2p-chat/1.0.0".to_string()],
1918 heartbeat_count: 0,
1919 };
1920 peers_guard.insert(peer_id.clone(), peer_info);
1921}
1922
1923#[allow(dead_code)] fn spawn_connection_handler(
1926 connection: Box<dyn crate::transport::Connection>,
1927 peer_id: PeerId,
1928 event_tx: broadcast::Sender<P2PEvent>,
1929 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1930) {
1931 tokio::spawn(async move {
1932 handle_peer_connection(connection, peer_id, event_tx, peers).await;
1933 });
1934}
1935
1936#[allow(dead_code)] async fn handle_peer_connection(
1939 mut connection: Box<dyn crate::transport::Connection>,
1940 peer_id: PeerId,
1941 event_tx: broadcast::Sender<P2PEvent>,
1942 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1943) {
1944 loop {
1945 match connection.receive().await {
1946 Ok(message_data) => {
1947 debug!(
1948 "Received {} bytes from peer: {}",
1949 message_data.len(),
1950 peer_id
1951 );
1952
1953 if let Err(e) = handle_received_message_standalone(
1955 message_data,
1956 &peer_id,
1957 "unknown", &event_tx,
1959 )
1960 .await
1961 {
1962 warn!("Failed to handle message from peer {}: {}", peer_id, e);
1963 }
1964 }
1965 Err(e) => {
1966 warn!("Failed to receive message from {}: {}", peer_id, e);
1967
1968 if !connection.is_alive().await {
1970 info!("Connection to {} is dead, removing peer", peer_id);
1971
1972 remove_peer(&peers, &peer_id).await;
1974
1975 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
1977
1978 break; }
1980
1981 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1983 }
1984 }
1985 }
1986}
1987
1988#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
1991 let mut peers_guard = peers.write().await;
1992 peers_guard.remove(peer_id);
1993}
1994
1995#[allow(dead_code)]
1997async fn update_peer_heartbeat(
1998 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1999 peer_id: &PeerId,
2000) -> Result<()> {
2001 let mut peers_guard = peers.write().await;
2002 match peers_guard.get_mut(peer_id) {
2003 Some(peer_info) => {
2004 peer_info.last_seen = Instant::now();
2005 peer_info.heartbeat_count += 1;
2006 Ok(())
2007 }
2008 None => {
2009 warn!("Received heartbeat from unknown peer: {}", peer_id);
2010 Err(P2PError::Network(NetworkError::PeerNotFound(
2011 format!("Peer {} not found", peer_id).into(),
2012 )))
2013 }
2014 }
2015}
2016
2017#[allow(dead_code)]
2019async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2020 if let Some(manager) = resource_manager {
2021 let metrics = manager.get_metrics().await;
2022 (metrics.memory_used, metrics.cpu_usage)
2023 } else {
2024 (0, 0.0)
2025 }
2026}
2027
2028#[cfg(test)]
2029mod tests {
2030 use super::*;
2031 use std::time::Duration;
2033 use tokio::time::timeout;
2034
2035 fn create_test_node_config() -> NodeConfig {
2041 NodeConfig {
2042 peer_id: Some("test_peer_123".to_string()),
2043 listen_addrs: vec![
2044 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2045 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2046 ],
2047 listen_addr: std::net::SocketAddr::new(
2048 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2049 0,
2050 ),
2051 bootstrap_peers: vec![],
2052 bootstrap_peers_str: vec![],
2053 enable_ipv6: true,
2054
2055 connection_timeout: Duration::from_secs(10),
2056 keep_alive_interval: Duration::from_secs(30),
2057 max_connections: 100,
2058 max_incoming_connections: 50,
2059 dht_config: DHTConfig::default(),
2060 security_config: SecurityConfig::default(),
2061 production_config: None,
2062 bootstrap_cache_config: None,
2063 }
2065 }
2066
2067 #[tokio::test]
2071 async fn test_node_config_default() {
2072 let config = NodeConfig::default();
2073
2074 assert!(config.peer_id.is_none());
2075 assert_eq!(config.listen_addrs.len(), 2);
2076 assert!(config.enable_ipv6);
2077 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2079 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2080 }
2081
2082 #[tokio::test]
2083 async fn test_dht_config_default() {
2084 let config = DHTConfig::default();
2085
2086 assert_eq!(config.k_value, 20);
2087 assert_eq!(config.alpha_value, 5);
2088 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2089 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2090 }
2091
2092 #[tokio::test]
2093 async fn test_security_config_default() {
2094 let config = SecurityConfig::default();
2095
2096 assert!(config.enable_noise);
2097 assert!(config.enable_tls);
2098 assert_eq!(config.trust_level, TrustLevel::Basic);
2099 }
2100
2101 #[test]
2102 fn test_trust_level_variants() {
2103 let _none = TrustLevel::None;
2105 let _basic = TrustLevel::Basic;
2106 let _full = TrustLevel::Full;
2107
2108 assert_eq!(TrustLevel::None, TrustLevel::None);
2110 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2111 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2112 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2113 }
2114
2115 #[test]
2116 fn test_connection_status_variants() {
2117 let connecting = ConnectionStatus::Connecting;
2118 let connected = ConnectionStatus::Connected;
2119 let disconnecting = ConnectionStatus::Disconnecting;
2120 let disconnected = ConnectionStatus::Disconnected;
2121 let failed = ConnectionStatus::Failed("test error".to_string());
2122
2123 assert_eq!(connecting, ConnectionStatus::Connecting);
2124 assert_eq!(connected, ConnectionStatus::Connected);
2125 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2126 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2127 assert_ne!(connecting, connected);
2128
2129 if let ConnectionStatus::Failed(msg) = failed {
2130 assert_eq!(msg, "test error");
2131 } else {
2132 panic!("Expected Failed status");
2133 }
2134 }
2135
2136 #[tokio::test]
2137 async fn test_node_creation() -> Result<()> {
2138 let config = create_test_node_config();
2139 let node = P2PNode::new(config).await?;
2140
2141 assert_eq!(node.peer_id(), "test_peer_123");
2142 assert!(!node.is_running().await);
2143 assert_eq!(node.peer_count().await, 0);
2144 assert!(node.connected_peers().await.is_empty());
2145
2146 Ok(())
2147 }
2148
2149 #[tokio::test]
2150 async fn test_node_creation_without_peer_id() -> Result<()> {
2151 let mut config = create_test_node_config();
2152 config.peer_id = None;
2153
2154 let node = P2PNode::new(config).await?;
2155
2156 assert!(node.peer_id().starts_with("peer_"));
2158 assert!(!node.is_running().await);
2159
2160 Ok(())
2161 }
2162
2163 #[tokio::test]
2164 async fn test_node_lifecycle() -> Result<()> {
2165 let config = create_test_node_config();
2166 let node = P2PNode::new(config).await?;
2167
2168 assert!(!node.is_running().await);
2170
2171 node.start().await?;
2173 assert!(node.is_running().await);
2174
2175 let listen_addrs = node.listen_addrs().await;
2177 assert!(
2178 !listen_addrs.is_empty(),
2179 "Expected at least one listening address"
2180 );
2181
2182 node.stop().await?;
2184 assert!(!node.is_running().await);
2185
2186 Ok(())
2187 }
2188
2189 #[tokio::test]
2190 async fn test_peer_connection() -> Result<()> {
2191 let config = create_test_node_config();
2192 let node = P2PNode::new(config).await?;
2193
2194 let peer_addr = "127.0.0.1:0";
2195
2196 let peer_id = node.connect_peer(peer_addr).await?;
2198 assert!(peer_id.starts_with("peer_from_"));
2199
2200 assert_eq!(node.peer_count().await, 1);
2202
2203 let connected_peers = node.connected_peers().await;
2205 assert_eq!(connected_peers.len(), 1);
2206 assert_eq!(connected_peers[0], peer_id);
2207
2208 let peer_info = node.peer_info(&peer_id).await;
2210 assert!(peer_info.is_some());
2211 let info = peer_info.expect("Peer info should exist after adding peer");
2212 assert_eq!(info.peer_id, peer_id);
2213 assert_eq!(info.status, ConnectionStatus::Connected);
2214 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2215
2216 node.disconnect_peer(&peer_id).await?;
2218 assert_eq!(node.peer_count().await, 0);
2219
2220 Ok(())
2221 }
2222
2223 #[tokio::test]
2224 async fn test_event_subscription() -> Result<()> {
2225 let config = create_test_node_config();
2226 let node = P2PNode::new(config).await?;
2227
2228 let mut events = node.subscribe_events();
2229 let peer_addr = "127.0.0.1:0";
2230
2231 let peer_id = node.connect_peer(peer_addr).await?;
2233
2234 let event = timeout(Duration::from_millis(100), events.recv()).await;
2236 assert!(event.is_ok());
2237
2238 let event_result = event
2239 .expect("Should receive event")
2240 .expect("Event should not be error");
2241 match event_result {
2242 P2PEvent::PeerConnected(event_peer_id) => {
2243 assert_eq!(event_peer_id, peer_id);
2244 }
2245 _ => panic!("Expected PeerConnected event"),
2246 }
2247
2248 node.disconnect_peer(&peer_id).await?;
2250
2251 let event = timeout(Duration::from_millis(100), events.recv()).await;
2253 assert!(event.is_ok());
2254
2255 let event_result = event
2256 .expect("Should receive event")
2257 .expect("Event should not be error");
2258 match event_result {
2259 P2PEvent::PeerDisconnected(event_peer_id) => {
2260 assert_eq!(event_peer_id, peer_id);
2261 }
2262 _ => panic!("Expected PeerDisconnected event"),
2263 }
2264
2265 Ok(())
2266 }
2267
2268 #[tokio::test]
2269 async fn test_message_sending() -> Result<()> {
2270 let mut config1 = create_test_node_config();
2272 config1.listen_addr =
2273 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2274 let node1 = P2PNode::new(config1).await?;
2275 node1.start().await?;
2276
2277 let mut config2 = create_test_node_config();
2278 config2.listen_addr =
2279 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2280 let node2 = P2PNode::new(config2).await?;
2281 node2.start().await?;
2282
2283 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2285
2286 let node2_addr = node2.local_addr().ok_or_else(|| {
2288 P2PError::Network(crate::error::NetworkError::ProtocolError(
2289 "No listening address".to_string().into(),
2290 ))
2291 })?;
2292
2293 let peer_id = node1.connect_peer(&node2_addr).await?;
2295
2296 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2298
2299 let message_data = b"Hello, peer!".to_vec();
2301 let result = node1
2302 .send_message(&peer_id, "test-protocol", message_data)
2303 .await;
2304 if let Err(e) = &result {
2307 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2308 }
2309
2310 let non_existent_peer = "non_existent_peer".to_string();
2312 let result = node1
2313 .send_message(&non_existent_peer, "test-protocol", vec![])
2314 .await;
2315 assert!(result.is_err(), "Sending to non-existent peer should fail");
2316
2317 Ok(())
2318 }
2319
2320 #[tokio::test]
2321 async fn test_remote_mcp_operations() -> Result<()> {
2322 let config = create_test_node_config();
2323 let node = P2PNode::new(config).await?;
2324
2325 node.start().await?;
2327 node.stop().await?;
2328 Ok(())
2329 }
2330
2331 #[tokio::test]
2332 async fn test_health_check() -> Result<()> {
2333 let config = create_test_node_config();
2334 let node = P2PNode::new(config).await?;
2335
2336 let result = node.health_check().await;
2338 assert!(result.is_ok());
2339
2340 Ok(())
2345 }
2346
2347 #[tokio::test]
2348 async fn test_node_uptime() -> Result<()> {
2349 let config = create_test_node_config();
2350 let node = P2PNode::new(config).await?;
2351
2352 let uptime1 = node.uptime();
2353 assert!(uptime1 >= Duration::from_secs(0));
2354
2355 tokio::time::sleep(Duration::from_millis(10)).await;
2357
2358 let uptime2 = node.uptime();
2359 assert!(uptime2 > uptime1);
2360
2361 Ok(())
2362 }
2363
2364 #[tokio::test]
2365 async fn test_node_config_access() -> Result<()> {
2366 let config = create_test_node_config();
2367 let expected_peer_id = config.peer_id.clone();
2368 let node = P2PNode::new(config).await?;
2369
2370 let node_config = node.config();
2371 assert_eq!(node_config.peer_id, expected_peer_id);
2372 assert_eq!(node_config.max_connections, 100);
2373 Ok(())
2376 }
2377
2378 #[tokio::test]
2379 async fn test_mcp_server_access() -> Result<()> {
2380 let config = create_test_node_config();
2381 let _node = P2PNode::new(config).await?;
2382
2383 Ok(())
2385 }
2386
2387 #[tokio::test]
2388 async fn test_dht_access() -> Result<()> {
2389 let config = create_test_node_config();
2390 let node = P2PNode::new(config).await?;
2391
2392 assert!(node.dht().is_some());
2394
2395 Ok(())
2396 }
2397
2398 #[tokio::test]
2399 async fn test_node_builder() -> Result<()> {
2400 let builder = P2PNode::builder()
2402 .with_peer_id("builder_test_peer".to_string())
2403 .listen_on("/ip4/127.0.0.1/tcp/0")
2404 .listen_on("/ip6/::1/tcp/0")
2405 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
2407 .with_connection_timeout(Duration::from_secs(15))
2408 .with_max_connections(200);
2409
2410 let config = builder.config;
2412 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2413 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
2416 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2417 assert_eq!(config.max_connections, 200);
2418
2419 Ok(())
2420 }
2421
2422 #[tokio::test]
2423 async fn test_bootstrap_peers() -> Result<()> {
2424 let mut config = create_test_node_config();
2425 config.bootstrap_peers = vec![
2426 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2427 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2428 ];
2429
2430 let node = P2PNode::new(config).await?;
2431
2432 node.start().await?;
2434
2435 let peer_count = node.peer_count().await;
2438 assert!(
2439 peer_count <= 2,
2440 "Peer count should not exceed bootstrap peer count"
2441 );
2442
2443 node.stop().await?;
2444 Ok(())
2445 }
2446
2447 #[tokio::test]
2448 async fn test_production_mode_disabled() -> Result<()> {
2449 let config = create_test_node_config();
2450 let node = P2PNode::new(config).await?;
2451
2452 assert!(!node.is_production_mode());
2453 assert!(node.production_config().is_none());
2454
2455 let result = node.resource_metrics().await;
2457 assert!(result.is_err());
2458 assert!(result.unwrap_err().to_string().contains("not enabled"));
2459
2460 Ok(())
2461 }
2462
2463 #[tokio::test]
2464 async fn test_network_event_variants() {
2465 let peer_id = "test_peer".to_string();
2467 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2468
2469 let _peer_connected = NetworkEvent::PeerConnected {
2470 peer_id: peer_id.clone(),
2471 addresses: vec![address.clone()],
2472 };
2473
2474 let _peer_disconnected = NetworkEvent::PeerDisconnected {
2475 peer_id: peer_id.clone(),
2476 reason: "test disconnect".to_string(),
2477 };
2478
2479 let _message_received = NetworkEvent::MessageReceived {
2480 peer_id: peer_id.clone(),
2481 protocol: "test-protocol".to_string(),
2482 data: vec![1, 2, 3],
2483 };
2484
2485 let _connection_failed = NetworkEvent::ConnectionFailed {
2486 peer_id: Some(peer_id.clone()),
2487 address: address.clone(),
2488 error: "connection refused".to_string(),
2489 };
2490
2491 let _dht_stored = NetworkEvent::DHTRecordStored {
2492 key: vec![1, 2, 3],
2493 value: vec![4, 5, 6],
2494 };
2495
2496 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2497 key: vec![1, 2, 3],
2498 value: Some(vec![4, 5, 6]),
2499 };
2500 }
2501
2502 #[tokio::test]
2503 async fn test_peer_info_structure() {
2504 let peer_info = PeerInfo {
2505 peer_id: "test_peer".to_string(),
2506 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2507 connected_at: Instant::now(),
2508 last_seen: Instant::now(),
2509 status: ConnectionStatus::Connected,
2510 protocols: vec!["test-protocol".to_string()],
2511 heartbeat_count: 0,
2512 };
2513
2514 assert_eq!(peer_info.peer_id, "test_peer");
2515 assert_eq!(peer_info.addresses.len(), 1);
2516 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2517 assert_eq!(peer_info.protocols.len(), 1);
2518 }
2519
2520 #[tokio::test]
2521 async fn test_serialization() -> Result<()> {
2522 let config = create_test_node_config();
2524 let serialized = serde_json::to_string(&config)?;
2525 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2526
2527 assert_eq!(config.peer_id, deserialized.peer_id);
2528 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2529 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2530
2531 Ok(())
2532 }
2533}