1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::json;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::Instant;
43use tracing::{debug, info, trace, warn};
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct NodeConfig {
48 pub peer_id: Option<PeerId>,
50
51 pub listen_addrs: Vec<std::net::SocketAddr>,
53
54 pub listen_addr: std::net::SocketAddr,
56
57 pub bootstrap_peers: Vec<std::net::SocketAddr>,
59
60 pub bootstrap_peers_str: Vec<String>,
62
63 pub enable_ipv6: bool,
65
66 pub connection_timeout: Duration,
69
70 pub keep_alive_interval: Duration,
72
73 pub max_connections: usize,
75
76 pub max_incoming_connections: usize,
78
79 pub dht_config: DHTConfig,
81
82 pub security_config: SecurityConfig,
84
85 pub production_config: Option<ProductionConfig>,
87
88 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct DHTConfig {
95 pub k_value: usize,
97
98 pub alpha_value: usize,
100
101 pub record_ttl: Duration,
103
104 pub refresh_interval: Duration,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct SecurityConfig {
111 pub enable_noise: bool,
113
114 pub enable_tls: bool,
116
117 pub trust_level: TrustLevel,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
123pub enum TrustLevel {
124 None,
126 Basic,
128 Full,
130}
131
132impl NodeConfig {
133 pub fn new() -> Result<Self> {
139 let config = Config::default();
141
142 let listen_addr = config.listen_socket_addr()?;
144
145 let mut listen_addrs = vec![];
147
148 if config.network.ipv6_enabled {
150 let ipv6_addr = std::net::SocketAddr::new(
151 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
152 listen_addr.port(),
153 );
154 listen_addrs.push(ipv6_addr);
155 }
156
157 let ipv4_addr = std::net::SocketAddr::new(
159 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
160 listen_addr.port(),
161 );
162 listen_addrs.push(ipv4_addr);
163
164 Ok(Self {
165 peer_id: None,
166 listen_addrs,
167 listen_addr,
168 bootstrap_peers: Vec::new(),
169 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
170 enable_ipv6: config.network.ipv6_enabled,
171
172 connection_timeout: Duration::from_secs(config.network.connection_timeout),
173 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
174 max_connections: config.network.max_connections,
175 max_incoming_connections: config.security.connection_limit as usize,
176 dht_config: DHTConfig::default(),
177 security_config: SecurityConfig::default(),
178 production_config: None,
179 bootstrap_cache_config: None,
180 })
182 }
183}
184
185impl Default for NodeConfig {
186 fn default() -> Self {
187 let config = Config::default();
189
190 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
192 std::net::SocketAddr::new(
193 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
194 9000,
195 )
196 });
197
198 Self {
199 peer_id: None,
200 listen_addrs: vec![
201 std::net::SocketAddr::new(
202 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
203 listen_addr.port(),
204 ),
205 std::net::SocketAddr::new(
206 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
207 listen_addr.port(),
208 ),
209 ],
210 listen_addr,
211 bootstrap_peers: Vec::new(),
212 bootstrap_peers_str: Vec::new(),
213 enable_ipv6: config.network.ipv6_enabled,
214
215 connection_timeout: Duration::from_secs(config.network.connection_timeout),
216 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
217 max_connections: config.network.max_connections,
218 max_incoming_connections: config.security.connection_limit as usize,
219 dht_config: DHTConfig::default(),
220 security_config: SecurityConfig::default(),
221 production_config: None, bootstrap_cache_config: None,
223 }
225 }
226}
227
228impl NodeConfig {
229 pub fn from_config(config: &Config) -> Result<Self> {
231 let listen_addr = config.listen_socket_addr()?;
232 let bootstrap_addrs = config.bootstrap_addrs()?;
233
234 let mut node_config = Self {
235 peer_id: None,
236 listen_addrs: vec![listen_addr],
237 listen_addr,
238 bootstrap_peers: bootstrap_addrs
239 .iter()
240 .map(|addr| addr.socket_addr())
241 .collect(),
242 bootstrap_peers_str: config
243 .network
244 .bootstrap_nodes
245 .iter()
246 .map(|addr| addr.to_string())
247 .collect(),
248 enable_ipv6: config.network.ipv6_enabled,
249
250 connection_timeout: Duration::from_secs(config.network.connection_timeout),
251 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
252 max_connections: config.network.max_connections,
253 max_incoming_connections: config.security.connection_limit as usize,
254 dht_config: DHTConfig {
255 k_value: 20,
256 alpha_value: 3,
257 record_ttl: Duration::from_secs(3600),
258 refresh_interval: Duration::from_secs(900),
259 },
260 security_config: SecurityConfig {
261 enable_noise: true,
262 enable_tls: true,
263 trust_level: TrustLevel::Basic,
264 },
265 production_config: Some(ProductionConfig {
266 max_connections: config.network.max_connections,
267 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
270 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
271 health_check_interval: Duration::from_secs(30),
272 metrics_interval: Duration::from_secs(60),
273 enable_performance_tracking: true,
274 enable_auto_cleanup: true,
275 shutdown_timeout: Duration::from_secs(30),
276 rate_limits: crate::production::RateLimitConfig::default(),
277 }),
278 bootstrap_cache_config: None,
279 };
284
285 if config.network.ipv6_enabled {
287 node_config.listen_addrs.push(std::net::SocketAddr::new(
288 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
289 listen_addr.port(),
290 ));
291 }
292
293 Ok(node_config)
294 }
295
296 pub fn with_listen_addr(addr: &str) -> Result<Self> {
298 let listen_addr: std::net::SocketAddr = addr
299 .parse()
300 .map_err(|e: std::net::AddrParseError| {
301 NetworkError::InvalidAddress(e.to_string().into())
302 })
303 .map_err(P2PError::Network)?;
304 let cfg = NodeConfig {
305 listen_addr,
306 listen_addrs: vec![listen_addr],
307 ..Default::default()
308 };
309 Ok(cfg)
310 }
311}
312
313impl Default for DHTConfig {
314 fn default() -> Self {
315 Self {
316 k_value: 20,
317 alpha_value: 5,
318 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
321 }
322}
323
324impl Default for SecurityConfig {
325 fn default() -> Self {
326 Self {
327 enable_noise: true,
328 enable_tls: true,
329 trust_level: TrustLevel::Basic,
330 }
331 }
332}
333
334#[derive(Debug, Clone)]
336pub struct PeerInfo {
337 pub peer_id: PeerId,
339
340 pub addresses: Vec<String>,
342
343 pub connected_at: Instant,
345
346 pub last_seen: Instant,
348
349 pub status: ConnectionStatus,
351
352 pub protocols: Vec<String>,
354
355 pub heartbeat_count: u64,
357}
358
359#[derive(Debug, Clone, PartialEq)]
361pub enum ConnectionStatus {
362 Connecting,
364 Connected,
366 Disconnecting,
368 Disconnected,
370 Failed(String),
372}
373
374#[derive(Debug, Clone)]
376pub enum NetworkEvent {
377 PeerConnected {
379 peer_id: PeerId,
381 addresses: Vec<String>,
383 },
384
385 PeerDisconnected {
387 peer_id: PeerId,
389 reason: String,
391 },
392
393 MessageReceived {
395 peer_id: PeerId,
397 protocol: String,
399 data: Vec<u8>,
401 },
402
403 ConnectionFailed {
405 peer_id: Option<PeerId>,
407 address: String,
409 error: String,
411 },
412
413 DHTRecordStored {
415 key: Vec<u8>,
417 value: Vec<u8>,
419 },
420
421 DHTRecordRetrieved {
423 key: Vec<u8>,
425 value: Option<Vec<u8>>,
427 },
428}
429
430#[derive(Debug, Clone)]
435pub enum P2PEvent {
436 Message {
438 topic: String,
440 source: PeerId,
442 data: Vec<u8>,
444 },
445 PeerConnected(PeerId),
447 PeerDisconnected(PeerId),
449}
450
451pub struct P2PNode {
461 config: NodeConfig,
463
464 peer_id: PeerId,
466
467 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
469
470 event_tx: broadcast::Sender<P2PEvent>,
472
473 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
475
476 start_time: Instant,
478
479 running: RwLock<bool>,
481
482 dht: Option<Arc<RwLock<DHT>>>,
484
485 resource_manager: Option<Arc<ResourceManager>>,
487
488 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
490
491 dual_node: Arc<DualStackNetworkNode>,
493
494 #[allow(dead_code)]
496 rate_limiter: Arc<RateLimiter>,
497
498 active_connections: Arc<RwLock<HashSet<PeerId>>>,
501
502 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
504
505 #[allow(dead_code)]
507 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
508
509 #[allow(dead_code)]
511 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
512
513 #[allow(dead_code)]
515 shutdown: Arc<AtomicBool>,
516
517 #[allow(dead_code)]
519 geo_provider: Arc<BgpGeoProvider>,
520}
521
522fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
538 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
539
540 if addr.ip().is_unspecified() {
541 let loopback_ip = match addr {
543 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
546 std::net::SocketAddr::new(loopback_ip, addr.port())
547 } else {
548 addr
550 }
551}
552
553impl P2PNode {
554 pub fn new_for_tests() -> Result<Self> {
556 let (event_tx, _) = broadcast::channel(16);
557 Ok(Self {
558 config: NodeConfig::default(),
559 peer_id: "test_peer".to_string(),
560 peers: Arc::new(RwLock::new(HashMap::new())),
561 event_tx,
562 listen_addrs: RwLock::new(Vec::new()),
563 start_time: Instant::now(),
564 running: RwLock::new(false),
565 dht: None,
566 resource_manager: None,
567 bootstrap_manager: None,
568 dual_node: {
569 let v6: Option<std::net::SocketAddr> = "[::1]:0"
571 .parse()
572 .ok()
573 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
574 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
575 let handle = tokio::runtime::Handle::current();
576 let dual_attempt = handle.block_on(
577 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
578 );
579 let dual = match dual_attempt {
580 Ok(d) => d,
581 Err(_e1) => {
582 let fallback = handle.block_on(
584 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
585 None,
586 "127.0.0.1:0".parse().ok(),
587 ),
588 );
589 match fallback {
590 Ok(d) => d,
591 Err(e2) => {
592 return Err(P2PError::Network(NetworkError::BindError(
593 format!("Failed to create dual-stack network node: {}", e2)
594 .into(),
595 )));
596 }
597 }
598 }
599 };
600 Arc::new(dual)
601 },
602 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
603 max_requests: 100,
604 burst_size: 100,
605 window: std::time::Duration::from_secs(1),
606 ..Default::default()
607 })),
608 active_connections: Arc::new(RwLock::new(HashSet::new())),
609 connection_monitor_handle: Arc::new(RwLock::new(None)),
610 keepalive_handle: Arc::new(RwLock::new(None)),
611 shutdown: Arc::new(AtomicBool::new(false)),
612 geo_provider: Arc::new(BgpGeoProvider::new()),
613 security_dashboard: None,
614 })
615 }
616 pub async fn new(config: NodeConfig) -> Result<Self> {
618 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
619 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
621 });
622
623 let (event_tx, _) = broadcast::channel(1000);
624
625 {
628 use blake3::Hasher;
629 let mut hasher = Hasher::new();
630 hasher.update(peer_id.as_bytes());
631 let digest = hasher.finalize();
632 let mut nid = [0u8; 32];
633 nid.copy_from_slice(digest.as_bytes());
634 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
635 crate::identity::node_identity::NodeId::from_bytes(nid),
636 ));
637 }
640
641 let (dht, security_dashboard) = if true {
643 let _dht_config = crate::dht::DHTConfig {
645 replication_factor: config.dht_config.k_value,
646 bucket_size: config.dht_config.k_value,
647 alpha: config.dht_config.alpha_value,
648 record_ttl: config.dht_config.record_ttl,
649 bucket_refresh_interval: config.dht_config.refresh_interval,
650 republish_interval: config.dht_config.refresh_interval,
651 max_distance: 160,
652 };
653 let peer_bytes = peer_id.as_bytes();
655 let mut node_id_bytes = [0u8; 32];
656 let len = peer_bytes.len().min(32);
657 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
658 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
659 let dht_instance = DHT::new(node_id).map_err(|e| {
660 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
661 e.to_string().into(),
662 ))
663 })?;
664 dht_instance.start_maintenance_tasks();
665
666 let security_metrics = dht_instance.security_metrics();
668 let dashboard = crate::dht::metrics::SecurityDashboard::new(
669 security_metrics,
670 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
671 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
672 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
673 );
674
675 (
676 Some(Arc::new(RwLock::new(dht_instance))),
677 Some(Arc::new(dashboard)),
678 )
679 } else {
680 (None, None)
681 };
682
683 let resource_manager = config
687 .production_config
688 .clone()
689 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
690
691 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
693 match BootstrapManager::with_config(cache_config.clone()).await {
694 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
695 Err(e) => {
696 warn!(
697 "Failed to initialize bootstrap manager: {}, continuing without cache",
698 e
699 );
700 None
701 }
702 }
703 } else {
704 match BootstrapManager::new().await {
705 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
706 Err(e) => {
707 warn!(
708 "Failed to initialize bootstrap manager: {}, continuing without cache",
709 e
710 );
711 None
712 }
713 }
714 };
715
716 let (v6_opt, v4_opt) = {
719 let port = config.listen_addr.port();
720 let ip = config.listen_addr.ip();
721
722 let v4_addr = if ip.is_ipv4() {
723 Some(std::net::SocketAddr::new(ip, port))
724 } else {
725 Some(std::net::SocketAddr::new(
728 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
729 port,
730 ))
731 };
732
733 let v6_addr = if config.enable_ipv6 {
734 if ip.is_ipv6() {
735 Some(std::net::SocketAddr::new(ip, port))
736 } else {
737 Some(std::net::SocketAddr::new(
738 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
739 port,
740 ))
741 }
742 } else {
743 None
744 };
745 (v6_addr, v4_addr)
746 };
747
748 let dual_node = Arc::new(
749 DualStackNetworkNode::new(v6_opt, v4_opt)
750 .await
751 .map_err(|e| {
752 P2PError::Transport(crate::error::TransportError::SetupFailed(
753 format!("Failed to create dual-stack network nodes: {}", e).into(),
754 ))
755 })?,
756 );
757
758 let rate_limiter = Arc::new(RateLimiter::new(
760 crate::validation::RateLimitConfig::default(),
761 ));
762
763 let active_connections = Arc::new(RwLock::new(HashSet::new()));
765
766 let geo_provider = Arc::new(BgpGeoProvider::new());
768
769 let peers = Arc::new(RwLock::new(HashMap::new()));
771
772 let connection_monitor_handle = {
774 let active_conns = Arc::clone(&active_connections);
775 let peers_map = Arc::clone(&peers);
776 let event_tx_clone = event_tx.clone();
777 let dual_node_clone = Arc::clone(&dual_node);
778 let geo_provider_clone = Arc::clone(&geo_provider);
779 let peer_id_clone = peer_id.clone();
780
781 let handle = tokio::spawn(async move {
782 Self::connection_lifecycle_monitor(
783 dual_node_clone,
784 active_conns,
785 peers_map,
786 event_tx_clone,
787 geo_provider_clone,
788 peer_id_clone,
789 )
790 .await;
791 });
792
793 Arc::new(RwLock::new(Some(handle)))
794 };
795
796 let shutdown = Arc::new(AtomicBool::new(false));
798 let keepalive_handle = {
799 let active_conns = Arc::clone(&active_connections);
800 let dual_node_clone = Arc::clone(&dual_node);
801 let shutdown_clone = Arc::clone(&shutdown);
802
803 let handle = tokio::spawn(async move {
804 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
805 });
806
807 Arc::new(RwLock::new(Some(handle)))
808 };
809
810 let node = Self {
811 config,
812 peer_id,
813 peers,
814 event_tx,
815 listen_addrs: RwLock::new(Vec::new()),
816 start_time: Instant::now(),
817 running: RwLock::new(false),
818 dht,
819 resource_manager,
820 bootstrap_manager,
821 dual_node,
822 rate_limiter,
823 active_connections,
824 security_dashboard,
825 connection_monitor_handle,
826 keepalive_handle,
827 shutdown,
828 geo_provider,
829 };
830 info!("Created P2P node with peer ID: {}", node.peer_id);
831
832 node.start_network_listeners().await?;
834
835 node.start_connection_monitor().await;
837
838 Ok(node)
839 }
840
841 pub fn builder() -> NodeBuilder {
843 NodeBuilder::new()
844 }
845
846 pub fn peer_id(&self) -> &PeerId {
848 &self.peer_id
849 }
850
851 pub fn local_addr(&self) -> Option<String> {
852 self.listen_addrs
853 .try_read()
854 .ok()
855 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
856 }
857
858 pub async fn subscribe(&self, topic: &str) -> Result<()> {
859 info!("Subscribed to topic: {}", topic);
862 Ok(())
863 }
864
865 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
866 info!(
867 "Publishing message to topic: {} ({} bytes)",
868 topic,
869 data.len()
870 );
871
872 let peer_list: Vec<PeerId> = {
874 let peers_guard = self.peers.read().await;
875 peers_guard.keys().cloned().collect()
876 };
877
878 if peer_list.is_empty() {
879 debug!("No peers connected, message will only be sent to local subscribers");
880 } else {
881 let mut send_count = 0;
883 for peer_id in &peer_list {
884 match self.send_message(peer_id, topic, data.to_vec()).await {
885 Ok(_) => {
886 send_count += 1;
887 debug!("Sent message to peer: {}", peer_id);
888 }
889 Err(e) => {
890 warn!("Failed to send message to peer {}: {}", peer_id, e);
891 }
892 }
893 }
894 info!(
895 "Published message to {}/{} connected peers",
896 send_count,
897 peer_list.len()
898 );
899 }
900
901 let event = P2PEvent::Message {
903 topic: topic.to_string(),
904 source: self.peer_id.clone(),
905 data: data.to_vec(),
906 };
907 let _ = self.event_tx.send(event);
908
909 Ok(())
910 }
911
912 pub fn config(&self) -> &NodeConfig {
914 &self.config
915 }
916
917 pub async fn start(&self) -> Result<()> {
919 info!("Starting P2P node...");
920
921 if let Some(ref resource_manager) = self.resource_manager {
923 resource_manager.start().await.map_err(|e| {
924 P2PError::Network(crate::error::NetworkError::ProtocolError(
925 format!("Failed to start resource manager: {e}").into(),
926 ))
927 })?;
928 info!("Production resource manager started");
929 }
930
931 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
933 let mut manager = bootstrap_manager.write().await;
934 manager.start_background_tasks().await.map_err(|e| {
935 P2PError::Network(crate::error::NetworkError::ProtocolError(
936 format!("Failed to start bootstrap manager: {e}").into(),
937 ))
938 })?;
939 info!("Bootstrap cache manager started");
940 }
941
942 *self.running.write().await = true;
944
945 self.start_network_listeners().await?;
947
948 let listen_addrs = self.listen_addrs.read().await;
950 info!("P2P node started on addresses: {:?}", *listen_addrs);
951
952 self.start_message_receiving_system().await?;
956
957 self.connect_bootstrap_peers().await?;
959
960 Ok(())
961 }
962
963 async fn start_network_listeners(&self) -> Result<()> {
965 info!("Starting dual-stack listeners (ant-quic)...");
966 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
968 P2PError::Transport(crate::error::TransportError::SetupFailed(
969 format!("Failed to get local addresses: {}", e).into(),
970 ))
971 })?;
972 {
973 let mut la = self.listen_addrs.write().await;
974 *la = addrs.clone();
975 }
976
977 let event_tx = self.event_tx.clone();
979 let peers = self.peers.clone();
980 let active_connections = self.active_connections.clone();
981 let rate_limiter = self.rate_limiter.clone();
982 let dual = self.dual_node.clone();
983 tokio::spawn(async move {
984 loop {
985 match dual.accept_any().await {
986 Ok((ant_peer_id, remote_sock)) => {
987 let peer_id =
988 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
989 let remote_addr = NetworkAddress::from(remote_sock);
990 let _ = rate_limiter.check_ip(&remote_sock.ip());
992 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
993 register_new_peer(&peers, &peer_id, &remote_addr).await;
994 active_connections.write().await.insert(peer_id);
995 }
996 Err(e) => {
997 warn!("Accept failed: {}", e);
998 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
999 }
1000 }
1001 }
1002 });
1003
1004 info!("Dual-stack listeners active on: {:?}", addrs);
1005 Ok(())
1006 }
1007
1008 #[allow(dead_code)]
1010 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1011 warn!("QUIC transport temporarily disabled during ant-quic migration");
1050 Err(crate::P2PError::Transport(
1052 crate::error::TransportError::SetupFailed(
1053 format!(
1054 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1055 )
1056 .into(),
1057 ),
1058 ))
1059 }
1060
1061 #[allow(dead_code)] async fn start_connection_acceptor(
1064 &self,
1065 transport: Arc<dyn crate::transport::Transport>,
1066 addr: std::net::SocketAddr,
1067 transport_type: crate::transport::TransportType,
1068 ) -> Result<()> {
1069 info!(
1070 "Starting connection acceptor for {:?} on {}",
1071 transport_type, addr
1072 );
1073
1074 let event_tx = self.event_tx.clone();
1076 let _peer_id = self.peer_id.clone();
1077 let peers = Arc::clone(&self.peers);
1078 let rate_limiter = Arc::clone(&self.rate_limiter);
1081
1082 tokio::spawn(async move {
1084 loop {
1085 match transport.accept().await {
1086 Ok(connection) => {
1087 let remote_addr = connection.remote_addr();
1088 let connection_peer_id =
1089 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1090
1091 let socket_addr = remote_addr.socket_addr();
1093 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1094 continue;
1096 }
1097
1098 info!(
1099 "Accepted {:?} connection from {} (peer: {})",
1100 transport_type, remote_addr, connection_peer_id
1101 );
1102
1103 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1105
1106 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1108
1109 spawn_connection_handler(
1111 connection,
1112 connection_peer_id,
1113 event_tx.clone(),
1114 Arc::clone(&peers),
1115 );
1116 }
1117 Err(e) => {
1118 warn!(
1119 "Failed to accept {:?} connection on {}: {}",
1120 transport_type, addr, e
1121 );
1122
1123 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1125 }
1126 }
1127 }
1128 });
1129
1130 info!(
1131 "Connection acceptor background task started for {:?} on {}",
1132 transport_type, addr
1133 );
1134 Ok(())
1135 }
1136
1137 async fn start_message_receiving_system(&self) -> Result<()> {
1139 info!("Starting message receiving system");
1140 let dual = self.dual_node.clone();
1141 let event_tx = self.event_tx.clone();
1142
1143 tokio::spawn(async move {
1144 loop {
1145 match dual.receive_any().await {
1146 Ok((_peer_id, bytes)) => {
1147 #[allow(clippy::collapsible_if)]
1149 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1150 if let (Some(protocol), Some(data), Some(from)) = (
1151 value.get("protocol").and_then(|v| v.as_str()),
1152 value.get("data").and_then(|v| v.as_array()),
1153 value.get("from").and_then(|v| v.as_str()),
1154 ) {
1155 let payload: Vec<u8> = data
1156 .iter()
1157 .filter_map(|v| v.as_u64().map(|n| n as u8))
1158 .collect();
1159 let _ = event_tx.send(P2PEvent::Message {
1160 topic: protocol.to_string(),
1161 source: from.to_string(),
1162 data: payload,
1163 });
1164 }
1165 }
1166 }
1167 Err(e) => {
1168 warn!("Receive error: {}", e);
1169 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1170 }
1171 }
1172 }
1173 });
1174
1175 Ok(())
1176 }
1177
1178 #[allow(dead_code)]
1180 async fn handle_received_message(
1181 &self,
1182 message_data: Vec<u8>,
1183 peer_id: &PeerId,
1184 _protocol: &str,
1185 event_tx: &broadcast::Sender<P2PEvent>,
1186 ) -> Result<()> {
1187 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1191 Ok(message) => {
1192 if let (Some(protocol), Some(data), Some(from)) = (
1193 message.get("protocol").and_then(|v| v.as_str()),
1194 message.get("data").and_then(|v| v.as_array()),
1195 message.get("from").and_then(|v| v.as_str()),
1196 ) {
1197 let data_bytes: Vec<u8> = data
1199 .iter()
1200 .filter_map(|v| v.as_u64().map(|n| n as u8))
1201 .collect();
1202
1203 let event = P2PEvent::Message {
1205 topic: protocol.to_string(),
1206 source: from.to_string(),
1207 data: data_bytes,
1208 };
1209
1210 let _ = event_tx.send(event);
1211 debug!("Generated message event from peer: {}", peer_id);
1212 }
1213 }
1214 Err(e) => {
1215 warn!("Failed to parse received message from {}: {}", peer_id, e);
1216 }
1217 }
1218
1219 Ok(())
1220 }
1221
1222 pub async fn run(&self) -> Result<()> {
1228 if !*self.running.read().await {
1229 self.start().await?;
1230 }
1231
1232 info!("P2P node running...");
1233
1234 loop {
1236 if !*self.running.read().await {
1237 break;
1238 }
1239
1240 self.periodic_tasks().await?;
1242
1243 tokio::time::sleep(Duration::from_millis(100)).await;
1245 }
1246
1247 info!("P2P node stopped");
1248 Ok(())
1249 }
1250
1251 pub async fn stop(&self) -> Result<()> {
1253 info!("Stopping P2P node...");
1254
1255 *self.running.write().await = false;
1257
1258 self.disconnect_all_peers().await?;
1260
1261 if let Some(ref resource_manager) = self.resource_manager {
1263 resource_manager.shutdown().await.map_err(|e| {
1264 P2PError::Network(crate::error::NetworkError::ProtocolError(
1265 format!("Failed to shutdown resource manager: {e}").into(),
1266 ))
1267 })?;
1268 info!("Production resource manager stopped");
1269 }
1270
1271 info!("P2P node stopped");
1272 Ok(())
1273 }
1274
1275 pub async fn shutdown(&self) -> Result<()> {
1277 self.stop().await
1278 }
1279
1280 pub async fn is_running(&self) -> bool {
1282 *self.running.read().await
1283 }
1284
1285 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1287 self.listen_addrs.read().await.clone()
1288 }
1289
1290 pub async fn connected_peers(&self) -> Vec<PeerId> {
1292 self.peers.read().await.keys().cloned().collect()
1293 }
1294
1295 pub async fn peer_count(&self) -> usize {
1297 self.peers.read().await.len()
1298 }
1299
1300 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1302 self.peers.read().await.get(peer_id).cloned()
1303 }
1304
1305 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1317 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1319
1320 let peers = self.peers.read().await;
1321
1322 for (peer_id, peer_info) in peers.iter() {
1324 for peer_addr in &peer_info.addresses {
1326 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1327 && peer_socket == socket_addr
1328 {
1329 return Some(peer_id.clone());
1330 }
1331 }
1332 }
1333
1334 None
1335 }
1336
1337 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1343 let peers = self.peers.read().await;
1344
1345 peers
1346 .iter()
1347 .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1348 .collect()
1349 }
1350
1351 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1363 self.active_connections.write().await.remove(peer_id);
1365 self.peers.write().await.remove(peer_id).is_some()
1367 }
1368
1369 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1382 self.peers.read().await.contains_key(peer_id)
1383 }
1384
1385 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1387 info!("Connecting to peer at: {}", address);
1388
1389 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1391 Some(resource_manager.acquire_connection().await?)
1392 } else {
1393 None
1394 };
1395
1396 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1398 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1399 format!("{}: {}", address, e).into(),
1400 ))
1401 })?;
1402
1403 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1406 if normalized_addr != socket_addr {
1407 info!(
1408 "Normalized wildcard address {} to loopback {}",
1409 socket_addr, normalized_addr
1410 );
1411 }
1412
1413 let addr_list = vec![normalized_addr];
1415 let peer_id = match tokio::time::timeout(
1416 self.config.connection_timeout,
1417 self.dual_node.connect_happy_eyeballs(&addr_list),
1418 )
1419 .await
1420 {
1421 Ok(Ok(peer)) => {
1422 let connected_peer_id =
1423 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1424 info!("Successfully connected to peer: {}", connected_peer_id);
1425 connected_peer_id
1426 }
1427 Ok(Err(e)) => {
1428 warn!("Failed to connect to peer at {}: {}", address, e);
1429 let sanitized_address = address.replace(['/', ':'], "_");
1430 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1431 warn!(
1432 "Using demo peer ID: {} (transport connection failed)",
1433 demo_peer_id
1434 );
1435 demo_peer_id
1436 }
1437 Err(_) => {
1438 warn!(
1439 "Timed out connecting to peer at {} after {:?}",
1440 address, self.config.connection_timeout
1441 );
1442 let sanitized_address = address.replace(['/', ':'], "_");
1443 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1444 demo_peer_id
1445 }
1446 };
1447
1448 let peer_info = PeerInfo {
1450 peer_id: peer_id.clone(),
1451 addresses: vec![address.to_string()],
1452 connected_at: Instant::now(),
1453 last_seen: Instant::now(),
1454 status: ConnectionStatus::Connected,
1455 protocols: vec!["p2p-foundation/1.0".to_string()],
1456 heartbeat_count: 0,
1457 };
1458
1459 self.peers.write().await.insert(peer_id.clone(), peer_info);
1461
1462 self.active_connections
1465 .write()
1466 .await
1467 .insert(peer_id.clone());
1468
1469 if let Some(ref resource_manager) = self.resource_manager {
1471 resource_manager.record_bandwidth(0, 0); }
1473
1474 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1476
1477 info!("Connected to peer: {}", peer_id);
1478 Ok(peer_id)
1479 }
1480
1481 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1483 info!("Disconnecting from peer: {}", peer_id);
1484
1485 self.active_connections.write().await.remove(peer_id);
1487
1488 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1489 peer_info.status = ConnectionStatus::Disconnected;
1490
1491 let _ = self
1493 .event_tx
1494 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1495
1496 info!("Disconnected from peer: {}", peer_id);
1497 }
1498
1499 Ok(())
1500 }
1501
1502 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1504 self.active_connections.read().await.contains(peer_id)
1505 }
1506
1507 pub async fn send_message(
1509 &self,
1510 peer_id: &PeerId,
1511 protocol: &str,
1512 data: Vec<u8>,
1513 ) -> Result<()> {
1514 debug!(
1515 "Sending message to peer {} on protocol {}",
1516 peer_id, protocol
1517 );
1518
1519 if let Some(ref resource_manager) = self.resource_manager
1521 && !resource_manager
1522 .check_rate_limit(peer_id, "message")
1523 .await?
1524 {
1525 return Err(P2PError::ResourceExhausted(
1526 format!("Rate limit exceeded for peer {}", peer_id).into(),
1527 ));
1528 }
1529
1530 if !self.peers.read().await.contains_key(peer_id) {
1532 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1533 peer_id.to_string().into(),
1534 )));
1535 }
1536
1537 if !self.is_connection_active(peer_id).await {
1540 debug!(
1541 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1542 peer_id
1543 );
1544
1545 self.remove_peer(peer_id).await;
1547
1548 return Err(P2PError::Network(
1549 crate::error::NetworkError::ConnectionClosed {
1550 peer_id: peer_id.to_string().into(),
1551 },
1552 ));
1553 }
1554
1555 if let Some(ref resource_manager) = self.resource_manager {
1559 resource_manager.record_bandwidth(data.len() as u64, 0);
1560 }
1561
1562 let _message_data = self.create_protocol_message(protocol, data)?;
1564
1565 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1567 tokio::time::timeout(self.config.connection_timeout, send_fut)
1568 .await
1569 .map_err(|_| {
1570 P2PError::Transport(crate::error::TransportError::StreamError(
1571 "Timed out sending message".into(),
1572 ))
1573 })?
1574 .map_err(|e| {
1575 P2PError::Transport(crate::error::TransportError::StreamError(
1576 e.to_string().into(),
1577 ))
1578 })
1579 }
1580
1581 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1583 use serde_json::json;
1584
1585 let timestamp = std::time::SystemTime::now()
1586 .duration_since(std::time::UNIX_EPOCH)
1587 .map_err(|e| {
1588 P2PError::Network(NetworkError::ProtocolError(
1589 format!("System time error: {}", e).into(),
1590 ))
1591 })?
1592 .as_secs();
1593
1594 let message = json!({
1596 "protocol": protocol,
1597 "data": data,
1598 "from": self.peer_id,
1599 "timestamp": timestamp
1600 });
1601
1602 serde_json::to_vec(&message).map_err(|e| {
1603 P2PError::Transport(crate::error::TransportError::StreamError(
1604 format!("Failed to serialize message: {e}").into(),
1605 ))
1606 })
1607 }
1608
1609 }
1611
1612#[allow(dead_code)]
1614fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1615 use serde_json::json;
1616
1617 let timestamp = std::time::SystemTime::now()
1618 .duration_since(std::time::UNIX_EPOCH)
1619 .map_err(|e| {
1620 P2PError::Network(NetworkError::ProtocolError(
1621 format!("System time error: {}", e).into(),
1622 ))
1623 })?
1624 .as_secs();
1625
1626 let message = json!({
1628 "protocol": protocol,
1629 "data": data,
1630 "timestamp": timestamp
1631 });
1632
1633 serde_json::to_vec(&message).map_err(|e| {
1634 P2PError::Transport(crate::error::TransportError::StreamError(
1635 format!("Failed to serialize message: {e}").into(),
1636 ))
1637 })
1638}
1639
1640impl P2PNode {
1641 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1643 self.event_tx.subscribe()
1644 }
1645
1646 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1648 self.subscribe_events()
1649 }
1650
1651 pub fn uptime(&self) -> Duration {
1653 self.start_time.elapsed()
1654 }
1655
1656 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1666 if let Some(ref resource_manager) = self.resource_manager {
1667 Ok(resource_manager.get_metrics().await)
1668 } else {
1669 Err(P2PError::Network(
1670 crate::error::NetworkError::ProtocolError(
1671 "Production resource manager not enabled".to_string().into(),
1672 ),
1673 ))
1674 }
1675 }
1676
1677 async fn connection_lifecycle_monitor(
1680 dual_node: Arc<DualStackNetworkNode>,
1681 active_connections: Arc<RwLock<HashSet<String>>>,
1682 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1683 event_tx: broadcast::Sender<P2PEvent>,
1684 geo_provider: Arc<BgpGeoProvider>,
1685 local_peer_id: String,
1686 ) {
1687 use crate::transport::ant_quic_adapter::ConnectionEvent;
1688
1689 let mut event_rx = dual_node.subscribe_connection_events();
1690
1691 info!("Connection lifecycle monitor started");
1692
1693 loop {
1694 match event_rx.recv().await {
1695 Ok(event) => {
1696 match event {
1697 ConnectionEvent::Established {
1698 peer_id,
1699 remote_address,
1700 } => {
1701 let peer_id_str =
1702 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1703 debug!(
1704 "Connection established: peer={}, addr={}",
1705 peer_id_str, remote_address
1706 );
1707
1708 let ip = remote_address.ip();
1711 let is_rejected = match ip {
1712 std::net::IpAddr::V4(v4) => {
1713 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1715 geo_provider.is_hosting_asn(asn)
1716 || geo_provider.is_vpn_asn(asn)
1717 } else {
1718 false
1719 }
1720 }
1721 std::net::IpAddr::V6(v6) => {
1722 let info = geo_provider.lookup(v6);
1723 info.is_hosting_provider || info.is_vpn_provider
1724 }
1725 };
1726
1727 if is_rejected {
1728 info!(
1729 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1730 peer_id_str, remote_address
1731 );
1732
1733 let rejection = RejectionMessage {
1735 reason: RejectionReason::GeoIpPolicy,
1736 message:
1737 "Connection rejected: Hosting/VPN providers not allowed"
1738 .to_string(),
1739 suggested_target: None, };
1741
1742 if let Ok(data) = serde_json::to_vec(&rejection) {
1744 let timestamp = std::time::SystemTime::now()
1746 .duration_since(std::time::UNIX_EPOCH)
1747 .unwrap_or_default()
1748 .as_secs();
1749
1750 let message = serde_json::json!({
1751 "protocol": "control",
1752 "data": data,
1753 "from": local_peer_id,
1754 "timestamp": timestamp
1755 });
1756
1757 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1758 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1762
1763 tokio::task::yield_now().await;
1766 }
1767 }
1768
1769 continue;
1773 }
1774
1775 active_connections.write().await.insert(peer_id_str.clone());
1777
1778 let mut peers_lock = peers.write().await;
1780 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1781 peer_info.status = ConnectionStatus::Connected;
1782 peer_info.connected_at = Instant::now();
1783 } else {
1784 debug!("Registering new incoming peer: {}", peer_id_str);
1786 peers_lock.insert(
1787 peer_id_str.clone(),
1788 PeerInfo {
1789 peer_id: peer_id_str.clone(),
1790 addresses: vec![remote_address.to_string()],
1791 status: ConnectionStatus::Connected,
1792 last_seen: Instant::now(),
1793 connected_at: Instant::now(),
1794 protocols: Vec::new(),
1795 heartbeat_count: 0,
1796 },
1797 );
1798 }
1799
1800 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1802 }
1803 ConnectionEvent::Lost { peer_id, reason } => {
1804 let peer_id_str =
1805 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1806 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1807
1808 active_connections.write().await.remove(&peer_id_str);
1810
1811 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1813 peer_info.status = ConnectionStatus::Disconnected;
1814 peer_info.last_seen = Instant::now();
1815 }
1816
1817 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1819 }
1820 ConnectionEvent::Failed { peer_id, reason } => {
1821 let peer_id_str =
1822 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1823 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1824
1825 active_connections.write().await.remove(&peer_id_str);
1827
1828 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1830 peer_info.status = ConnectionStatus::Failed(reason.clone());
1831 }
1832
1833 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1835 }
1836 }
1837 }
1838 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1839 warn!(
1840 "Connection event monitor lagged, skipped {} events",
1841 skipped
1842 );
1843 continue;
1844 }
1845 Err(broadcast::error::RecvError::Closed) => {
1846 info!("Connection event channel closed, stopping monitor");
1847 break;
1848 }
1849 }
1850 }
1851
1852 info!("Connection lifecycle monitor stopped");
1853 }
1854
1855 async fn start_connection_monitor(&self) {
1857 debug!("Connection monitor already running from initialization");
1861 }
1862
1863 async fn keepalive_task(
1869 active_connections: Arc<RwLock<HashSet<String>>>,
1870 dual_node: Arc<DualStackNetworkNode>,
1871 shutdown: Arc<AtomicBool>,
1872 ) {
1873 use tokio::time::{Duration, interval};
1874
1875 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1879 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1880
1881 info!(
1882 "Keepalive task started (interval: {}s)",
1883 KEEPALIVE_INTERVAL_SECS
1884 );
1885
1886 loop {
1887 if shutdown.load(Ordering::Relaxed) {
1889 info!("Keepalive task shutting down");
1890 break;
1891 }
1892
1893 interval.tick().await;
1894
1895 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1897
1898 if peers.is_empty() {
1899 trace!("Keepalive: no active connections");
1900 continue;
1901 }
1902
1903 debug!("Sending keepalive to {} active connections", peers.len());
1904
1905 for peer_id in peers {
1907 match dual_node
1908 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1909 .await
1910 {
1911 Ok(_) => {
1912 trace!("Keepalive sent to peer: {}", peer_id);
1913 }
1914 Err(e) => {
1915 debug!(
1916 "Failed to send keepalive to peer {}: {} (connection may have closed)",
1917 peer_id, e
1918 );
1919 }
1921 }
1922 }
1923 }
1924
1925 info!("Keepalive task stopped");
1926 }
1927
1928 pub async fn health_check(&self) -> Result<()> {
1930 if let Some(ref resource_manager) = self.resource_manager {
1931 resource_manager.health_check().await
1932 } else {
1933 let peer_count = self.peer_count().await;
1935 if peer_count > self.config.max_connections {
1936 Err(P2PError::Network(
1937 crate::error::NetworkError::ProtocolError(
1938 format!("Too many connections: {peer_count}").into(),
1939 ),
1940 ))
1941 } else {
1942 Ok(())
1943 }
1944 }
1945 }
1946
1947 pub fn production_config(&self) -> Option<&ProductionConfig> {
1949 self.config.production_config.as_ref()
1950 }
1951
1952 pub fn is_production_mode(&self) -> bool {
1954 self.resource_manager.is_some()
1955 }
1956
1957 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1959 self.dht.as_ref()
1960 }
1961
1962 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1964 if let Some(ref dht) = self.dht {
1965 let mut dht_instance = dht.write().await;
1966 let dht_key = crate::dht::DhtKey::from_bytes(key);
1967 dht_instance
1968 .store(&dht_key, value.clone())
1969 .await
1970 .map_err(|e| {
1971 P2PError::Dht(crate::error::DhtError::StoreFailed(
1972 format!("{:?}: {e}", key).into(),
1973 ))
1974 })?;
1975
1976 Ok(())
1977 } else {
1978 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1979 "DHT not enabled".to_string().into(),
1980 )))
1981 }
1982 }
1983
1984 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1986 if let Some(ref dht) = self.dht {
1987 let dht_instance = dht.read().await;
1988 let dht_key = crate::dht::DhtKey::from_bytes(key);
1989 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1990 P2PError::Dht(crate::error::DhtError::StoreFailed(
1991 format!("Retrieve failed: {e}").into(),
1992 ))
1993 })?;
1994
1995 Ok(record_result)
1996 } else {
1997 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1998 "DHT not enabled".to_string().into(),
1999 )))
2000 }
2001 }
2002
2003 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2005 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2006 let mut manager = bootstrap_manager.write().await;
2007 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2008 .iter()
2009 .filter_map(|addr| addr.parse().ok())
2010 .collect();
2011 let contact = ContactEntry::new(peer_id, socket_addresses);
2012 manager.add_contact(contact).await.map_err(|e| {
2013 P2PError::Network(crate::error::NetworkError::ProtocolError(
2014 format!("Failed to add peer to bootstrap cache: {e}").into(),
2015 ))
2016 })?;
2017 }
2018 Ok(())
2019 }
2020
2021 pub async fn update_peer_metrics(
2023 &self,
2024 peer_id: &PeerId,
2025 success: bool,
2026 latency_ms: Option<u64>,
2027 _error: Option<String>,
2028 ) -> Result<()> {
2029 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2030 let mut manager = bootstrap_manager.write().await;
2031
2032 let metrics = QualityMetrics {
2034 success_rate: if success { 1.0 } else { 0.0 },
2035 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2036 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2038 last_successful_connection: if success {
2039 chrono::Utc::now()
2040 } else {
2041 chrono::Utc::now() - chrono::Duration::hours(1)
2042 },
2043 uptime_score: 0.5,
2044 };
2045
2046 manager
2047 .update_contact_metrics(peer_id, metrics)
2048 .await
2049 .map_err(|e| {
2050 P2PError::Network(crate::error::NetworkError::ProtocolError(
2051 format!("Failed to update peer metrics: {e}").into(),
2052 ))
2053 })?;
2054 }
2055 Ok(())
2056 }
2057
2058 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2060 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2061 let manager = bootstrap_manager.read().await;
2062 let stats = manager.get_stats().await.map_err(|e| {
2063 P2PError::Network(crate::error::NetworkError::ProtocolError(
2064 format!("Failed to get bootstrap stats: {e}").into(),
2065 ))
2066 })?;
2067 Ok(Some(stats))
2068 } else {
2069 Ok(None)
2070 }
2071 }
2072
2073 pub async fn cached_peer_count(&self) -> usize {
2075 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2076 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2077 {
2078 return stats.total_contacts;
2079 }
2080 0
2081 }
2082
2083 async fn connect_bootstrap_peers(&self) -> Result<()> {
2085 let mut bootstrap_contacts = Vec::new();
2086 let mut used_cache = false;
2087 let mut seen_addresses = std::collections::HashSet::new();
2088
2089 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2091 self.config.bootstrap_peers_str.clone()
2092 } else {
2093 self.config
2095 .bootstrap_peers
2096 .iter()
2097 .map(|addr| addr.to_string())
2098 .collect::<Vec<_>>()
2099 };
2100
2101 if !cli_bootstrap_peers.is_empty() {
2102 info!(
2103 "Using {} CLI-provided bootstrap peers (priority)",
2104 cli_bootstrap_peers.len()
2105 );
2106 for addr in &cli_bootstrap_peers {
2107 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2108 seen_addresses.insert(socket_addr);
2109 let contact = ContactEntry::new(
2110 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2111 vec![socket_addr],
2112 );
2113 bootstrap_contacts.push(contact);
2114 } else {
2115 warn!("Invalid bootstrap address format: {}", addr);
2116 }
2117 }
2118 }
2119
2120 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2122 let manager = bootstrap_manager.read().await;
2123 match manager.get_bootstrap_peers(20).await {
2124 Ok(contacts) => {
2126 if !contacts.is_empty() {
2127 let mut added_from_cache = 0;
2128 for contact in contacts {
2129 let new_addresses: Vec<_> = contact
2131 .addresses
2132 .iter()
2133 .filter(|addr| !seen_addresses.contains(addr))
2134 .copied()
2135 .collect();
2136
2137 if !new_addresses.is_empty() {
2138 for addr in &new_addresses {
2139 seen_addresses.insert(*addr);
2140 }
2141 let mut contact = contact.clone();
2142 contact.addresses = new_addresses;
2143 bootstrap_contacts.push(contact);
2144 added_from_cache += 1;
2145 }
2146 }
2147 if added_from_cache > 0 {
2148 info!(
2149 "Added {} cached bootstrap peers (supplementing CLI peers)",
2150 added_from_cache
2151 );
2152 used_cache = true;
2153 }
2154 }
2155 }
2156 Err(e) => {
2157 warn!("Failed to get cached bootstrap peers: {}", e);
2158 }
2159 }
2160 }
2161
2162 if bootstrap_contacts.is_empty() {
2163 info!("No bootstrap peers configured and no cached peers available");
2164 return Ok(());
2165 }
2166
2167 let mut successful_connections = 0;
2169 for contact in bootstrap_contacts {
2170 for addr in &contact.addresses {
2171 match self.connect_peer(&addr.to_string()).await {
2172 Ok(peer_id) => {
2173 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2174 successful_connections += 1;
2175
2176 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2178 let mut manager = bootstrap_manager.write().await;
2179 let mut updated_contact = contact.clone();
2180 updated_contact.peer_id = peer_id.clone();
2181 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2184 warn!("Failed to update bootstrap cache: {}", e);
2185 }
2186 }
2187 break; }
2189 Err(e) => {
2190 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2191
2192 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2194 let mut manager = bootstrap_manager.write().await;
2195 let mut updated_contact = contact.clone();
2196 updated_contact.update_connection_result(
2197 false,
2198 None,
2199 Some(e.to_string()),
2200 );
2201
2202 if let Err(e) = manager.add_contact(updated_contact).await {
2203 warn!("Failed to update bootstrap cache: {}", e);
2204 }
2205 }
2206 }
2207 }
2208 }
2209 }
2210
2211 if successful_connections == 0 {
2212 if !used_cache {
2213 warn!("Failed to connect to any bootstrap peers");
2214 }
2215 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2216 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2218 }));
2219 }
2220 info!(
2221 "Successfully connected to {} bootstrap peers",
2222 successful_connections
2223 );
2224
2225 Ok(())
2226 }
2227
2228 async fn disconnect_all_peers(&self) -> Result<()> {
2230 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2231
2232 for peer_id in peer_ids {
2233 self.disconnect_peer(&peer_id).await?;
2234 }
2235
2236 Ok(())
2237 }
2238
2239 async fn periodic_tasks(&self) -> Result<()> {
2241 Ok(())
2247 }
2248}
2249
2250#[async_trait::async_trait]
2252pub trait NetworkSender: Send + Sync {
2253 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2255
2256 fn local_peer_id(&self) -> &PeerId;
2258}
2259
2260#[derive(Clone)]
2262pub struct P2PNetworkSender {
2263 peer_id: PeerId,
2264 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2266}
2267
2268impl P2PNetworkSender {
2269 pub fn new(
2270 peer_id: PeerId,
2271 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2272 ) -> Self {
2273 Self { peer_id, send_tx }
2274 }
2275}
2276
2277#[async_trait::async_trait]
2279impl NetworkSender for P2PNetworkSender {
2280 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2282 self.send_tx
2283 .send((peer_id.clone(), protocol.to_string(), data))
2284 .map_err(|_| {
2285 P2PError::Network(crate::error::NetworkError::ProtocolError(
2286 "Failed to send message via channel".to_string().into(),
2287 ))
2288 })?;
2289 Ok(())
2290 }
2291
2292 fn local_peer_id(&self) -> &PeerId {
2294 &self.peer_id
2295 }
2296}
2297
2298pub struct NodeBuilder {
2300 config: NodeConfig,
2301}
2302
2303impl Default for NodeBuilder {
2304 fn default() -> Self {
2305 Self::new()
2306 }
2307}
2308
2309impl NodeBuilder {
2310 pub fn new() -> Self {
2312 Self {
2313 config: NodeConfig::default(),
2314 }
2315 }
2316
2317 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2319 self.config.peer_id = Some(peer_id);
2320 self
2321 }
2322
2323 pub fn listen_on(mut self, addr: &str) -> Self {
2325 if let Ok(multiaddr) = addr.parse() {
2326 self.config.listen_addrs.push(multiaddr);
2327 }
2328 self
2329 }
2330
2331 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2333 if let Ok(multiaddr) = addr.parse() {
2334 self.config.bootstrap_peers.push(multiaddr);
2335 }
2336 self.config.bootstrap_peers_str.push(addr.to_string());
2337 self
2338 }
2339
2340 pub fn with_ipv6(mut self, enable: bool) -> Self {
2342 self.config.enable_ipv6 = enable;
2343 self
2344 }
2345
2346 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2350 self.config.connection_timeout = timeout;
2351 self
2352 }
2353
2354 pub fn with_max_connections(mut self, max: usize) -> Self {
2356 self.config.max_connections = max;
2357 self
2358 }
2359
2360 pub fn with_production_mode(mut self) -> Self {
2362 self.config.production_config = Some(ProductionConfig::default());
2363 self
2364 }
2365
2366 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2368 self.config.production_config = Some(production_config);
2369 self
2370 }
2371
2372 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2374 self.config.dht_config = dht_config;
2375 self
2376 }
2377
2378 pub fn with_default_dht(mut self) -> Self {
2380 self.config.dht_config = DHTConfig::default();
2381 self
2382 }
2383
2384 pub async fn build(self) -> Result<P2PNode> {
2386 P2PNode::new(self.config).await
2387 }
2388}
2389
2390#[allow(dead_code)] async fn handle_received_message_standalone(
2393 message_data: Vec<u8>,
2394 peer_id: &PeerId,
2395 _protocol: &str,
2396 event_tx: &broadcast::Sender<P2PEvent>,
2397) -> Result<()> {
2398 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2400 Ok(message) => {
2401 if let (Some(protocol), Some(data), Some(from)) = (
2402 message.get("protocol").and_then(|v| v.as_str()),
2403 message.get("data").and_then(|v| v.as_array()),
2404 message.get("from").and_then(|v| v.as_str()),
2405 ) {
2406 let data_bytes: Vec<u8> = data
2408 .iter()
2409 .filter_map(|v| v.as_u64().map(|n| n as u8))
2410 .collect();
2411
2412 let event = P2PEvent::Message {
2414 topic: protocol.to_string(),
2415 source: from.to_string(),
2416 data: data_bytes,
2417 };
2418
2419 let _ = event_tx.send(event);
2420 debug!("Generated message event from peer: {}", peer_id);
2421 }
2422 }
2423 Err(e) => {
2424 warn!("Failed to parse received message from {}: {}", peer_id, e);
2425 }
2426 }
2427
2428 Ok(())
2429}
2430
2431#[allow(dead_code)]
2435fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2436 match create_protocol_message_static(protocol, data) {
2437 Ok(msg) => Some(msg),
2438 Err(e) => {
2439 warn!("Failed to create protocol message: {}", e);
2440 None
2441 }
2442 }
2443}
2444
2445#[allow(dead_code)]
2447async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2448 match result {
2449 Ok(_) => {
2450 debug!("Message sent to peer {} via transport layer", peer_id);
2451 }
2452 Err(e) => {
2453 warn!("Failed to send message to peer {}: {}", peer_id, e);
2454 }
2455 }
2456}
2457
2458#[allow(dead_code)] fn check_rate_limit(
2461 rate_limiter: &RateLimiter,
2462 socket_addr: &std::net::SocketAddr,
2463 remote_addr: &NetworkAddress,
2464) -> Result<()> {
2465 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2466 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2467 e
2468 })
2469}
2470
2471#[allow(dead_code)] async fn register_new_peer(
2474 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2475 peer_id: &PeerId,
2476 remote_addr: &NetworkAddress,
2477) {
2478 let mut peers_guard = peers.write().await;
2479 let peer_info = PeerInfo {
2480 peer_id: peer_id.clone(),
2481 addresses: vec![remote_addr.to_string()],
2482 connected_at: tokio::time::Instant::now(),
2483 last_seen: tokio::time::Instant::now(),
2484 status: ConnectionStatus::Connected,
2485 protocols: vec!["p2p-chat/1.0.0".to_string()],
2486 heartbeat_count: 0,
2487 };
2488 peers_guard.insert(peer_id.clone(), peer_info);
2489}
2490
2491#[allow(dead_code)] fn spawn_connection_handler(
2494 connection: Box<dyn crate::transport::Connection>,
2495 peer_id: PeerId,
2496 event_tx: broadcast::Sender<P2PEvent>,
2497 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2498) {
2499 tokio::spawn(async move {
2500 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2501 });
2502}
2503
2504#[allow(dead_code)] async fn handle_peer_connection(
2507 mut connection: Box<dyn crate::transport::Connection>,
2508 peer_id: PeerId,
2509 event_tx: broadcast::Sender<P2PEvent>,
2510 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2511) {
2512 loop {
2513 match connection.receive().await {
2514 Ok(message_data) => {
2515 debug!(
2516 "Received {} bytes from peer: {}",
2517 message_data.len(),
2518 peer_id
2519 );
2520
2521 if let Err(e) = handle_received_message_standalone(
2523 message_data,
2524 &peer_id,
2525 "unknown", &event_tx,
2527 )
2528 .await
2529 {
2530 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2531 }
2532 }
2533 Err(e) => {
2534 warn!("Failed to receive message from {}: {}", peer_id, e);
2535
2536 if !connection.is_alive().await {
2538 info!("Connection to {} is dead, removing peer", peer_id);
2539
2540 remove_peer(&peers, &peer_id).await;
2542
2543 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2545
2546 break; }
2548
2549 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2551 }
2552 }
2553 }
2554}
2555
2556#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2559 let mut peers_guard = peers.write().await;
2560 peers_guard.remove(peer_id);
2561}
2562
2563#[allow(dead_code)]
2565async fn update_peer_heartbeat(
2566 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2567 peer_id: &PeerId,
2568) -> Result<()> {
2569 let mut peers_guard = peers.write().await;
2570 match peers_guard.get_mut(peer_id) {
2571 Some(peer_info) => {
2572 peer_info.last_seen = Instant::now();
2573 peer_info.heartbeat_count += 1;
2574 Ok(())
2575 }
2576 None => {
2577 warn!("Received heartbeat from unknown peer: {}", peer_id);
2578 Err(P2PError::Network(NetworkError::PeerNotFound(
2579 format!("Peer {} not found", peer_id).into(),
2580 )))
2581 }
2582 }
2583}
2584
2585#[allow(dead_code)]
2587async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2588 if let Some(manager) = resource_manager {
2589 let metrics = manager.get_metrics().await;
2590 (metrics.memory_used, metrics.cpu_usage)
2591 } else {
2592 (0, 0.0)
2593 }
2594}
2595
2596#[cfg(test)]
2597mod tests {
2598 use super::*;
2599 use std::time::Duration;
2601 use tokio::time::timeout;
2602
2603 fn create_test_node_config() -> NodeConfig {
2609 NodeConfig {
2610 peer_id: Some("test_peer_123".to_string()),
2611 listen_addrs: vec![
2612 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2613 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2614 ],
2615 listen_addr: std::net::SocketAddr::new(
2616 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2617 0,
2618 ),
2619 bootstrap_peers: vec![],
2620 bootstrap_peers_str: vec![],
2621 enable_ipv6: true,
2622
2623 connection_timeout: Duration::from_millis(300),
2624 keep_alive_interval: Duration::from_secs(30),
2625 max_connections: 100,
2626 max_incoming_connections: 50,
2627 dht_config: DHTConfig::default(),
2628 security_config: SecurityConfig::default(),
2629 production_config: None,
2630 bootstrap_cache_config: None,
2631 }
2633 }
2634
2635 #[tokio::test]
2639 async fn test_node_config_default() {
2640 let config = NodeConfig::default();
2641
2642 assert!(config.peer_id.is_none());
2643 assert_eq!(config.listen_addrs.len(), 2);
2644 assert!(config.enable_ipv6);
2645 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2647 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2648 }
2649
2650 #[tokio::test]
2651 async fn test_dht_config_default() {
2652 let config = DHTConfig::default();
2653
2654 assert_eq!(config.k_value, 20);
2655 assert_eq!(config.alpha_value, 5);
2656 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2657 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2658 }
2659
2660 #[tokio::test]
2661 async fn test_security_config_default() {
2662 let config = SecurityConfig::default();
2663
2664 assert!(config.enable_noise);
2665 assert!(config.enable_tls);
2666 assert_eq!(config.trust_level, TrustLevel::Basic);
2667 }
2668
2669 #[test]
2670 fn test_trust_level_variants() {
2671 let _none = TrustLevel::None;
2673 let _basic = TrustLevel::Basic;
2674 let _full = TrustLevel::Full;
2675
2676 assert_eq!(TrustLevel::None, TrustLevel::None);
2678 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2679 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2680 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2681 }
2682
2683 #[test]
2684 fn test_connection_status_variants() {
2685 let connecting = ConnectionStatus::Connecting;
2686 let connected = ConnectionStatus::Connected;
2687 let disconnecting = ConnectionStatus::Disconnecting;
2688 let disconnected = ConnectionStatus::Disconnected;
2689 let failed = ConnectionStatus::Failed("test error".to_string());
2690
2691 assert_eq!(connecting, ConnectionStatus::Connecting);
2692 assert_eq!(connected, ConnectionStatus::Connected);
2693 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2694 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2695 assert_ne!(connecting, connected);
2696
2697 if let ConnectionStatus::Failed(msg) = failed {
2698 assert_eq!(msg, "test error");
2699 } else {
2700 panic!("Expected Failed status");
2701 }
2702 }
2703
2704 #[tokio::test]
2705 async fn test_node_creation() -> Result<()> {
2706 let config = create_test_node_config();
2707 let node = P2PNode::new(config).await?;
2708
2709 assert_eq!(node.peer_id(), "test_peer_123");
2710 assert!(!node.is_running().await);
2711 assert_eq!(node.peer_count().await, 0);
2712 assert!(node.connected_peers().await.is_empty());
2713
2714 Ok(())
2715 }
2716
2717 #[tokio::test]
2718 async fn test_node_creation_without_peer_id() -> Result<()> {
2719 let mut config = create_test_node_config();
2720 config.peer_id = None;
2721
2722 let node = P2PNode::new(config).await?;
2723
2724 assert!(node.peer_id().starts_with("peer_"));
2726 assert!(!node.is_running().await);
2727
2728 Ok(())
2729 }
2730
2731 #[tokio::test]
2732 async fn test_node_lifecycle() -> Result<()> {
2733 let config = create_test_node_config();
2734 let node = P2PNode::new(config).await?;
2735
2736 assert!(!node.is_running().await);
2738
2739 node.start().await?;
2741 assert!(node.is_running().await);
2742
2743 let listen_addrs = node.listen_addrs().await;
2745 assert!(
2746 !listen_addrs.is_empty(),
2747 "Expected at least one listening address"
2748 );
2749
2750 node.stop().await?;
2752 assert!(!node.is_running().await);
2753
2754 Ok(())
2755 }
2756
2757 #[tokio::test]
2758 async fn test_peer_connection() -> Result<()> {
2759 let config = create_test_node_config();
2760 let node = P2PNode::new(config).await?;
2761
2762 let peer_addr = "127.0.0.1:0";
2763
2764 let peer_id = node.connect_peer(peer_addr).await?;
2766 assert!(peer_id.starts_with("peer_from_"));
2767
2768 assert_eq!(node.peer_count().await, 1);
2770
2771 let connected_peers = node.connected_peers().await;
2773 assert_eq!(connected_peers.len(), 1);
2774 assert_eq!(connected_peers[0], peer_id);
2775
2776 let peer_info = node.peer_info(&peer_id).await;
2778 assert!(peer_info.is_some());
2779 let info = peer_info.expect("Peer info should exist after adding peer");
2780 assert_eq!(info.peer_id, peer_id);
2781 assert_eq!(info.status, ConnectionStatus::Connected);
2782 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2783
2784 node.disconnect_peer(&peer_id).await?;
2786 assert_eq!(node.peer_count().await, 0);
2787
2788 Ok(())
2789 }
2790
2791 #[tokio::test]
2792 async fn test_event_subscription() -> Result<()> {
2793 let config = create_test_node_config();
2794 let node = P2PNode::new(config).await?;
2795
2796 let mut events = node.subscribe_events();
2797 let peer_addr = "127.0.0.1:0";
2798
2799 let peer_id = node.connect_peer(peer_addr).await?;
2801
2802 let event = timeout(Duration::from_millis(100), events.recv()).await;
2804 assert!(event.is_ok());
2805
2806 let event_result = event
2807 .expect("Should receive event")
2808 .expect("Event should not be error");
2809 match event_result {
2810 P2PEvent::PeerConnected(event_peer_id) => {
2811 assert_eq!(event_peer_id, peer_id);
2812 }
2813 _ => panic!("Expected PeerConnected event"),
2814 }
2815
2816 node.disconnect_peer(&peer_id).await?;
2818
2819 let event = timeout(Duration::from_millis(100), events.recv()).await;
2821 assert!(event.is_ok());
2822
2823 let event_result = event
2824 .expect("Should receive event")
2825 .expect("Event should not be error");
2826 match event_result {
2827 P2PEvent::PeerDisconnected(event_peer_id) => {
2828 assert_eq!(event_peer_id, peer_id);
2829 }
2830 _ => panic!("Expected PeerDisconnected event"),
2831 }
2832
2833 Ok(())
2834 }
2835
2836 #[tokio::test]
2837 async fn test_message_sending() -> Result<()> {
2838 let mut config1 = create_test_node_config();
2840 config1.listen_addr =
2841 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2842 let node1 = P2PNode::new(config1).await?;
2843 node1.start().await?;
2844
2845 let mut config2 = create_test_node_config();
2846 config2.listen_addr =
2847 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2848 let node2 = P2PNode::new(config2).await?;
2849 node2.start().await?;
2850
2851 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2853
2854 let node2_addr = node2.local_addr().ok_or_else(|| {
2856 P2PError::Network(crate::error::NetworkError::ProtocolError(
2857 "No listening address".to_string().into(),
2858 ))
2859 })?;
2860
2861 let peer_id =
2863 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2864 Ok(res) => res?,
2865 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2866 };
2867
2868 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2870
2871 let message_data = b"Hello, peer!".to_vec();
2873 let result = match timeout(
2874 Duration::from_millis(500),
2875 node1.send_message(&peer_id, "test-protocol", message_data),
2876 )
2877 .await
2878 {
2879 Ok(res) => res,
2880 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2881 };
2882 if let Err(e) = &result {
2885 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2886 }
2887
2888 let non_existent_peer = "non_existent_peer".to_string();
2890 let result = node1
2891 .send_message(&non_existent_peer, "test-protocol", vec![])
2892 .await;
2893 assert!(result.is_err(), "Sending to non-existent peer should fail");
2894
2895 Ok(())
2896 }
2897
2898 #[tokio::test]
2899 async fn test_remote_mcp_operations() -> Result<()> {
2900 let config = create_test_node_config();
2901 let node = P2PNode::new(config).await?;
2902
2903 node.start().await?;
2905 node.stop().await?;
2906 Ok(())
2907 }
2908
2909 #[tokio::test]
2910 async fn test_health_check() -> Result<()> {
2911 let config = create_test_node_config();
2912 let node = P2PNode::new(config).await?;
2913
2914 let result = node.health_check().await;
2916 assert!(result.is_ok());
2917
2918 Ok(())
2923 }
2924
2925 #[tokio::test]
2926 async fn test_node_uptime() -> Result<()> {
2927 let config = create_test_node_config();
2928 let node = P2PNode::new(config).await?;
2929
2930 let uptime1 = node.uptime();
2931 assert!(uptime1 >= Duration::from_secs(0));
2932
2933 tokio::time::sleep(Duration::from_millis(10)).await;
2935
2936 let uptime2 = node.uptime();
2937 assert!(uptime2 > uptime1);
2938
2939 Ok(())
2940 }
2941
2942 #[tokio::test]
2943 async fn test_node_config_access() -> Result<()> {
2944 let config = create_test_node_config();
2945 let expected_peer_id = config.peer_id.clone();
2946 let node = P2PNode::new(config).await?;
2947
2948 let node_config = node.config();
2949 assert_eq!(node_config.peer_id, expected_peer_id);
2950 assert_eq!(node_config.max_connections, 100);
2951 Ok(())
2954 }
2955
2956 #[tokio::test]
2957 async fn test_mcp_server_access() -> Result<()> {
2958 let config = create_test_node_config();
2959 let _node = P2PNode::new(config).await?;
2960
2961 Ok(())
2963 }
2964
2965 #[tokio::test]
2966 async fn test_dht_access() -> Result<()> {
2967 let config = create_test_node_config();
2968 let node = P2PNode::new(config).await?;
2969
2970 assert!(node.dht().is_some());
2972
2973 Ok(())
2974 }
2975
2976 #[tokio::test]
2977 async fn test_node_builder() -> Result<()> {
2978 let builder = P2PNode::builder()
2980 .with_peer_id("builder_test_peer".to_string())
2981 .listen_on("/ip4/127.0.0.1/tcp/0")
2982 .listen_on("/ip6/::1/tcp/0")
2983 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
2985 .with_connection_timeout(Duration::from_secs(15))
2986 .with_max_connections(200);
2987
2988 let config = builder.config;
2990 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2991 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
2994 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2995 assert_eq!(config.max_connections, 200);
2996
2997 Ok(())
2998 }
2999
3000 #[tokio::test]
3001 async fn test_bootstrap_peers() -> Result<()> {
3002 let mut config = create_test_node_config();
3003 config.bootstrap_peers = vec![
3004 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3005 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3006 ];
3007
3008 let node = P2PNode::new(config).await?;
3009
3010 node.start().await?;
3012
3013 let _peer_count = node.peer_count().await;
3017
3018 node.stop().await?;
3019 Ok(())
3020 }
3021
3022 #[tokio::test]
3023 async fn test_production_mode_disabled() -> Result<()> {
3024 let config = create_test_node_config();
3025 let node = P2PNode::new(config).await?;
3026
3027 assert!(!node.is_production_mode());
3028 assert!(node.production_config().is_none());
3029
3030 let result = node.resource_metrics().await;
3032 assert!(result.is_err());
3033 assert!(result.unwrap_err().to_string().contains("not enabled"));
3034
3035 Ok(())
3036 }
3037
3038 #[tokio::test]
3039 async fn test_network_event_variants() {
3040 let peer_id = "test_peer".to_string();
3042 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3043
3044 let _peer_connected = NetworkEvent::PeerConnected {
3045 peer_id: peer_id.clone(),
3046 addresses: vec![address.clone()],
3047 };
3048
3049 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3050 peer_id: peer_id.clone(),
3051 reason: "test disconnect".to_string(),
3052 };
3053
3054 let _message_received = NetworkEvent::MessageReceived {
3055 peer_id: peer_id.clone(),
3056 protocol: "test-protocol".to_string(),
3057 data: vec![1, 2, 3],
3058 };
3059
3060 let _connection_failed = NetworkEvent::ConnectionFailed {
3061 peer_id: Some(peer_id.clone()),
3062 address: address.clone(),
3063 error: "connection refused".to_string(),
3064 };
3065
3066 let _dht_stored = NetworkEvent::DHTRecordStored {
3067 key: vec![1, 2, 3],
3068 value: vec![4, 5, 6],
3069 };
3070
3071 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3072 key: vec![1, 2, 3],
3073 value: Some(vec![4, 5, 6]),
3074 };
3075 }
3076
3077 #[tokio::test]
3078 async fn test_peer_info_structure() {
3079 let peer_info = PeerInfo {
3080 peer_id: "test_peer".to_string(),
3081 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3082 connected_at: Instant::now(),
3083 last_seen: Instant::now(),
3084 status: ConnectionStatus::Connected,
3085 protocols: vec!["test-protocol".to_string()],
3086 heartbeat_count: 0,
3087 };
3088
3089 assert_eq!(peer_info.peer_id, "test_peer");
3090 assert_eq!(peer_info.addresses.len(), 1);
3091 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3092 assert_eq!(peer_info.protocols.len(), 1);
3093 }
3094
3095 #[tokio::test]
3096 async fn test_serialization() -> Result<()> {
3097 let config = create_test_node_config();
3099 let serialized = serde_json::to_string(&config)?;
3100 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3101
3102 assert_eq!(config.peer_id, deserialized.peer_id);
3103 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3104 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3105
3106 Ok(())
3107 }
3108
3109 #[tokio::test]
3110 async fn test_get_peer_id_by_address_found() -> Result<()> {
3111 let config = create_test_node_config();
3112 let node = P2PNode::new(config).await?;
3113
3114 let test_peer_id = "peer_test_123".to_string();
3116 let test_address = "192.168.1.100:9000".to_string();
3117
3118 let peer_info = PeerInfo {
3119 peer_id: test_peer_id.clone(),
3120 addresses: vec![test_address.clone()],
3121 connected_at: Instant::now(),
3122 last_seen: Instant::now(),
3123 status: ConnectionStatus::Connected,
3124 protocols: vec!["test-protocol".to_string()],
3125 heartbeat_count: 0,
3126 };
3127
3128 node.peers
3129 .write()
3130 .await
3131 .insert(test_peer_id.clone(), peer_info);
3132
3133 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3135 assert_eq!(found_peer_id, Some(test_peer_id));
3136
3137 Ok(())
3138 }
3139
3140 #[tokio::test]
3141 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3142 let config = create_test_node_config();
3143 let node = P2PNode::new(config).await?;
3144
3145 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3147 assert_eq!(result, None);
3148
3149 Ok(())
3150 }
3151
3152 #[tokio::test]
3153 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3154 let config = create_test_node_config();
3155 let node = P2PNode::new(config).await?;
3156
3157 let result = node.get_peer_id_by_address("invalid-address").await;
3159 assert_eq!(result, None);
3160
3161 Ok(())
3162 }
3163
3164 #[tokio::test]
3165 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3166 let config = create_test_node_config();
3167 let node = P2PNode::new(config).await?;
3168
3169 let peer1_id = "peer_1".to_string();
3171 let peer1_addr = "192.168.1.101:9001".to_string();
3172
3173 let peer2_id = "peer_2".to_string();
3174 let peer2_addr = "192.168.1.102:9002".to_string();
3175
3176 let peer1_info = PeerInfo {
3177 peer_id: peer1_id.clone(),
3178 addresses: vec![peer1_addr.clone()],
3179 connected_at: Instant::now(),
3180 last_seen: Instant::now(),
3181 status: ConnectionStatus::Connected,
3182 protocols: vec!["test-protocol".to_string()],
3183 heartbeat_count: 0,
3184 };
3185
3186 let peer2_info = PeerInfo {
3187 peer_id: peer2_id.clone(),
3188 addresses: vec![peer2_addr.clone()],
3189 connected_at: Instant::now(),
3190 last_seen: Instant::now(),
3191 status: ConnectionStatus::Connected,
3192 protocols: vec!["test-protocol".to_string()],
3193 heartbeat_count: 0,
3194 };
3195
3196 node.peers
3197 .write()
3198 .await
3199 .insert(peer1_id.clone(), peer1_info);
3200 node.peers
3201 .write()
3202 .await
3203 .insert(peer2_id.clone(), peer2_info);
3204
3205 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3207 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3208
3209 assert_eq!(found_peer1, Some(peer1_id));
3210 assert_eq!(found_peer2, Some(peer2_id));
3211
3212 Ok(())
3213 }
3214
3215 #[tokio::test]
3216 async fn test_list_active_connections_empty() -> Result<()> {
3217 let config = create_test_node_config();
3218 let node = P2PNode::new(config).await?;
3219
3220 let connections = node.list_active_connections().await;
3222 assert!(connections.is_empty());
3223
3224 Ok(())
3225 }
3226
3227 #[tokio::test]
3228 async fn test_list_active_connections_with_peers() -> Result<()> {
3229 let config = create_test_node_config();
3230 let node = P2PNode::new(config).await?;
3231
3232 let peer1_id = "peer_1".to_string();
3234 let peer1_addrs = vec![
3235 "192.168.1.101:9001".to_string(),
3236 "192.168.1.101:9002".to_string(),
3237 ];
3238
3239 let peer2_id = "peer_2".to_string();
3240 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3241
3242 let peer1_info = PeerInfo {
3243 peer_id: peer1_id.clone(),
3244 addresses: peer1_addrs.clone(),
3245 connected_at: Instant::now(),
3246 last_seen: Instant::now(),
3247 status: ConnectionStatus::Connected,
3248 protocols: vec!["test-protocol".to_string()],
3249 heartbeat_count: 0,
3250 };
3251
3252 let peer2_info = PeerInfo {
3253 peer_id: peer2_id.clone(),
3254 addresses: peer2_addrs.clone(),
3255 connected_at: Instant::now(),
3256 last_seen: Instant::now(),
3257 status: ConnectionStatus::Connected,
3258 protocols: vec!["test-protocol".to_string()],
3259 heartbeat_count: 0,
3260 };
3261
3262 node.peers
3263 .write()
3264 .await
3265 .insert(peer1_id.clone(), peer1_info);
3266 node.peers
3267 .write()
3268 .await
3269 .insert(peer2_id.clone(), peer2_info);
3270
3271 let connections = node.list_active_connections().await;
3273 assert_eq!(connections.len(), 2);
3274
3275 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3277 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3278
3279 assert!(peer1_conn.is_some());
3280 assert!(peer2_conn.is_some());
3281
3282 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3284 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3285
3286 Ok(())
3287 }
3288
3289 #[tokio::test]
3290 async fn test_remove_peer_success() -> Result<()> {
3291 let config = create_test_node_config();
3292 let node = P2PNode::new(config).await?;
3293
3294 let peer_id = "peer_to_remove".to_string();
3296 let peer_info = PeerInfo {
3297 peer_id: peer_id.clone(),
3298 addresses: vec!["192.168.1.100:9000".to_string()],
3299 connected_at: Instant::now(),
3300 last_seen: Instant::now(),
3301 status: ConnectionStatus::Connected,
3302 protocols: vec!["test-protocol".to_string()],
3303 heartbeat_count: 0,
3304 };
3305
3306 node.peers.write().await.insert(peer_id.clone(), peer_info);
3307
3308 assert!(node.is_peer_connected(&peer_id).await);
3310
3311 let removed = node.remove_peer(&peer_id).await;
3313 assert!(removed);
3314
3315 assert!(!node.is_peer_connected(&peer_id).await);
3317
3318 Ok(())
3319 }
3320
3321 #[tokio::test]
3322 async fn test_remove_peer_nonexistent() -> Result<()> {
3323 let config = create_test_node_config();
3324 let node = P2PNode::new(config).await?;
3325
3326 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3328 assert!(!removed);
3329
3330 Ok(())
3331 }
3332
3333 #[tokio::test]
3334 async fn test_is_peer_connected() -> Result<()> {
3335 let config = create_test_node_config();
3336 let node = P2PNode::new(config).await?;
3337
3338 let peer_id = "test_peer".to_string();
3339
3340 assert!(!node.is_peer_connected(&peer_id).await);
3342
3343 let peer_info = PeerInfo {
3345 peer_id: peer_id.clone(),
3346 addresses: vec!["192.168.1.100:9000".to_string()],
3347 connected_at: Instant::now(),
3348 last_seen: Instant::now(),
3349 status: ConnectionStatus::Connected,
3350 protocols: vec!["test-protocol".to_string()],
3351 heartbeat_count: 0,
3352 };
3353
3354 node.peers.write().await.insert(peer_id.clone(), peer_info);
3355
3356 assert!(node.is_peer_connected(&peer_id).await);
3358
3359 node.remove_peer(&peer_id).await;
3361
3362 assert!(!node.is_peer_connected(&peer_id).await);
3364
3365 Ok(())
3366 }
3367
3368 #[test]
3369 fn test_normalize_ipv6_wildcard() {
3370 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3371
3372 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3373 let normalized = normalize_wildcard_to_loopback(wildcard);
3374
3375 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3376 assert_eq!(normalized.port(), 8080);
3377 }
3378
3379 #[test]
3380 fn test_normalize_ipv4_wildcard() {
3381 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3382
3383 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3384 let normalized = normalize_wildcard_to_loopback(wildcard);
3385
3386 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3387 assert_eq!(normalized.port(), 9000);
3388 }
3389
3390 #[test]
3391 fn test_normalize_specific_address_unchanged() {
3392 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3393 let normalized = normalize_wildcard_to_loopback(specific);
3394
3395 assert_eq!(normalized, specific);
3396 }
3397
3398 #[test]
3399 fn test_normalize_loopback_unchanged() {
3400 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3401
3402 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3403 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3404 assert_eq!(normalized_v6, loopback_v6);
3405
3406 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3407 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3408 assert_eq!(normalized_v4, loopback_v4);
3409 }
3410}