1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::json;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::Instant;
43use tracing::{debug, info, trace, warn};
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct NodeConfig {
48 pub peer_id: Option<PeerId>,
50
51 pub listen_addrs: Vec<std::net::SocketAddr>,
53
54 pub listen_addr: std::net::SocketAddr,
56
57 pub bootstrap_peers: Vec<std::net::SocketAddr>,
59
60 pub bootstrap_peers_str: Vec<String>,
62
63 pub enable_ipv6: bool,
65
66 pub connection_timeout: Duration,
69
70 pub keep_alive_interval: Duration,
72
73 pub max_connections: usize,
75
76 pub max_incoming_connections: usize,
78
79 pub dht_config: DHTConfig,
81
82 pub security_config: SecurityConfig,
84
85 pub production_config: Option<ProductionConfig>,
87
88 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct DHTConfig {
95 pub k_value: usize,
97
98 pub alpha_value: usize,
100
101 pub record_ttl: Duration,
103
104 pub refresh_interval: Duration,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct SecurityConfig {
111 pub enable_noise: bool,
113
114 pub enable_tls: bool,
116
117 pub trust_level: TrustLevel,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
123pub enum TrustLevel {
124 None,
126 Basic,
128 Full,
130}
131
132impl NodeConfig {
133 pub fn new() -> Result<Self> {
139 let config = Config::default();
141
142 let listen_addr = config.listen_socket_addr()?;
144
145 let mut listen_addrs = vec![];
147
148 if config.network.ipv6_enabled {
150 let ipv6_addr = std::net::SocketAddr::new(
151 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
152 listen_addr.port(),
153 );
154 listen_addrs.push(ipv6_addr);
155 }
156
157 let ipv4_addr = std::net::SocketAddr::new(
159 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
160 listen_addr.port(),
161 );
162 listen_addrs.push(ipv4_addr);
163
164 Ok(Self {
165 peer_id: None,
166 listen_addrs,
167 listen_addr,
168 bootstrap_peers: Vec::new(),
169 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
170 enable_ipv6: config.network.ipv6_enabled,
171
172 connection_timeout: Duration::from_secs(config.network.connection_timeout),
173 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
174 max_connections: config.network.max_connections,
175 max_incoming_connections: config.security.connection_limit as usize,
176 dht_config: DHTConfig::default(),
177 security_config: SecurityConfig::default(),
178 production_config: None,
179 bootstrap_cache_config: None,
180 })
182 }
183}
184
185impl Default for NodeConfig {
186 fn default() -> Self {
187 let config = Config::default();
189
190 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
192 std::net::SocketAddr::new(
193 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
194 9000,
195 )
196 });
197
198 Self {
199 peer_id: None,
200 listen_addrs: vec![
201 std::net::SocketAddr::new(
202 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
203 listen_addr.port(),
204 ),
205 std::net::SocketAddr::new(
206 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
207 listen_addr.port(),
208 ),
209 ],
210 listen_addr,
211 bootstrap_peers: Vec::new(),
212 bootstrap_peers_str: Vec::new(),
213 enable_ipv6: config.network.ipv6_enabled,
214
215 connection_timeout: Duration::from_secs(config.network.connection_timeout),
216 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
217 max_connections: config.network.max_connections,
218 max_incoming_connections: config.security.connection_limit as usize,
219 dht_config: DHTConfig::default(),
220 security_config: SecurityConfig::default(),
221 production_config: None, bootstrap_cache_config: None,
223 }
225 }
226}
227
228impl NodeConfig {
229 pub fn from_config(config: &Config) -> Result<Self> {
231 let listen_addr = config.listen_socket_addr()?;
232 let bootstrap_addrs = config.bootstrap_addrs()?;
233
234 let mut node_config = Self {
235 peer_id: None,
236 listen_addrs: vec![listen_addr],
237 listen_addr,
238 bootstrap_peers: bootstrap_addrs
239 .iter()
240 .map(|addr| addr.socket_addr())
241 .collect(),
242 bootstrap_peers_str: config
243 .network
244 .bootstrap_nodes
245 .iter()
246 .map(|addr| addr.to_string())
247 .collect(),
248 enable_ipv6: config.network.ipv6_enabled,
249
250 connection_timeout: Duration::from_secs(config.network.connection_timeout),
251 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
252 max_connections: config.network.max_connections,
253 max_incoming_connections: config.security.connection_limit as usize,
254 dht_config: DHTConfig {
255 k_value: 20,
256 alpha_value: 3,
257 record_ttl: Duration::from_secs(3600),
258 refresh_interval: Duration::from_secs(900),
259 },
260 security_config: SecurityConfig {
261 enable_noise: true,
262 enable_tls: true,
263 trust_level: TrustLevel::Basic,
264 },
265 production_config: Some(ProductionConfig {
266 max_connections: config.network.max_connections,
267 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
270 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
271 health_check_interval: Duration::from_secs(30),
272 metrics_interval: Duration::from_secs(60),
273 enable_performance_tracking: true,
274 enable_auto_cleanup: true,
275 shutdown_timeout: Duration::from_secs(30),
276 rate_limits: crate::production::RateLimitConfig::default(),
277 }),
278 bootstrap_cache_config: None,
279 };
284
285 if config.network.ipv6_enabled {
287 node_config.listen_addrs.push(std::net::SocketAddr::new(
288 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
289 listen_addr.port(),
290 ));
291 }
292
293 Ok(node_config)
294 }
295
296 pub fn with_listen_addr(addr: &str) -> Result<Self> {
298 let listen_addr: std::net::SocketAddr = addr
299 .parse()
300 .map_err(|e: std::net::AddrParseError| {
301 NetworkError::InvalidAddress(e.to_string().into())
302 })
303 .map_err(P2PError::Network)?;
304 let cfg = NodeConfig {
305 listen_addr,
306 listen_addrs: vec![listen_addr],
307 ..Default::default()
308 };
309 Ok(cfg)
310 }
311}
312
313impl Default for DHTConfig {
314 fn default() -> Self {
315 Self {
316 k_value: 20,
317 alpha_value: 5,
318 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
321 }
322}
323
324impl Default for SecurityConfig {
325 fn default() -> Self {
326 Self {
327 enable_noise: true,
328 enable_tls: true,
329 trust_level: TrustLevel::Basic,
330 }
331 }
332}
333
334#[derive(Debug, Clone)]
336pub struct PeerInfo {
337 pub peer_id: PeerId,
339
340 pub addresses: Vec<String>,
342
343 pub connected_at: Instant,
345
346 pub last_seen: Instant,
348
349 pub status: ConnectionStatus,
351
352 pub protocols: Vec<String>,
354
355 pub heartbeat_count: u64,
357}
358
359#[derive(Debug, Clone, PartialEq)]
361pub enum ConnectionStatus {
362 Connecting,
364 Connected,
366 Disconnecting,
368 Disconnected,
370 Failed(String),
372}
373
374#[derive(Debug, Clone)]
376pub enum NetworkEvent {
377 PeerConnected {
379 peer_id: PeerId,
381 addresses: Vec<String>,
383 },
384
385 PeerDisconnected {
387 peer_id: PeerId,
389 reason: String,
391 },
392
393 MessageReceived {
395 peer_id: PeerId,
397 protocol: String,
399 data: Vec<u8>,
401 },
402
403 ConnectionFailed {
405 peer_id: Option<PeerId>,
407 address: String,
409 error: String,
411 },
412
413 DHTRecordStored {
415 key: Vec<u8>,
417 value: Vec<u8>,
419 },
420
421 DHTRecordRetrieved {
423 key: Vec<u8>,
425 value: Option<Vec<u8>>,
427 },
428}
429
430#[derive(Debug, Clone)]
435pub enum P2PEvent {
436 Message {
438 topic: String,
440 source: PeerId,
442 data: Vec<u8>,
444 },
445 PeerConnected(PeerId),
447 PeerDisconnected(PeerId),
449}
450
451pub struct P2PNode {
461 config: NodeConfig,
463
464 peer_id: PeerId,
466
467 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
469
470 event_tx: broadcast::Sender<P2PEvent>,
472
473 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
475
476 start_time: Instant,
478
479 running: RwLock<bool>,
481
482 dht: Option<Arc<RwLock<DHT>>>,
484
485 resource_manager: Option<Arc<ResourceManager>>,
487
488 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
490
491 dual_node: Arc<DualStackNetworkNode>,
493
494 #[allow(dead_code)]
496 rate_limiter: Arc<RateLimiter>,
497
498 active_connections: Arc<RwLock<HashSet<PeerId>>>,
501
502 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
504
505 #[allow(dead_code)]
507 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
508
509 #[allow(dead_code)]
511 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
512
513 #[allow(dead_code)]
515 shutdown: Arc<AtomicBool>,
516
517 #[allow(dead_code)]
519 geo_provider: Arc<BgpGeoProvider>,
520}
521
522fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
538 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
539
540 if addr.ip().is_unspecified() {
541 let loopback_ip = match addr {
543 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
546 std::net::SocketAddr::new(loopback_ip, addr.port())
547 } else {
548 addr
550 }
551}
552
553impl P2PNode {
554 pub fn new_for_tests() -> Result<Self> {
556 let (event_tx, _) = broadcast::channel(16);
557 Ok(Self {
558 config: NodeConfig::default(),
559 peer_id: "test_peer".to_string(),
560 peers: Arc::new(RwLock::new(HashMap::new())),
561 event_tx,
562 listen_addrs: RwLock::new(Vec::new()),
563 start_time: Instant::now(),
564 running: RwLock::new(false),
565 dht: None,
566 resource_manager: None,
567 bootstrap_manager: None,
568 dual_node: {
569 let v6: Option<std::net::SocketAddr> = "[::1]:0"
571 .parse()
572 .ok()
573 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
574 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
575 let handle = tokio::runtime::Handle::current();
576 let dual_attempt = handle.block_on(
577 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
578 );
579 let dual = match dual_attempt {
580 Ok(d) => d,
581 Err(_e1) => {
582 let fallback = handle.block_on(
584 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
585 None,
586 "127.0.0.1:0".parse().ok(),
587 ),
588 );
589 match fallback {
590 Ok(d) => d,
591 Err(e2) => {
592 return Err(P2PError::Network(NetworkError::BindError(
593 format!("Failed to create dual-stack network node: {}", e2)
594 .into(),
595 )));
596 }
597 }
598 }
599 };
600 Arc::new(dual)
601 },
602 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
603 max_requests: 100,
604 burst_size: 100,
605 window: std::time::Duration::from_secs(1),
606 ..Default::default()
607 })),
608 active_connections: Arc::new(RwLock::new(HashSet::new())),
609 connection_monitor_handle: Arc::new(RwLock::new(None)),
610 keepalive_handle: Arc::new(RwLock::new(None)),
611 shutdown: Arc::new(AtomicBool::new(false)),
612 geo_provider: Arc::new(BgpGeoProvider::new()),
613 security_dashboard: None,
614 })
615 }
616 pub async fn new(config: NodeConfig) -> Result<Self> {
618 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
619 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
621 });
622
623 let (event_tx, _) = broadcast::channel(1000);
624
625 {
628 use blake3::Hasher;
629 let mut hasher = Hasher::new();
630 hasher.update(peer_id.as_bytes());
631 let digest = hasher.finalize();
632 let mut nid = [0u8; 32];
633 nid.copy_from_slice(digest.as_bytes());
634 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
635 crate::identity::node_identity::NodeId::from_bytes(nid),
636 ));
637 }
640
641 let (dht, security_dashboard) = if true { let _dht_config = crate::dht::DHTConfig {
644 replication_factor: config.dht_config.k_value,
645 bucket_size: config.dht_config.k_value,
646 alpha: config.dht_config.alpha_value,
647 record_ttl: config.dht_config.record_ttl,
648 bucket_refresh_interval: config.dht_config.refresh_interval,
649 republish_interval: config.dht_config.refresh_interval,
650 max_distance: 160,
651 };
652 let peer_bytes = peer_id.as_bytes();
654 let mut node_id_bytes = [0u8; 32];
655 let len = peer_bytes.len().min(32);
656 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
657 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
658 let dht_instance = DHT::new(node_id).map_err(|e| {
659 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
660 e.to_string().into(),
661 ))
662 })?;
663 dht_instance.start_maintenance_tasks();
664
665 let security_metrics = dht_instance.security_metrics();
667 let dashboard = crate::dht::metrics::SecurityDashboard::new(
668 security_metrics,
669 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
670 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
671 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
672 );
673
674 (Some(Arc::new(RwLock::new(dht_instance))), Some(Arc::new(dashboard)))
675 } else {
676 (None, None)
677 };
678
679 let resource_manager = config
683 .production_config
684 .clone()
685 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
686
687 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
689 match BootstrapManager::with_config(cache_config.clone()).await {
690 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
691 Err(e) => {
692 warn!(
693 "Failed to initialize bootstrap manager: {}, continuing without cache",
694 e
695 );
696 None
697 }
698 }
699 } else {
700 match BootstrapManager::new().await {
701 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
702 Err(e) => {
703 warn!(
704 "Failed to initialize bootstrap manager: {}, continuing without cache",
705 e
706 );
707 None
708 }
709 }
710 };
711
712 let (v6_opt, v4_opt) = {
715 let port = config.listen_addr.port();
716 let ip = config.listen_addr.ip();
717
718 let v4_addr = if ip.is_ipv4() {
719 Some(std::net::SocketAddr::new(ip, port))
720 } else {
721 Some(std::net::SocketAddr::new(
724 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
725 port,
726 ))
727 };
728
729 let v6_addr = if config.enable_ipv6 {
730 if ip.is_ipv6() {
731 Some(std::net::SocketAddr::new(ip, port))
732 } else {
733 Some(std::net::SocketAddr::new(
734 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
735 port,
736 ))
737 }
738 } else {
739 None
740 };
741 (v6_addr, v4_addr)
742 };
743
744 let dual_node = Arc::new(
745 DualStackNetworkNode::new(v6_opt, v4_opt)
746 .await
747 .map_err(|e| {
748 P2PError::Transport(crate::error::TransportError::SetupFailed(
749 format!("Failed to create dual-stack network nodes: {}", e).into(),
750 ))
751 })?,
752 );
753
754 let rate_limiter = Arc::new(RateLimiter::new(
756 crate::validation::RateLimitConfig::default(),
757 ));
758
759 let active_connections = Arc::new(RwLock::new(HashSet::new()));
761
762 let geo_provider = Arc::new(BgpGeoProvider::new());
764
765 let peers = Arc::new(RwLock::new(HashMap::new()));
767
768 let connection_monitor_handle = {
770 let active_conns = Arc::clone(&active_connections);
771 let peers_map = Arc::clone(&peers);
772 let event_tx_clone = event_tx.clone();
773 let dual_node_clone = Arc::clone(&dual_node);
774 let geo_provider_clone = Arc::clone(&geo_provider);
775 let peer_id_clone = peer_id.clone();
776
777 let handle = tokio::spawn(async move {
778 Self::connection_lifecycle_monitor(
779 dual_node_clone,
780 active_conns,
781 peers_map,
782 event_tx_clone,
783 geo_provider_clone,
784 peer_id_clone,
785 )
786 .await;
787 });
788
789 Arc::new(RwLock::new(Some(handle)))
790 };
791
792 let shutdown = Arc::new(AtomicBool::new(false));
794 let keepalive_handle = {
795 let active_conns = Arc::clone(&active_connections);
796 let dual_node_clone = Arc::clone(&dual_node);
797 let shutdown_clone = Arc::clone(&shutdown);
798
799 let handle = tokio::spawn(async move {
800 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
801 });
802
803 Arc::new(RwLock::new(Some(handle)))
804 };
805
806 let node = Self {
807 config,
808 peer_id,
809 peers,
810 event_tx,
811 listen_addrs: RwLock::new(Vec::new()),
812 start_time: Instant::now(),
813 running: RwLock::new(false),
814 dht,
815 resource_manager,
816 bootstrap_manager,
817 dual_node,
818 rate_limiter,
819 active_connections,
820 security_dashboard,
821 connection_monitor_handle,
822 keepalive_handle,
823 shutdown,
824 geo_provider,
825 };
826 info!("Created P2P node with peer ID: {}", node.peer_id);
827
828 node.start_network_listeners().await?;
830
831 node.start_connection_monitor().await;
833
834 Ok(node)
835 }
836
837 pub fn builder() -> NodeBuilder {
839 NodeBuilder::new()
840 }
841
842 pub fn peer_id(&self) -> &PeerId {
844 &self.peer_id
845 }
846
847 pub fn local_addr(&self) -> Option<String> {
848 self.listen_addrs
849 .try_read()
850 .ok()
851 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
852 }
853
854 pub async fn subscribe(&self, topic: &str) -> Result<()> {
855 info!("Subscribed to topic: {}", topic);
858 Ok(())
859 }
860
861 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
862 info!(
863 "Publishing message to topic: {} ({} bytes)",
864 topic,
865 data.len()
866 );
867
868 let peer_list: Vec<PeerId> = {
870 let peers_guard = self.peers.read().await;
871 peers_guard.keys().cloned().collect()
872 };
873
874 if peer_list.is_empty() {
875 debug!("No peers connected, message will only be sent to local subscribers");
876 } else {
877 let mut send_count = 0;
879 for peer_id in &peer_list {
880 match self.send_message(peer_id, topic, data.to_vec()).await {
881 Ok(_) => {
882 send_count += 1;
883 debug!("Sent message to peer: {}", peer_id);
884 }
885 Err(e) => {
886 warn!("Failed to send message to peer {}: {}", peer_id, e);
887 }
888 }
889 }
890 info!(
891 "Published message to {}/{} connected peers",
892 send_count,
893 peer_list.len()
894 );
895 }
896
897 let event = P2PEvent::Message {
899 topic: topic.to_string(),
900 source: self.peer_id.clone(),
901 data: data.to_vec(),
902 };
903 let _ = self.event_tx.send(event);
904
905 Ok(())
906 }
907
908 pub fn config(&self) -> &NodeConfig {
910 &self.config
911 }
912
913 pub async fn start(&self) -> Result<()> {
915 info!("Starting P2P node...");
916
917 if let Some(ref resource_manager) = self.resource_manager {
919 resource_manager.start().await.map_err(|e| {
920 P2PError::Network(crate::error::NetworkError::ProtocolError(
921 format!("Failed to start resource manager: {e}").into(),
922 ))
923 })?;
924 info!("Production resource manager started");
925 }
926
927 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
929 let mut manager = bootstrap_manager.write().await;
930 manager.start_background_tasks().await.map_err(|e| {
931 P2PError::Network(crate::error::NetworkError::ProtocolError(
932 format!("Failed to start bootstrap manager: {e}").into(),
933 ))
934 })?;
935 info!("Bootstrap cache manager started");
936 }
937
938 *self.running.write().await = true;
940
941 self.start_network_listeners().await?;
943
944 let listen_addrs = self.listen_addrs.read().await;
946 info!("P2P node started on addresses: {:?}", *listen_addrs);
947
948 self.start_message_receiving_system().await?;
952
953 self.connect_bootstrap_peers().await?;
955
956 Ok(())
957 }
958
959 async fn start_network_listeners(&self) -> Result<()> {
961 info!("Starting dual-stack listeners (ant-quic)...");
962 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
964 P2PError::Transport(crate::error::TransportError::SetupFailed(
965 format!("Failed to get local addresses: {}", e).into(),
966 ))
967 })?;
968 {
969 let mut la = self.listen_addrs.write().await;
970 *la = addrs.clone();
971 }
972
973 let event_tx = self.event_tx.clone();
975 let peers = self.peers.clone();
976 let active_connections = self.active_connections.clone();
977 let rate_limiter = self.rate_limiter.clone();
978 let dual = self.dual_node.clone();
979 tokio::spawn(async move {
980 loop {
981 match dual.accept_any().await {
982 Ok((ant_peer_id, remote_sock)) => {
983 let peer_id =
984 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
985 let remote_addr = NetworkAddress::from(remote_sock);
986 let _ = rate_limiter.check_ip(&remote_sock.ip());
988 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
989 register_new_peer(&peers, &peer_id, &remote_addr).await;
990 active_connections.write().await.insert(peer_id);
991 }
992 Err(e) => {
993 warn!("Accept failed: {}", e);
994 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
995 }
996 }
997 }
998 });
999
1000 info!("Dual-stack listeners active on: {:?}", addrs);
1001 Ok(())
1002 }
1003
1004 #[allow(dead_code)]
1006 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1007 warn!("QUIC transport temporarily disabled during ant-quic migration");
1046 Err(crate::P2PError::Transport(
1048 crate::error::TransportError::SetupFailed(
1049 format!(
1050 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1051 )
1052 .into(),
1053 ),
1054 ))
1055 }
1056
1057 #[allow(dead_code)] async fn start_connection_acceptor(
1060 &self,
1061 transport: Arc<dyn crate::transport::Transport>,
1062 addr: std::net::SocketAddr,
1063 transport_type: crate::transport::TransportType,
1064 ) -> Result<()> {
1065 info!(
1066 "Starting connection acceptor for {:?} on {}",
1067 transport_type, addr
1068 );
1069
1070 let event_tx = self.event_tx.clone();
1072 let _peer_id = self.peer_id.clone();
1073 let peers = Arc::clone(&self.peers);
1074 let rate_limiter = Arc::clone(&self.rate_limiter);
1077
1078 tokio::spawn(async move {
1080 loop {
1081 match transport.accept().await {
1082 Ok(connection) => {
1083 let remote_addr = connection.remote_addr();
1084 let connection_peer_id =
1085 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1086
1087 let socket_addr = remote_addr.socket_addr();
1089 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1090 continue;
1092 }
1093
1094 info!(
1095 "Accepted {:?} connection from {} (peer: {})",
1096 transport_type, remote_addr, connection_peer_id
1097 );
1098
1099 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1101
1102 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1104
1105 spawn_connection_handler(
1107 connection,
1108 connection_peer_id,
1109 event_tx.clone(),
1110 Arc::clone(&peers),
1111 );
1112 }
1113 Err(e) => {
1114 warn!(
1115 "Failed to accept {:?} connection on {}: {}",
1116 transport_type, addr, e
1117 );
1118
1119 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1121 }
1122 }
1123 }
1124 });
1125
1126 info!(
1127 "Connection acceptor background task started for {:?} on {}",
1128 transport_type, addr
1129 );
1130 Ok(())
1131 }
1132
1133 async fn start_message_receiving_system(&self) -> Result<()> {
1135 info!("Starting message receiving system");
1136 let dual = self.dual_node.clone();
1137 let event_tx = self.event_tx.clone();
1138
1139 tokio::spawn(async move {
1140 loop {
1141 match dual.receive_any().await {
1142 Ok((_peer_id, bytes)) => {
1143 #[allow(clippy::collapsible_if)]
1145 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1146 if let (Some(protocol), Some(data), Some(from)) = (
1147 value.get("protocol").and_then(|v| v.as_str()),
1148 value.get("data").and_then(|v| v.as_array()),
1149 value.get("from").and_then(|v| v.as_str()),
1150 ) {
1151 let payload: Vec<u8> = data
1152 .iter()
1153 .filter_map(|v| v.as_u64().map(|n| n as u8))
1154 .collect();
1155 let _ = event_tx.send(P2PEvent::Message {
1156 topic: protocol.to_string(),
1157 source: from.to_string(),
1158 data: payload,
1159 });
1160 }
1161 }
1162 }
1163 Err(e) => {
1164 warn!("Receive error: {}", e);
1165 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1166 }
1167 }
1168 }
1169 });
1170
1171 Ok(())
1172 }
1173
1174 #[allow(dead_code)]
1176 async fn handle_received_message(
1177 &self,
1178 message_data: Vec<u8>,
1179 peer_id: &PeerId,
1180 _protocol: &str,
1181 event_tx: &broadcast::Sender<P2PEvent>,
1182 ) -> Result<()> {
1183 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1187 Ok(message) => {
1188 if let (Some(protocol), Some(data), Some(from)) = (
1189 message.get("protocol").and_then(|v| v.as_str()),
1190 message.get("data").and_then(|v| v.as_array()),
1191 message.get("from").and_then(|v| v.as_str()),
1192 ) {
1193 let data_bytes: Vec<u8> = data
1195 .iter()
1196 .filter_map(|v| v.as_u64().map(|n| n as u8))
1197 .collect();
1198
1199 let event = P2PEvent::Message {
1201 topic: protocol.to_string(),
1202 source: from.to_string(),
1203 data: data_bytes,
1204 };
1205
1206 let _ = event_tx.send(event);
1207 debug!("Generated message event from peer: {}", peer_id);
1208 }
1209 }
1210 Err(e) => {
1211 warn!("Failed to parse received message from {}: {}", peer_id, e);
1212 }
1213 }
1214
1215 Ok(())
1216 }
1217
1218 pub async fn run(&self) -> Result<()> {
1224 if !*self.running.read().await {
1225 self.start().await?;
1226 }
1227
1228 info!("P2P node running...");
1229
1230 loop {
1232 if !*self.running.read().await {
1233 break;
1234 }
1235
1236 self.periodic_tasks().await?;
1238
1239 tokio::time::sleep(Duration::from_millis(100)).await;
1241 }
1242
1243 info!("P2P node stopped");
1244 Ok(())
1245 }
1246
1247 pub async fn stop(&self) -> Result<()> {
1249 info!("Stopping P2P node...");
1250
1251 *self.running.write().await = false;
1253
1254 self.disconnect_all_peers().await?;
1256
1257 if let Some(ref resource_manager) = self.resource_manager {
1259 resource_manager.shutdown().await.map_err(|e| {
1260 P2PError::Network(crate::error::NetworkError::ProtocolError(
1261 format!("Failed to shutdown resource manager: {e}").into(),
1262 ))
1263 })?;
1264 info!("Production resource manager stopped");
1265 }
1266
1267 info!("P2P node stopped");
1268 Ok(())
1269 }
1270
1271 pub async fn shutdown(&self) -> Result<()> {
1273 self.stop().await
1274 }
1275
1276 pub async fn is_running(&self) -> bool {
1278 *self.running.read().await
1279 }
1280
1281 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1283 self.listen_addrs.read().await.clone()
1284 }
1285
1286 pub async fn connected_peers(&self) -> Vec<PeerId> {
1288 self.peers.read().await.keys().cloned().collect()
1289 }
1290
1291 pub async fn peer_count(&self) -> usize {
1293 self.peers.read().await.len()
1294 }
1295
1296 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1298 self.peers.read().await.get(peer_id).cloned()
1299 }
1300
1301 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1313 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1315
1316 let peers = self.peers.read().await;
1317
1318 for (peer_id, peer_info) in peers.iter() {
1320 for peer_addr in &peer_info.addresses {
1322 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1323 && peer_socket == socket_addr
1324 {
1325 return Some(peer_id.clone());
1326 }
1327 }
1328 }
1329
1330 None
1331 }
1332
1333 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1339 let peers = self.peers.read().await;
1340
1341 peers
1342 .iter()
1343 .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1344 .collect()
1345 }
1346
1347 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1359 self.active_connections.write().await.remove(peer_id);
1361 self.peers.write().await.remove(peer_id).is_some()
1363 }
1364
1365 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1378 self.peers.read().await.contains_key(peer_id)
1379 }
1380
1381 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1383 info!("Connecting to peer at: {}", address);
1384
1385 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1387 Some(resource_manager.acquire_connection().await?)
1388 } else {
1389 None
1390 };
1391
1392 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1394 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1395 format!("{}: {}", address, e).into(),
1396 ))
1397 })?;
1398
1399 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1402 if normalized_addr != socket_addr {
1403 info!(
1404 "Normalized wildcard address {} to loopback {}",
1405 socket_addr, normalized_addr
1406 );
1407 }
1408
1409 let addr_list = vec![normalized_addr];
1411 let peer_id = match tokio::time::timeout(
1412 self.config.connection_timeout,
1413 self.dual_node.connect_happy_eyeballs(&addr_list),
1414 )
1415 .await
1416 {
1417 Ok(Ok(peer)) => {
1418 let connected_peer_id =
1419 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1420 info!("Successfully connected to peer: {}", connected_peer_id);
1421 connected_peer_id
1422 }
1423 Ok(Err(e)) => {
1424 warn!("Failed to connect to peer at {}: {}", address, e);
1425 let sanitized_address = address.replace(['/', ':'], "_");
1426 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1427 warn!(
1428 "Using demo peer ID: {} (transport connection failed)",
1429 demo_peer_id
1430 );
1431 demo_peer_id
1432 }
1433 Err(_) => {
1434 warn!(
1435 "Timed out connecting to peer at {} after {:?}",
1436 address, self.config.connection_timeout
1437 );
1438 let sanitized_address = address.replace(['/', ':'], "_");
1439 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1440 demo_peer_id
1441 }
1442 };
1443
1444 let peer_info = PeerInfo {
1446 peer_id: peer_id.clone(),
1447 addresses: vec![address.to_string()],
1448 connected_at: Instant::now(),
1449 last_seen: Instant::now(),
1450 status: ConnectionStatus::Connected,
1451 protocols: vec!["p2p-foundation/1.0".to_string()],
1452 heartbeat_count: 0,
1453 };
1454
1455 self.peers.write().await.insert(peer_id.clone(), peer_info);
1457
1458 self.active_connections
1461 .write()
1462 .await
1463 .insert(peer_id.clone());
1464
1465 if let Some(ref resource_manager) = self.resource_manager {
1467 resource_manager.record_bandwidth(0, 0); }
1469
1470 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1472
1473 info!("Connected to peer: {}", peer_id);
1474 Ok(peer_id)
1475 }
1476
1477 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1479 info!("Disconnecting from peer: {}", peer_id);
1480
1481 self.active_connections.write().await.remove(peer_id);
1483
1484 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1485 peer_info.status = ConnectionStatus::Disconnected;
1486
1487 let _ = self
1489 .event_tx
1490 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1491
1492 info!("Disconnected from peer: {}", peer_id);
1493 }
1494
1495 Ok(())
1496 }
1497
1498 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1500 self.active_connections.read().await.contains(peer_id)
1501 }
1502
1503 pub async fn send_message(
1505 &self,
1506 peer_id: &PeerId,
1507 protocol: &str,
1508 data: Vec<u8>,
1509 ) -> Result<()> {
1510 debug!(
1511 "Sending message to peer {} on protocol {}",
1512 peer_id, protocol
1513 );
1514
1515 if let Some(ref resource_manager) = self.resource_manager
1517 && !resource_manager
1518 .check_rate_limit(peer_id, "message")
1519 .await?
1520 {
1521 return Err(P2PError::ResourceExhausted(
1522 format!("Rate limit exceeded for peer {}", peer_id).into(),
1523 ));
1524 }
1525
1526 if !self.peers.read().await.contains_key(peer_id) {
1528 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1529 peer_id.to_string().into(),
1530 )));
1531 }
1532
1533 if !self.is_connection_active(peer_id).await {
1536 debug!(
1537 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1538 peer_id
1539 );
1540
1541 self.remove_peer(peer_id).await;
1543
1544 return Err(P2PError::Network(
1545 crate::error::NetworkError::ConnectionClosed {
1546 peer_id: peer_id.to_string().into(),
1547 },
1548 ));
1549 }
1550
1551 if let Some(ref resource_manager) = self.resource_manager {
1555 resource_manager.record_bandwidth(data.len() as u64, 0);
1556 }
1557
1558 let _message_data = self.create_protocol_message(protocol, data)?;
1560
1561 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1563 tokio::time::timeout(self.config.connection_timeout, send_fut)
1564 .await
1565 .map_err(|_| {
1566 P2PError::Transport(crate::error::TransportError::StreamError(
1567 "Timed out sending message".into(),
1568 ))
1569 })?
1570 .map_err(|e| {
1571 P2PError::Transport(crate::error::TransportError::StreamError(
1572 e.to_string().into(),
1573 ))
1574 })
1575 }
1576
1577 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1579 use serde_json::json;
1580
1581 let timestamp = std::time::SystemTime::now()
1582 .duration_since(std::time::UNIX_EPOCH)
1583 .map_err(|e| {
1584 P2PError::Network(NetworkError::ProtocolError(
1585 format!("System time error: {}", e).into(),
1586 ))
1587 })?
1588 .as_secs();
1589
1590 let message = json!({
1592 "protocol": protocol,
1593 "data": data,
1594 "from": self.peer_id,
1595 "timestamp": timestamp
1596 });
1597
1598 serde_json::to_vec(&message).map_err(|e| {
1599 P2PError::Transport(crate::error::TransportError::StreamError(
1600 format!("Failed to serialize message: {e}").into(),
1601 ))
1602 })
1603 }
1604
1605 }
1607
1608#[allow(dead_code)]
1610fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1611 use serde_json::json;
1612
1613 let timestamp = std::time::SystemTime::now()
1614 .duration_since(std::time::UNIX_EPOCH)
1615 .map_err(|e| {
1616 P2PError::Network(NetworkError::ProtocolError(
1617 format!("System time error: {}", e).into(),
1618 ))
1619 })?
1620 .as_secs();
1621
1622 let message = json!({
1624 "protocol": protocol,
1625 "data": data,
1626 "timestamp": timestamp
1627 });
1628
1629 serde_json::to_vec(&message).map_err(|e| {
1630 P2PError::Transport(crate::error::TransportError::StreamError(
1631 format!("Failed to serialize message: {e}").into(),
1632 ))
1633 })
1634}
1635
1636impl P2PNode {
1637 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1639 self.event_tx.subscribe()
1640 }
1641
1642 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1644 self.subscribe_events()
1645 }
1646
1647 pub fn uptime(&self) -> Duration {
1649 self.start_time.elapsed()
1650 }
1651
1652 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1662 if let Some(ref resource_manager) = self.resource_manager {
1663 Ok(resource_manager.get_metrics().await)
1664 } else {
1665 Err(P2PError::Network(
1666 crate::error::NetworkError::ProtocolError(
1667 "Production resource manager not enabled".to_string().into(),
1668 ),
1669 ))
1670 }
1671 }
1672
1673 async fn connection_lifecycle_monitor(
1676 dual_node: Arc<DualStackNetworkNode>,
1677 active_connections: Arc<RwLock<HashSet<String>>>,
1678 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1679 event_tx: broadcast::Sender<P2PEvent>,
1680 geo_provider: Arc<BgpGeoProvider>,
1681 local_peer_id: String,
1682 ) {
1683 use crate::transport::ant_quic_adapter::ConnectionEvent;
1684
1685 let mut event_rx = dual_node.subscribe_connection_events();
1686
1687 info!("Connection lifecycle monitor started");
1688
1689 loop {
1690 match event_rx.recv().await {
1691 Ok(event) => {
1692 match event {
1693 ConnectionEvent::Established {
1694 peer_id,
1695 remote_address,
1696 } => {
1697 let peer_id_str =
1698 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1699 debug!(
1700 "Connection established: peer={}, addr={}",
1701 peer_id_str, remote_address
1702 );
1703
1704 let ip = remote_address.ip();
1707 let is_rejected = match ip {
1708 std::net::IpAddr::V4(v4) => {
1709 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1711 geo_provider.is_hosting_asn(asn)
1712 || geo_provider.is_vpn_asn(asn)
1713 } else {
1714 false
1715 }
1716 }
1717 std::net::IpAddr::V6(v6) => {
1718 let info = geo_provider.lookup(v6);
1719 info.is_hosting_provider || info.is_vpn_provider
1720 }
1721 };
1722
1723 if is_rejected {
1724 info!(
1725 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1726 peer_id_str, remote_address
1727 );
1728
1729 let rejection = RejectionMessage {
1731 reason: RejectionReason::GeoIpPolicy,
1732 message:
1733 "Connection rejected: Hosting/VPN providers not allowed"
1734 .to_string(),
1735 suggested_target: None, };
1737
1738 if let Ok(data) = serde_json::to_vec(&rejection) {
1740 let timestamp = std::time::SystemTime::now()
1742 .duration_since(std::time::UNIX_EPOCH)
1743 .unwrap_or_default()
1744 .as_secs();
1745
1746 let message = serde_json::json!({
1747 "protocol": "control",
1748 "data": data,
1749 "from": local_peer_id,
1750 "timestamp": timestamp
1751 });
1752
1753 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1754 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1758
1759 tokio::task::yield_now().await;
1762 }
1763 }
1764
1765 continue;
1769 }
1770
1771 active_connections.write().await.insert(peer_id_str.clone());
1773
1774 let mut peers_lock = peers.write().await;
1776 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1777 peer_info.status = ConnectionStatus::Connected;
1778 peer_info.connected_at = Instant::now();
1779 } else {
1780 debug!("Registering new incoming peer: {}", peer_id_str);
1782 peers_lock.insert(
1783 peer_id_str.clone(),
1784 PeerInfo {
1785 peer_id: peer_id_str.clone(),
1786 addresses: vec![remote_address.to_string()],
1787 status: ConnectionStatus::Connected,
1788 last_seen: Instant::now(),
1789 connected_at: Instant::now(),
1790 protocols: Vec::new(),
1791 heartbeat_count: 0,
1792 },
1793 );
1794 }
1795
1796 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1798 }
1799 ConnectionEvent::Lost { peer_id, reason } => {
1800 let peer_id_str =
1801 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1802 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1803
1804 active_connections.write().await.remove(&peer_id_str);
1806
1807 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1809 peer_info.status = ConnectionStatus::Disconnected;
1810 peer_info.last_seen = Instant::now();
1811 }
1812
1813 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1815 }
1816 ConnectionEvent::Failed { peer_id, reason } => {
1817 let peer_id_str =
1818 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1819 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1820
1821 active_connections.write().await.remove(&peer_id_str);
1823
1824 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1826 peer_info.status = ConnectionStatus::Failed(reason.clone());
1827 }
1828
1829 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1831 }
1832 }
1833 }
1834 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1835 warn!(
1836 "Connection event monitor lagged, skipped {} events",
1837 skipped
1838 );
1839 continue;
1840 }
1841 Err(broadcast::error::RecvError::Closed) => {
1842 info!("Connection event channel closed, stopping monitor");
1843 break;
1844 }
1845 }
1846 }
1847
1848 info!("Connection lifecycle monitor stopped");
1849 }
1850
1851 async fn start_connection_monitor(&self) {
1853 debug!("Connection monitor already running from initialization");
1857 }
1858
1859 async fn keepalive_task(
1865 active_connections: Arc<RwLock<HashSet<String>>>,
1866 dual_node: Arc<DualStackNetworkNode>,
1867 shutdown: Arc<AtomicBool>,
1868 ) {
1869 use tokio::time::{Duration, interval};
1870
1871 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1875 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1876
1877 info!(
1878 "Keepalive task started (interval: {}s)",
1879 KEEPALIVE_INTERVAL_SECS
1880 );
1881
1882 loop {
1883 if shutdown.load(Ordering::Relaxed) {
1885 info!("Keepalive task shutting down");
1886 break;
1887 }
1888
1889 interval.tick().await;
1890
1891 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1893
1894 if peers.is_empty() {
1895 trace!("Keepalive: no active connections");
1896 continue;
1897 }
1898
1899 debug!("Sending keepalive to {} active connections", peers.len());
1900
1901 for peer_id in peers {
1903 match dual_node
1904 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1905 .await
1906 {
1907 Ok(_) => {
1908 trace!("Keepalive sent to peer: {}", peer_id);
1909 }
1910 Err(e) => {
1911 debug!(
1912 "Failed to send keepalive to peer {}: {} (connection may have closed)",
1913 peer_id, e
1914 );
1915 }
1917 }
1918 }
1919 }
1920
1921 info!("Keepalive task stopped");
1922 }
1923
1924 pub async fn health_check(&self) -> Result<()> {
1926 if let Some(ref resource_manager) = self.resource_manager {
1927 resource_manager.health_check().await
1928 } else {
1929 let peer_count = self.peer_count().await;
1931 if peer_count > self.config.max_connections {
1932 Err(P2PError::Network(
1933 crate::error::NetworkError::ProtocolError(
1934 format!("Too many connections: {peer_count}").into(),
1935 ),
1936 ))
1937 } else {
1938 Ok(())
1939 }
1940 }
1941 }
1942
1943 pub fn production_config(&self) -> Option<&ProductionConfig> {
1945 self.config.production_config.as_ref()
1946 }
1947
1948 pub fn is_production_mode(&self) -> bool {
1950 self.resource_manager.is_some()
1951 }
1952
1953 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1955 self.dht.as_ref()
1956 }
1957
1958 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1960 if let Some(ref dht) = self.dht {
1961 let mut dht_instance = dht.write().await;
1962 let dht_key = crate::dht::DhtKey::from_bytes(key);
1963 dht_instance
1964 .store(&dht_key, value.clone())
1965 .await
1966 .map_err(|e| {
1967 P2PError::Dht(crate::error::DhtError::StoreFailed(
1968 format!("{:?}: {e}", key).into(),
1969 ))
1970 })?;
1971
1972 Ok(())
1973 } else {
1974 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1975 "DHT not enabled".to_string().into(),
1976 )))
1977 }
1978 }
1979
1980 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1982 if let Some(ref dht) = self.dht {
1983 let dht_instance = dht.read().await;
1984 let dht_key = crate::dht::DhtKey::from_bytes(key);
1985 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1986 P2PError::Dht(crate::error::DhtError::StoreFailed(
1987 format!("Retrieve failed: {e}").into(),
1988 ))
1989 })?;
1990
1991 Ok(record_result)
1992 } else {
1993 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1994 "DHT not enabled".to_string().into(),
1995 )))
1996 }
1997 }
1998
1999 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2001 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2002 let mut manager = bootstrap_manager.write().await;
2003 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2004 .iter()
2005 .filter_map(|addr| addr.parse().ok())
2006 .collect();
2007 let contact = ContactEntry::new(peer_id, socket_addresses);
2008 manager.add_contact(contact).await.map_err(|e| {
2009 P2PError::Network(crate::error::NetworkError::ProtocolError(
2010 format!("Failed to add peer to bootstrap cache: {e}").into(),
2011 ))
2012 })?;
2013 }
2014 Ok(())
2015 }
2016
2017 pub async fn update_peer_metrics(
2019 &self,
2020 peer_id: &PeerId,
2021 success: bool,
2022 latency_ms: Option<u64>,
2023 _error: Option<String>,
2024 ) -> Result<()> {
2025 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2026 let mut manager = bootstrap_manager.write().await;
2027
2028 let metrics = QualityMetrics {
2030 success_rate: if success { 1.0 } else { 0.0 },
2031 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2032 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2034 last_successful_connection: if success {
2035 chrono::Utc::now()
2036 } else {
2037 chrono::Utc::now() - chrono::Duration::hours(1)
2038 },
2039 uptime_score: 0.5,
2040 };
2041
2042 manager
2043 .update_contact_metrics(peer_id, metrics)
2044 .await
2045 .map_err(|e| {
2046 P2PError::Network(crate::error::NetworkError::ProtocolError(
2047 format!("Failed to update peer metrics: {e}").into(),
2048 ))
2049 })?;
2050 }
2051 Ok(())
2052 }
2053
2054 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2056 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2057 let manager = bootstrap_manager.read().await;
2058 let stats = manager.get_stats().await.map_err(|e| {
2059 P2PError::Network(crate::error::NetworkError::ProtocolError(
2060 format!("Failed to get bootstrap stats: {e}").into(),
2061 ))
2062 })?;
2063 Ok(Some(stats))
2064 } else {
2065 Ok(None)
2066 }
2067 }
2068
2069 pub async fn cached_peer_count(&self) -> usize {
2071 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2072 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2073 {
2074 return stats.total_contacts;
2075 }
2076 0
2077 }
2078
2079 async fn connect_bootstrap_peers(&self) -> Result<()> {
2081 let mut bootstrap_contacts = Vec::new();
2082 let mut used_cache = false;
2083 let mut seen_addresses = std::collections::HashSet::new();
2084
2085 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2087 self.config.bootstrap_peers_str.clone()
2088 } else {
2089 self.config
2091 .bootstrap_peers
2092 .iter()
2093 .map(|addr| addr.to_string())
2094 .collect::<Vec<_>>()
2095 };
2096
2097 if !cli_bootstrap_peers.is_empty() {
2098 info!(
2099 "Using {} CLI-provided bootstrap peers (priority)",
2100 cli_bootstrap_peers.len()
2101 );
2102 for addr in &cli_bootstrap_peers {
2103 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2104 seen_addresses.insert(socket_addr);
2105 let contact = ContactEntry::new(
2106 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2107 vec![socket_addr],
2108 );
2109 bootstrap_contacts.push(contact);
2110 } else {
2111 warn!("Invalid bootstrap address format: {}", addr);
2112 }
2113 }
2114 }
2115
2116 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2118 let manager = bootstrap_manager.read().await;
2119 match manager.get_bootstrap_peers(20).await {
2120 Ok(contacts) => {
2122 if !contacts.is_empty() {
2123 let mut added_from_cache = 0;
2124 for contact in contacts {
2125 let new_addresses: Vec<_> = contact
2127 .addresses
2128 .iter()
2129 .filter(|addr| !seen_addresses.contains(addr))
2130 .copied()
2131 .collect();
2132
2133 if !new_addresses.is_empty() {
2134 for addr in &new_addresses {
2135 seen_addresses.insert(*addr);
2136 }
2137 let mut contact = contact.clone();
2138 contact.addresses = new_addresses;
2139 bootstrap_contacts.push(contact);
2140 added_from_cache += 1;
2141 }
2142 }
2143 if added_from_cache > 0 {
2144 info!(
2145 "Added {} cached bootstrap peers (supplementing CLI peers)",
2146 added_from_cache
2147 );
2148 used_cache = true;
2149 }
2150 }
2151 }
2152 Err(e) => {
2153 warn!("Failed to get cached bootstrap peers: {}", e);
2154 }
2155 }
2156 }
2157
2158 if bootstrap_contacts.is_empty() {
2159 info!("No bootstrap peers configured and no cached peers available");
2160 return Ok(());
2161 }
2162
2163 let mut successful_connections = 0;
2165 for contact in bootstrap_contacts {
2166 for addr in &contact.addresses {
2167 match self.connect_peer(&addr.to_string()).await {
2168 Ok(peer_id) => {
2169 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2170 successful_connections += 1;
2171
2172 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2174 let mut manager = bootstrap_manager.write().await;
2175 let mut updated_contact = contact.clone();
2176 updated_contact.peer_id = peer_id.clone();
2177 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2180 warn!("Failed to update bootstrap cache: {}", e);
2181 }
2182 }
2183 break; }
2185 Err(e) => {
2186 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2187
2188 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2190 let mut manager = bootstrap_manager.write().await;
2191 let mut updated_contact = contact.clone();
2192 updated_contact.update_connection_result(
2193 false,
2194 None,
2195 Some(e.to_string()),
2196 );
2197
2198 if let Err(e) = manager.add_contact(updated_contact).await {
2199 warn!("Failed to update bootstrap cache: {}", e);
2200 }
2201 }
2202 }
2203 }
2204 }
2205 }
2206
2207 if successful_connections == 0 {
2208 if !used_cache {
2209 warn!("Failed to connect to any bootstrap peers");
2210 }
2211 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2212 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2214 }));
2215 }
2216 info!(
2217 "Successfully connected to {} bootstrap peers",
2218 successful_connections
2219 );
2220
2221 Ok(())
2222 }
2223
2224 async fn disconnect_all_peers(&self) -> Result<()> {
2226 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2227
2228 for peer_id in peer_ids {
2229 self.disconnect_peer(&peer_id).await?;
2230 }
2231
2232 Ok(())
2233 }
2234
2235 async fn periodic_tasks(&self) -> Result<()> {
2237 Ok(())
2243 }
2244}
2245
2246#[async_trait::async_trait]
2248pub trait NetworkSender: Send + Sync {
2249 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2251
2252 fn local_peer_id(&self) -> &PeerId;
2254}
2255
2256#[derive(Clone)]
2258pub struct P2PNetworkSender {
2259 peer_id: PeerId,
2260 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2262}
2263
2264impl P2PNetworkSender {
2265 pub fn new(
2266 peer_id: PeerId,
2267 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2268 ) -> Self {
2269 Self { peer_id, send_tx }
2270 }
2271}
2272
2273#[async_trait::async_trait]
2275impl NetworkSender for P2PNetworkSender {
2276 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2278 self.send_tx
2279 .send((peer_id.clone(), protocol.to_string(), data))
2280 .map_err(|_| {
2281 P2PError::Network(crate::error::NetworkError::ProtocolError(
2282 "Failed to send message via channel".to_string().into(),
2283 ))
2284 })?;
2285 Ok(())
2286 }
2287
2288 fn local_peer_id(&self) -> &PeerId {
2290 &self.peer_id
2291 }
2292}
2293
2294pub struct NodeBuilder {
2296 config: NodeConfig,
2297}
2298
2299impl Default for NodeBuilder {
2300 fn default() -> Self {
2301 Self::new()
2302 }
2303}
2304
2305impl NodeBuilder {
2306 pub fn new() -> Self {
2308 Self {
2309 config: NodeConfig::default(),
2310 }
2311 }
2312
2313 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2315 self.config.peer_id = Some(peer_id);
2316 self
2317 }
2318
2319 pub fn listen_on(mut self, addr: &str) -> Self {
2321 if let Ok(multiaddr) = addr.parse() {
2322 self.config.listen_addrs.push(multiaddr);
2323 }
2324 self
2325 }
2326
2327 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2329 if let Ok(multiaddr) = addr.parse() {
2330 self.config.bootstrap_peers.push(multiaddr);
2331 }
2332 self.config.bootstrap_peers_str.push(addr.to_string());
2333 self
2334 }
2335
2336 pub fn with_ipv6(mut self, enable: bool) -> Self {
2338 self.config.enable_ipv6 = enable;
2339 self
2340 }
2341
2342 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2346 self.config.connection_timeout = timeout;
2347 self
2348 }
2349
2350 pub fn with_max_connections(mut self, max: usize) -> Self {
2352 self.config.max_connections = max;
2353 self
2354 }
2355
2356 pub fn with_production_mode(mut self) -> Self {
2358 self.config.production_config = Some(ProductionConfig::default());
2359 self
2360 }
2361
2362 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2364 self.config.production_config = Some(production_config);
2365 self
2366 }
2367
2368 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2370 self.config.dht_config = dht_config;
2371 self
2372 }
2373
2374 pub fn with_default_dht(mut self) -> Self {
2376 self.config.dht_config = DHTConfig::default();
2377 self
2378 }
2379
2380 pub async fn build(self) -> Result<P2PNode> {
2382 P2PNode::new(self.config).await
2383 }
2384}
2385
2386#[allow(dead_code)] async fn handle_received_message_standalone(
2389 message_data: Vec<u8>,
2390 peer_id: &PeerId,
2391 _protocol: &str,
2392 event_tx: &broadcast::Sender<P2PEvent>,
2393) -> Result<()> {
2394 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2396 Ok(message) => {
2397 if let (Some(protocol), Some(data), Some(from)) = (
2398 message.get("protocol").and_then(|v| v.as_str()),
2399 message.get("data").and_then(|v| v.as_array()),
2400 message.get("from").and_then(|v| v.as_str()),
2401 ) {
2402 let data_bytes: Vec<u8> = data
2404 .iter()
2405 .filter_map(|v| v.as_u64().map(|n| n as u8))
2406 .collect();
2407
2408 let event = P2PEvent::Message {
2410 topic: protocol.to_string(),
2411 source: from.to_string(),
2412 data: data_bytes,
2413 };
2414
2415 let _ = event_tx.send(event);
2416 debug!("Generated message event from peer: {}", peer_id);
2417 }
2418 }
2419 Err(e) => {
2420 warn!("Failed to parse received message from {}: {}", peer_id, e);
2421 }
2422 }
2423
2424 Ok(())
2425}
2426
2427#[allow(dead_code)]
2431fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2432 match create_protocol_message_static(protocol, data) {
2433 Ok(msg) => Some(msg),
2434 Err(e) => {
2435 warn!("Failed to create protocol message: {}", e);
2436 None
2437 }
2438 }
2439}
2440
2441#[allow(dead_code)]
2443async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2444 match result {
2445 Ok(_) => {
2446 debug!("Message sent to peer {} via transport layer", peer_id);
2447 }
2448 Err(e) => {
2449 warn!("Failed to send message to peer {}: {}", peer_id, e);
2450 }
2451 }
2452}
2453
2454#[allow(dead_code)] fn check_rate_limit(
2457 rate_limiter: &RateLimiter,
2458 socket_addr: &std::net::SocketAddr,
2459 remote_addr: &NetworkAddress,
2460) -> Result<()> {
2461 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2462 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2463 e
2464 })
2465}
2466
2467#[allow(dead_code)] async fn register_new_peer(
2470 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2471 peer_id: &PeerId,
2472 remote_addr: &NetworkAddress,
2473) {
2474 let mut peers_guard = peers.write().await;
2475 let peer_info = PeerInfo {
2476 peer_id: peer_id.clone(),
2477 addresses: vec![remote_addr.to_string()],
2478 connected_at: tokio::time::Instant::now(),
2479 last_seen: tokio::time::Instant::now(),
2480 status: ConnectionStatus::Connected,
2481 protocols: vec!["p2p-chat/1.0.0".to_string()],
2482 heartbeat_count: 0,
2483 };
2484 peers_guard.insert(peer_id.clone(), peer_info);
2485}
2486
2487#[allow(dead_code)] fn spawn_connection_handler(
2490 connection: Box<dyn crate::transport::Connection>,
2491 peer_id: PeerId,
2492 event_tx: broadcast::Sender<P2PEvent>,
2493 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2494) {
2495 tokio::spawn(async move {
2496 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2497 });
2498}
2499
2500#[allow(dead_code)] async fn handle_peer_connection(
2503 mut connection: Box<dyn crate::transport::Connection>,
2504 peer_id: PeerId,
2505 event_tx: broadcast::Sender<P2PEvent>,
2506 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2507) {
2508 loop {
2509 match connection.receive().await {
2510 Ok(message_data) => {
2511 debug!(
2512 "Received {} bytes from peer: {}",
2513 message_data.len(),
2514 peer_id
2515 );
2516
2517 if let Err(e) = handle_received_message_standalone(
2519 message_data,
2520 &peer_id,
2521 "unknown", &event_tx,
2523 )
2524 .await
2525 {
2526 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2527 }
2528 }
2529 Err(e) => {
2530 warn!("Failed to receive message from {}: {}", peer_id, e);
2531
2532 if !connection.is_alive().await {
2534 info!("Connection to {} is dead, removing peer", peer_id);
2535
2536 remove_peer(&peers, &peer_id).await;
2538
2539 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2541
2542 break; }
2544
2545 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2547 }
2548 }
2549 }
2550}
2551
2552#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2555 let mut peers_guard = peers.write().await;
2556 peers_guard.remove(peer_id);
2557}
2558
2559#[allow(dead_code)]
2561async fn update_peer_heartbeat(
2562 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2563 peer_id: &PeerId,
2564) -> Result<()> {
2565 let mut peers_guard = peers.write().await;
2566 match peers_guard.get_mut(peer_id) {
2567 Some(peer_info) => {
2568 peer_info.last_seen = Instant::now();
2569 peer_info.heartbeat_count += 1;
2570 Ok(())
2571 }
2572 None => {
2573 warn!("Received heartbeat from unknown peer: {}", peer_id);
2574 Err(P2PError::Network(NetworkError::PeerNotFound(
2575 format!("Peer {} not found", peer_id).into(),
2576 )))
2577 }
2578 }
2579}
2580
2581#[allow(dead_code)]
2583async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2584 if let Some(manager) = resource_manager {
2585 let metrics = manager.get_metrics().await;
2586 (metrics.memory_used, metrics.cpu_usage)
2587 } else {
2588 (0, 0.0)
2589 }
2590}
2591
2592#[cfg(test)]
2593mod tests {
2594 use super::*;
2595 use std::time::Duration;
2597 use tokio::time::timeout;
2598
2599 fn create_test_node_config() -> NodeConfig {
2605 NodeConfig {
2606 peer_id: Some("test_peer_123".to_string()),
2607 listen_addrs: vec![
2608 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2609 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2610 ],
2611 listen_addr: std::net::SocketAddr::new(
2612 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2613 0,
2614 ),
2615 bootstrap_peers: vec![],
2616 bootstrap_peers_str: vec![],
2617 enable_ipv6: true,
2618
2619 connection_timeout: Duration::from_millis(300),
2620 keep_alive_interval: Duration::from_secs(30),
2621 max_connections: 100,
2622 max_incoming_connections: 50,
2623 dht_config: DHTConfig::default(),
2624 security_config: SecurityConfig::default(),
2625 production_config: None,
2626 bootstrap_cache_config: None,
2627 }
2629 }
2630
2631 #[tokio::test]
2635 async fn test_node_config_default() {
2636 let config = NodeConfig::default();
2637
2638 assert!(config.peer_id.is_none());
2639 assert_eq!(config.listen_addrs.len(), 2);
2640 assert!(config.enable_ipv6);
2641 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2643 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2644 }
2645
2646 #[tokio::test]
2647 async fn test_dht_config_default() {
2648 let config = DHTConfig::default();
2649
2650 assert_eq!(config.k_value, 20);
2651 assert_eq!(config.alpha_value, 5);
2652 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2653 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2654 }
2655
2656 #[tokio::test]
2657 async fn test_security_config_default() {
2658 let config = SecurityConfig::default();
2659
2660 assert!(config.enable_noise);
2661 assert!(config.enable_tls);
2662 assert_eq!(config.trust_level, TrustLevel::Basic);
2663 }
2664
2665 #[test]
2666 fn test_trust_level_variants() {
2667 let _none = TrustLevel::None;
2669 let _basic = TrustLevel::Basic;
2670 let _full = TrustLevel::Full;
2671
2672 assert_eq!(TrustLevel::None, TrustLevel::None);
2674 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2675 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2676 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2677 }
2678
2679 #[test]
2680 fn test_connection_status_variants() {
2681 let connecting = ConnectionStatus::Connecting;
2682 let connected = ConnectionStatus::Connected;
2683 let disconnecting = ConnectionStatus::Disconnecting;
2684 let disconnected = ConnectionStatus::Disconnected;
2685 let failed = ConnectionStatus::Failed("test error".to_string());
2686
2687 assert_eq!(connecting, ConnectionStatus::Connecting);
2688 assert_eq!(connected, ConnectionStatus::Connected);
2689 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2690 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2691 assert_ne!(connecting, connected);
2692
2693 if let ConnectionStatus::Failed(msg) = failed {
2694 assert_eq!(msg, "test error");
2695 } else {
2696 panic!("Expected Failed status");
2697 }
2698 }
2699
2700 #[tokio::test]
2701 async fn test_node_creation() -> Result<()> {
2702 let config = create_test_node_config();
2703 let node = P2PNode::new(config).await?;
2704
2705 assert_eq!(node.peer_id(), "test_peer_123");
2706 assert!(!node.is_running().await);
2707 assert_eq!(node.peer_count().await, 0);
2708 assert!(node.connected_peers().await.is_empty());
2709
2710 Ok(())
2711 }
2712
2713 #[tokio::test]
2714 async fn test_node_creation_without_peer_id() -> Result<()> {
2715 let mut config = create_test_node_config();
2716 config.peer_id = None;
2717
2718 let node = P2PNode::new(config).await?;
2719
2720 assert!(node.peer_id().starts_with("peer_"));
2722 assert!(!node.is_running().await);
2723
2724 Ok(())
2725 }
2726
2727 #[tokio::test]
2728 async fn test_node_lifecycle() -> Result<()> {
2729 let config = create_test_node_config();
2730 let node = P2PNode::new(config).await?;
2731
2732 assert!(!node.is_running().await);
2734
2735 node.start().await?;
2737 assert!(node.is_running().await);
2738
2739 let listen_addrs = node.listen_addrs().await;
2741 assert!(
2742 !listen_addrs.is_empty(),
2743 "Expected at least one listening address"
2744 );
2745
2746 node.stop().await?;
2748 assert!(!node.is_running().await);
2749
2750 Ok(())
2751 }
2752
2753 #[tokio::test]
2754 async fn test_peer_connection() -> Result<()> {
2755 let config = create_test_node_config();
2756 let node = P2PNode::new(config).await?;
2757
2758 let peer_addr = "127.0.0.1:0";
2759
2760 let peer_id = node.connect_peer(peer_addr).await?;
2762 assert!(peer_id.starts_with("peer_from_"));
2763
2764 assert_eq!(node.peer_count().await, 1);
2766
2767 let connected_peers = node.connected_peers().await;
2769 assert_eq!(connected_peers.len(), 1);
2770 assert_eq!(connected_peers[0], peer_id);
2771
2772 let peer_info = node.peer_info(&peer_id).await;
2774 assert!(peer_info.is_some());
2775 let info = peer_info.expect("Peer info should exist after adding peer");
2776 assert_eq!(info.peer_id, peer_id);
2777 assert_eq!(info.status, ConnectionStatus::Connected);
2778 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2779
2780 node.disconnect_peer(&peer_id).await?;
2782 assert_eq!(node.peer_count().await, 0);
2783
2784 Ok(())
2785 }
2786
2787 #[tokio::test]
2788 async fn test_event_subscription() -> Result<()> {
2789 let config = create_test_node_config();
2790 let node = P2PNode::new(config).await?;
2791
2792 let mut events = node.subscribe_events();
2793 let peer_addr = "127.0.0.1:0";
2794
2795 let peer_id = node.connect_peer(peer_addr).await?;
2797
2798 let event = timeout(Duration::from_millis(100), events.recv()).await;
2800 assert!(event.is_ok());
2801
2802 let event_result = event
2803 .expect("Should receive event")
2804 .expect("Event should not be error");
2805 match event_result {
2806 P2PEvent::PeerConnected(event_peer_id) => {
2807 assert_eq!(event_peer_id, peer_id);
2808 }
2809 _ => panic!("Expected PeerConnected event"),
2810 }
2811
2812 node.disconnect_peer(&peer_id).await?;
2814
2815 let event = timeout(Duration::from_millis(100), events.recv()).await;
2817 assert!(event.is_ok());
2818
2819 let event_result = event
2820 .expect("Should receive event")
2821 .expect("Event should not be error");
2822 match event_result {
2823 P2PEvent::PeerDisconnected(event_peer_id) => {
2824 assert_eq!(event_peer_id, peer_id);
2825 }
2826 _ => panic!("Expected PeerDisconnected event"),
2827 }
2828
2829 Ok(())
2830 }
2831
2832 #[tokio::test]
2833 async fn test_message_sending() -> Result<()> {
2834 let mut config1 = create_test_node_config();
2836 config1.listen_addr =
2837 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2838 let node1 = P2PNode::new(config1).await?;
2839 node1.start().await?;
2840
2841 let mut config2 = create_test_node_config();
2842 config2.listen_addr =
2843 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2844 let node2 = P2PNode::new(config2).await?;
2845 node2.start().await?;
2846
2847 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2849
2850 let node2_addr = node2.local_addr().ok_or_else(|| {
2852 P2PError::Network(crate::error::NetworkError::ProtocolError(
2853 "No listening address".to_string().into(),
2854 ))
2855 })?;
2856
2857 let peer_id =
2859 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2860 Ok(res) => res?,
2861 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2862 };
2863
2864 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2866
2867 let message_data = b"Hello, peer!".to_vec();
2869 let result = match timeout(
2870 Duration::from_millis(500),
2871 node1.send_message(&peer_id, "test-protocol", message_data),
2872 )
2873 .await
2874 {
2875 Ok(res) => res,
2876 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2877 };
2878 if let Err(e) = &result {
2881 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2882 }
2883
2884 let non_existent_peer = "non_existent_peer".to_string();
2886 let result = node1
2887 .send_message(&non_existent_peer, "test-protocol", vec![])
2888 .await;
2889 assert!(result.is_err(), "Sending to non-existent peer should fail");
2890
2891 Ok(())
2892 }
2893
2894 #[tokio::test]
2895 async fn test_remote_mcp_operations() -> Result<()> {
2896 let config = create_test_node_config();
2897 let node = P2PNode::new(config).await?;
2898
2899 node.start().await?;
2901 node.stop().await?;
2902 Ok(())
2903 }
2904
2905 #[tokio::test]
2906 async fn test_health_check() -> Result<()> {
2907 let config = create_test_node_config();
2908 let node = P2PNode::new(config).await?;
2909
2910 let result = node.health_check().await;
2912 assert!(result.is_ok());
2913
2914 Ok(())
2919 }
2920
2921 #[tokio::test]
2922 async fn test_node_uptime() -> Result<()> {
2923 let config = create_test_node_config();
2924 let node = P2PNode::new(config).await?;
2925
2926 let uptime1 = node.uptime();
2927 assert!(uptime1 >= Duration::from_secs(0));
2928
2929 tokio::time::sleep(Duration::from_millis(10)).await;
2931
2932 let uptime2 = node.uptime();
2933 assert!(uptime2 > uptime1);
2934
2935 Ok(())
2936 }
2937
2938 #[tokio::test]
2939 async fn test_node_config_access() -> Result<()> {
2940 let config = create_test_node_config();
2941 let expected_peer_id = config.peer_id.clone();
2942 let node = P2PNode::new(config).await?;
2943
2944 let node_config = node.config();
2945 assert_eq!(node_config.peer_id, expected_peer_id);
2946 assert_eq!(node_config.max_connections, 100);
2947 Ok(())
2950 }
2951
2952 #[tokio::test]
2953 async fn test_mcp_server_access() -> Result<()> {
2954 let config = create_test_node_config();
2955 let _node = P2PNode::new(config).await?;
2956
2957 Ok(())
2959 }
2960
2961 #[tokio::test]
2962 async fn test_dht_access() -> Result<()> {
2963 let config = create_test_node_config();
2964 let node = P2PNode::new(config).await?;
2965
2966 assert!(node.dht().is_some());
2968
2969 Ok(())
2970 }
2971
2972 #[tokio::test]
2973 async fn test_node_builder() -> Result<()> {
2974 let builder = P2PNode::builder()
2976 .with_peer_id("builder_test_peer".to_string())
2977 .listen_on("/ip4/127.0.0.1/tcp/0")
2978 .listen_on("/ip6/::1/tcp/0")
2979 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
2981 .with_connection_timeout(Duration::from_secs(15))
2982 .with_max_connections(200);
2983
2984 let config = builder.config;
2986 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2987 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
2990 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2991 assert_eq!(config.max_connections, 200);
2992
2993 Ok(())
2994 }
2995
2996 #[tokio::test]
2997 async fn test_bootstrap_peers() -> Result<()> {
2998 let mut config = create_test_node_config();
2999 config.bootstrap_peers = vec![
3000 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3001 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3002 ];
3003
3004 let node = P2PNode::new(config).await?;
3005
3006 node.start().await?;
3008
3009 let _peer_count = node.peer_count().await;
3013
3014 node.stop().await?;
3015 Ok(())
3016 }
3017
3018 #[tokio::test]
3019 async fn test_production_mode_disabled() -> Result<()> {
3020 let config = create_test_node_config();
3021 let node = P2PNode::new(config).await?;
3022
3023 assert!(!node.is_production_mode());
3024 assert!(node.production_config().is_none());
3025
3026 let result = node.resource_metrics().await;
3028 assert!(result.is_err());
3029 assert!(result.unwrap_err().to_string().contains("not enabled"));
3030
3031 Ok(())
3032 }
3033
3034 #[tokio::test]
3035 async fn test_network_event_variants() {
3036 let peer_id = "test_peer".to_string();
3038 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3039
3040 let _peer_connected = NetworkEvent::PeerConnected {
3041 peer_id: peer_id.clone(),
3042 addresses: vec![address.clone()],
3043 };
3044
3045 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3046 peer_id: peer_id.clone(),
3047 reason: "test disconnect".to_string(),
3048 };
3049
3050 let _message_received = NetworkEvent::MessageReceived {
3051 peer_id: peer_id.clone(),
3052 protocol: "test-protocol".to_string(),
3053 data: vec![1, 2, 3],
3054 };
3055
3056 let _connection_failed = NetworkEvent::ConnectionFailed {
3057 peer_id: Some(peer_id.clone()),
3058 address: address.clone(),
3059 error: "connection refused".to_string(),
3060 };
3061
3062 let _dht_stored = NetworkEvent::DHTRecordStored {
3063 key: vec![1, 2, 3],
3064 value: vec![4, 5, 6],
3065 };
3066
3067 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3068 key: vec![1, 2, 3],
3069 value: Some(vec![4, 5, 6]),
3070 };
3071 }
3072
3073 #[tokio::test]
3074 async fn test_peer_info_structure() {
3075 let peer_info = PeerInfo {
3076 peer_id: "test_peer".to_string(),
3077 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3078 connected_at: Instant::now(),
3079 last_seen: Instant::now(),
3080 status: ConnectionStatus::Connected,
3081 protocols: vec!["test-protocol".to_string()],
3082 heartbeat_count: 0,
3083 };
3084
3085 assert_eq!(peer_info.peer_id, "test_peer");
3086 assert_eq!(peer_info.addresses.len(), 1);
3087 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3088 assert_eq!(peer_info.protocols.len(), 1);
3089 }
3090
3091 #[tokio::test]
3092 async fn test_serialization() -> Result<()> {
3093 let config = create_test_node_config();
3095 let serialized = serde_json::to_string(&config)?;
3096 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3097
3098 assert_eq!(config.peer_id, deserialized.peer_id);
3099 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3100 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3101
3102 Ok(())
3103 }
3104
3105 #[tokio::test]
3106 async fn test_get_peer_id_by_address_found() -> Result<()> {
3107 let config = create_test_node_config();
3108 let node = P2PNode::new(config).await?;
3109
3110 let test_peer_id = "peer_test_123".to_string();
3112 let test_address = "192.168.1.100:9000".to_string();
3113
3114 let peer_info = PeerInfo {
3115 peer_id: test_peer_id.clone(),
3116 addresses: vec![test_address.clone()],
3117 connected_at: Instant::now(),
3118 last_seen: Instant::now(),
3119 status: ConnectionStatus::Connected,
3120 protocols: vec!["test-protocol".to_string()],
3121 heartbeat_count: 0,
3122 };
3123
3124 node.peers
3125 .write()
3126 .await
3127 .insert(test_peer_id.clone(), peer_info);
3128
3129 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3131 assert_eq!(found_peer_id, Some(test_peer_id));
3132
3133 Ok(())
3134 }
3135
3136 #[tokio::test]
3137 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3138 let config = create_test_node_config();
3139 let node = P2PNode::new(config).await?;
3140
3141 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3143 assert_eq!(result, None);
3144
3145 Ok(())
3146 }
3147
3148 #[tokio::test]
3149 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3150 let config = create_test_node_config();
3151 let node = P2PNode::new(config).await?;
3152
3153 let result = node.get_peer_id_by_address("invalid-address").await;
3155 assert_eq!(result, None);
3156
3157 Ok(())
3158 }
3159
3160 #[tokio::test]
3161 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3162 let config = create_test_node_config();
3163 let node = P2PNode::new(config).await?;
3164
3165 let peer1_id = "peer_1".to_string();
3167 let peer1_addr = "192.168.1.101:9001".to_string();
3168
3169 let peer2_id = "peer_2".to_string();
3170 let peer2_addr = "192.168.1.102:9002".to_string();
3171
3172 let peer1_info = PeerInfo {
3173 peer_id: peer1_id.clone(),
3174 addresses: vec![peer1_addr.clone()],
3175 connected_at: Instant::now(),
3176 last_seen: Instant::now(),
3177 status: ConnectionStatus::Connected,
3178 protocols: vec!["test-protocol".to_string()],
3179 heartbeat_count: 0,
3180 };
3181
3182 let peer2_info = PeerInfo {
3183 peer_id: peer2_id.clone(),
3184 addresses: vec![peer2_addr.clone()],
3185 connected_at: Instant::now(),
3186 last_seen: Instant::now(),
3187 status: ConnectionStatus::Connected,
3188 protocols: vec!["test-protocol".to_string()],
3189 heartbeat_count: 0,
3190 };
3191
3192 node.peers
3193 .write()
3194 .await
3195 .insert(peer1_id.clone(), peer1_info);
3196 node.peers
3197 .write()
3198 .await
3199 .insert(peer2_id.clone(), peer2_info);
3200
3201 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3203 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3204
3205 assert_eq!(found_peer1, Some(peer1_id));
3206 assert_eq!(found_peer2, Some(peer2_id));
3207
3208 Ok(())
3209 }
3210
3211 #[tokio::test]
3212 async fn test_list_active_connections_empty() -> Result<()> {
3213 let config = create_test_node_config();
3214 let node = P2PNode::new(config).await?;
3215
3216 let connections = node.list_active_connections().await;
3218 assert!(connections.is_empty());
3219
3220 Ok(())
3221 }
3222
3223 #[tokio::test]
3224 async fn test_list_active_connections_with_peers() -> Result<()> {
3225 let config = create_test_node_config();
3226 let node = P2PNode::new(config).await?;
3227
3228 let peer1_id = "peer_1".to_string();
3230 let peer1_addrs = vec![
3231 "192.168.1.101:9001".to_string(),
3232 "192.168.1.101:9002".to_string(),
3233 ];
3234
3235 let peer2_id = "peer_2".to_string();
3236 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3237
3238 let peer1_info = PeerInfo {
3239 peer_id: peer1_id.clone(),
3240 addresses: peer1_addrs.clone(),
3241 connected_at: Instant::now(),
3242 last_seen: Instant::now(),
3243 status: ConnectionStatus::Connected,
3244 protocols: vec!["test-protocol".to_string()],
3245 heartbeat_count: 0,
3246 };
3247
3248 let peer2_info = PeerInfo {
3249 peer_id: peer2_id.clone(),
3250 addresses: peer2_addrs.clone(),
3251 connected_at: Instant::now(),
3252 last_seen: Instant::now(),
3253 status: ConnectionStatus::Connected,
3254 protocols: vec!["test-protocol".to_string()],
3255 heartbeat_count: 0,
3256 };
3257
3258 node.peers
3259 .write()
3260 .await
3261 .insert(peer1_id.clone(), peer1_info);
3262 node.peers
3263 .write()
3264 .await
3265 .insert(peer2_id.clone(), peer2_info);
3266
3267 let connections = node.list_active_connections().await;
3269 assert_eq!(connections.len(), 2);
3270
3271 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3273 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3274
3275 assert!(peer1_conn.is_some());
3276 assert!(peer2_conn.is_some());
3277
3278 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3280 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3281
3282 Ok(())
3283 }
3284
3285 #[tokio::test]
3286 async fn test_remove_peer_success() -> Result<()> {
3287 let config = create_test_node_config();
3288 let node = P2PNode::new(config).await?;
3289
3290 let peer_id = "peer_to_remove".to_string();
3292 let peer_info = PeerInfo {
3293 peer_id: peer_id.clone(),
3294 addresses: vec!["192.168.1.100:9000".to_string()],
3295 connected_at: Instant::now(),
3296 last_seen: Instant::now(),
3297 status: ConnectionStatus::Connected,
3298 protocols: vec!["test-protocol".to_string()],
3299 heartbeat_count: 0,
3300 };
3301
3302 node.peers.write().await.insert(peer_id.clone(), peer_info);
3303
3304 assert!(node.is_peer_connected(&peer_id).await);
3306
3307 let removed = node.remove_peer(&peer_id).await;
3309 assert!(removed);
3310
3311 assert!(!node.is_peer_connected(&peer_id).await);
3313
3314 Ok(())
3315 }
3316
3317 #[tokio::test]
3318 async fn test_remove_peer_nonexistent() -> Result<()> {
3319 let config = create_test_node_config();
3320 let node = P2PNode::new(config).await?;
3321
3322 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3324 assert!(!removed);
3325
3326 Ok(())
3327 }
3328
3329 #[tokio::test]
3330 async fn test_is_peer_connected() -> Result<()> {
3331 let config = create_test_node_config();
3332 let node = P2PNode::new(config).await?;
3333
3334 let peer_id = "test_peer".to_string();
3335
3336 assert!(!node.is_peer_connected(&peer_id).await);
3338
3339 let peer_info = PeerInfo {
3341 peer_id: peer_id.clone(),
3342 addresses: vec!["192.168.1.100:9000".to_string()],
3343 connected_at: Instant::now(),
3344 last_seen: Instant::now(),
3345 status: ConnectionStatus::Connected,
3346 protocols: vec!["test-protocol".to_string()],
3347 heartbeat_count: 0,
3348 };
3349
3350 node.peers.write().await.insert(peer_id.clone(), peer_info);
3351
3352 assert!(node.is_peer_connected(&peer_id).await);
3354
3355 node.remove_peer(&peer_id).await;
3357
3358 assert!(!node.is_peer_connected(&peer_id).await);
3360
3361 Ok(())
3362 }
3363
3364 #[test]
3365 fn test_normalize_ipv6_wildcard() {
3366 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3367
3368 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3369 let normalized = normalize_wildcard_to_loopback(wildcard);
3370
3371 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3372 assert_eq!(normalized.port(), 8080);
3373 }
3374
3375 #[test]
3376 fn test_normalize_ipv4_wildcard() {
3377 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3378
3379 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3380 let normalized = normalize_wildcard_to_loopback(wildcard);
3381
3382 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3383 assert_eq!(normalized.port(), 9000);
3384 }
3385
3386 #[test]
3387 fn test_normalize_specific_address_unchanged() {
3388 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3389 let normalized = normalize_wildcard_to_loopback(specific);
3390
3391 assert_eq!(normalized, specific);
3392 }
3393
3394 #[test]
3395 fn test_normalize_loopback_unchanged() {
3396 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3397
3398 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3399 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3400 assert_eq!(normalized_v6, loopback_v6);
3401
3402 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3403 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3404 assert_eq!(normalized_v4, loopback_v4);
3405 }
3406}