1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::json;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::Instant;
43use tracing::{debug, info, trace, warn};
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct NodeConfig {
48 pub peer_id: Option<PeerId>,
50
51 pub listen_addrs: Vec<std::net::SocketAddr>,
53
54 pub listen_addr: std::net::SocketAddr,
56
57 pub bootstrap_peers: Vec<std::net::SocketAddr>,
59
60 pub bootstrap_peers_str: Vec<String>,
62
63 pub enable_ipv6: bool,
65
66 pub connection_timeout: Duration,
69
70 pub keep_alive_interval: Duration,
72
73 pub max_connections: usize,
75
76 pub max_incoming_connections: usize,
78
79 pub dht_config: DHTConfig,
81
82 pub security_config: SecurityConfig,
84
85 pub production_config: Option<ProductionConfig>,
87
88 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct DHTConfig {
95 pub k_value: usize,
97
98 pub alpha_value: usize,
100
101 pub record_ttl: Duration,
103
104 pub refresh_interval: Duration,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct SecurityConfig {
111 pub enable_noise: bool,
113
114 pub enable_tls: bool,
116
117 pub trust_level: TrustLevel,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
123pub enum TrustLevel {
124 None,
126 Basic,
128 Full,
130}
131
132impl NodeConfig {
133 pub fn new() -> Result<Self> {
139 let config = Config::default();
141
142 let listen_addr = config.listen_socket_addr()?;
144
145 let mut listen_addrs = vec![];
147
148 if config.network.ipv6_enabled {
150 let ipv6_addr = std::net::SocketAddr::new(
151 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
152 listen_addr.port(),
153 );
154 listen_addrs.push(ipv6_addr);
155 }
156
157 let ipv4_addr = std::net::SocketAddr::new(
159 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
160 listen_addr.port(),
161 );
162 listen_addrs.push(ipv4_addr);
163
164 Ok(Self {
165 peer_id: None,
166 listen_addrs,
167 listen_addr,
168 bootstrap_peers: Vec::new(),
169 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
170 enable_ipv6: config.network.ipv6_enabled,
171
172 connection_timeout: Duration::from_secs(config.network.connection_timeout),
173 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
174 max_connections: config.network.max_connections,
175 max_incoming_connections: config.security.connection_limit as usize,
176 dht_config: DHTConfig::default(),
177 security_config: SecurityConfig::default(),
178 production_config: None,
179 bootstrap_cache_config: None,
180 })
182 }
183}
184
185impl Default for NodeConfig {
186 fn default() -> Self {
187 let config = Config::default();
189
190 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
192 std::net::SocketAddr::new(
193 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
194 9000,
195 )
196 });
197
198 Self {
199 peer_id: None,
200 listen_addrs: vec![
201 std::net::SocketAddr::new(
202 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
203 listen_addr.port(),
204 ),
205 std::net::SocketAddr::new(
206 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
207 listen_addr.port(),
208 ),
209 ],
210 listen_addr,
211 bootstrap_peers: Vec::new(),
212 bootstrap_peers_str: Vec::new(),
213 enable_ipv6: config.network.ipv6_enabled,
214
215 connection_timeout: Duration::from_secs(config.network.connection_timeout),
216 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
217 max_connections: config.network.max_connections,
218 max_incoming_connections: config.security.connection_limit as usize,
219 dht_config: DHTConfig::default(),
220 security_config: SecurityConfig::default(),
221 production_config: None, bootstrap_cache_config: None,
223 }
225 }
226}
227
228impl NodeConfig {
229 pub fn from_config(config: &Config) -> Result<Self> {
231 let listen_addr = config.listen_socket_addr()?;
232 let bootstrap_addrs = config.bootstrap_addrs()?;
233
234 let mut node_config = Self {
235 peer_id: None,
236 listen_addrs: vec![listen_addr],
237 listen_addr,
238 bootstrap_peers: bootstrap_addrs
239 .iter()
240 .map(|addr| addr.socket_addr())
241 .collect(),
242 bootstrap_peers_str: config
243 .network
244 .bootstrap_nodes
245 .iter()
246 .map(|addr| addr.to_string())
247 .collect(),
248 enable_ipv6: config.network.ipv6_enabled,
249
250 connection_timeout: Duration::from_secs(config.network.connection_timeout),
251 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
252 max_connections: config.network.max_connections,
253 max_incoming_connections: config.security.connection_limit as usize,
254 dht_config: DHTConfig {
255 k_value: 20,
256 alpha_value: 3,
257 record_ttl: Duration::from_secs(3600),
258 refresh_interval: Duration::from_secs(900),
259 },
260 security_config: SecurityConfig {
261 enable_noise: true,
262 enable_tls: true,
263 trust_level: TrustLevel::Basic,
264 },
265 production_config: Some(ProductionConfig {
266 max_connections: config.network.max_connections,
267 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
270 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
271 health_check_interval: Duration::from_secs(30),
272 metrics_interval: Duration::from_secs(60),
273 enable_performance_tracking: true,
274 enable_auto_cleanup: true,
275 shutdown_timeout: Duration::from_secs(30),
276 rate_limits: crate::production::RateLimitConfig::default(),
277 }),
278 bootstrap_cache_config: None,
279 };
284
285 if config.network.ipv6_enabled {
287 node_config.listen_addrs.push(std::net::SocketAddr::new(
288 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
289 listen_addr.port(),
290 ));
291 }
292
293 Ok(node_config)
294 }
295
296 pub fn with_listen_addr(addr: &str) -> Result<Self> {
298 let listen_addr: std::net::SocketAddr = addr
299 .parse()
300 .map_err(|e: std::net::AddrParseError| {
301 NetworkError::InvalidAddress(e.to_string().into())
302 })
303 .map_err(P2PError::Network)?;
304 let cfg = NodeConfig {
305 listen_addr,
306 listen_addrs: vec![listen_addr],
307 ..Default::default()
308 };
309 Ok(cfg)
310 }
311}
312
313impl Default for DHTConfig {
314 fn default() -> Self {
315 Self {
316 k_value: 20,
317 alpha_value: 5,
318 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
321 }
322}
323
324impl Default for SecurityConfig {
325 fn default() -> Self {
326 Self {
327 enable_noise: true,
328 enable_tls: true,
329 trust_level: TrustLevel::Basic,
330 }
331 }
332}
333
334#[derive(Debug, Clone)]
336pub struct PeerInfo {
337 pub peer_id: PeerId,
339
340 pub addresses: Vec<String>,
342
343 pub connected_at: Instant,
345
346 pub last_seen: Instant,
348
349 pub status: ConnectionStatus,
351
352 pub protocols: Vec<String>,
354
355 pub heartbeat_count: u64,
357}
358
/// Lifecycle state of a peer connection.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// The connection is closed.
    Disconnected,
    /// The connection failed; the payload describes the reason.
    Failed(String),
}
373
374#[derive(Debug, Clone)]
376pub enum NetworkEvent {
377 PeerConnected {
379 peer_id: PeerId,
381 addresses: Vec<String>,
383 },
384
385 PeerDisconnected {
387 peer_id: PeerId,
389 reason: String,
391 },
392
393 MessageReceived {
395 peer_id: PeerId,
397 protocol: String,
399 data: Vec<u8>,
401 },
402
403 ConnectionFailed {
405 peer_id: Option<PeerId>,
407 address: String,
409 error: String,
411 },
412
413 DHTRecordStored {
415 key: Vec<u8>,
417 value: Vec<u8>,
419 },
420
421 DHTRecordRetrieved {
423 key: Vec<u8>,
425 value: Option<Vec<u8>>,
427 },
428}
429
430#[derive(Debug, Clone)]
435pub enum P2PEvent {
436 Message {
438 topic: String,
440 source: PeerId,
442 data: Vec<u8>,
444 },
445 PeerConnected(PeerId),
447 PeerDisconnected(PeerId),
449}
450
451pub struct P2PNode {
461 config: NodeConfig,
463
464 peer_id: PeerId,
466
467 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
469
470 event_tx: broadcast::Sender<P2PEvent>,
472
473 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
475
476 start_time: Instant,
478
479 running: RwLock<bool>,
481
482 dht: Option<Arc<RwLock<DHT>>>,
484
485 resource_manager: Option<Arc<ResourceManager>>,
487
488 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
490
491 dual_node: Arc<DualStackNetworkNode>,
493
494 #[allow(dead_code)]
496 rate_limiter: Arc<RateLimiter>,
497
498 active_connections: Arc<RwLock<HashSet<PeerId>>>,
501
502 #[allow(dead_code)]
504 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
505
506 #[allow(dead_code)]
508 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
509
510 #[allow(dead_code)]
512 shutdown: Arc<AtomicBool>,
513
514 #[allow(dead_code)]
516 geo_provider: Arc<BgpGeoProvider>,
517}
518
/// Maps an unspecified ("wildcard") socket address to the loopback address of
/// the same IP family, keeping the port.
///
/// `0.0.0.0:p` becomes `127.0.0.1:p` and `[::]:p` becomes `[::1]:p`; any other
/// address is returned unchanged.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    // Guard clause: only wildcard addresses are rewritten.
    if !addr.ip().is_unspecified() {
        return addr;
    }

    let loopback = if addr.is_ipv6() {
        IpAddr::V6(Ipv6Addr::LOCALHOST)
    } else {
        IpAddr::V4(Ipv4Addr::LOCALHOST)
    };
    std::net::SocketAddr::new(loopback, addr.port())
}
549
550impl P2PNode {
551 pub fn new_for_tests() -> Result<Self> {
553 let (event_tx, _) = broadcast::channel(16);
554 Ok(Self {
555 config: NodeConfig::default(),
556 peer_id: "test_peer".to_string(),
557 peers: Arc::new(RwLock::new(HashMap::new())),
558 event_tx,
559 listen_addrs: RwLock::new(Vec::new()),
560 start_time: Instant::now(),
561 running: RwLock::new(false),
562 dht: None,
563 resource_manager: None,
564 bootstrap_manager: None,
565 dual_node: {
566 let v6: Option<std::net::SocketAddr> = "[::1]:0"
568 .parse()
569 .ok()
570 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
571 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
572 let handle = tokio::runtime::Handle::current();
573 let dual_attempt = handle.block_on(
574 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
575 );
576 let dual = match dual_attempt {
577 Ok(d) => d,
578 Err(_e1) => {
579 let fallback = handle.block_on(
581 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
582 None,
583 "127.0.0.1:0".parse().ok(),
584 ),
585 );
586 match fallback {
587 Ok(d) => d,
588 Err(e2) => {
589 return Err(P2PError::Network(NetworkError::BindError(
590 format!("Failed to create dual-stack network node: {}", e2)
591 .into(),
592 )));
593 }
594 }
595 }
596 };
597 Arc::new(dual)
598 },
599 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
600 max_requests: 100,
601 burst_size: 100,
602 window: std::time::Duration::from_secs(1),
603 ..Default::default()
604 })),
605 active_connections: Arc::new(RwLock::new(HashSet::new())),
606 connection_monitor_handle: Arc::new(RwLock::new(None)),
607 keepalive_handle: Arc::new(RwLock::new(None)),
608 shutdown: Arc::new(AtomicBool::new(false)),
609 geo_provider: Arc::new(BgpGeoProvider::new()),
610 })
611 }
612 pub async fn new(config: NodeConfig) -> Result<Self> {
614 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
615 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
617 });
618
619 let (event_tx, _) = broadcast::channel(1000);
620
621 {
624 use blake3::Hasher;
625 let mut hasher = Hasher::new();
626 hasher.update(peer_id.as_bytes());
627 let digest = hasher.finalize();
628 let mut nid = [0u8; 32];
629 nid.copy_from_slice(digest.as_bytes());
630 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
631 crate::identity::node_identity::NodeId::from_bytes(nid),
632 ));
633 }
636
637 let dht = if true {
639 let _dht_config = crate::dht::DHTConfig {
641 replication_factor: config.dht_config.k_value,
642 bucket_size: config.dht_config.k_value,
643 alpha: config.dht_config.alpha_value,
644 record_ttl: config.dht_config.record_ttl,
645 bucket_refresh_interval: config.dht_config.refresh_interval,
646 republish_interval: config.dht_config.refresh_interval,
647 max_distance: 160, };
649 let peer_bytes = peer_id.as_bytes();
651 let mut node_id_bytes = [0u8; 32];
652 let len = peer_bytes.len().min(32);
653 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
654 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
655 let dht_instance = DHT::new(node_id).map_err(|e| {
656 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
657 e.to_string().into(),
658 ))
659 })?;
660 Some(Arc::new(RwLock::new(dht_instance)))
661 } else {
662 None
663 };
664
665 let resource_manager = config
669 .production_config
670 .clone()
671 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
672
673 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
675 match BootstrapManager::with_config(cache_config.clone()).await {
676 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
677 Err(e) => {
678 warn!(
679 "Failed to initialize bootstrap manager: {}, continuing without cache",
680 e
681 );
682 None
683 }
684 }
685 } else {
686 match BootstrapManager::new().await {
687 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
688 Err(e) => {
689 warn!(
690 "Failed to initialize bootstrap manager: {}, continuing without cache",
691 e
692 );
693 None
694 }
695 }
696 };
697
698 let (v6_opt, v4_opt) = {
701 let port = config.listen_addr.port();
702 let ip = config.listen_addr.ip();
703
704 let v4_addr = if ip.is_ipv4() {
705 Some(std::net::SocketAddr::new(ip, port))
706 } else {
707 Some(std::net::SocketAddr::new(
710 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
711 port,
712 ))
713 };
714
715 let v6_addr = if config.enable_ipv6 {
716 if ip.is_ipv6() {
717 Some(std::net::SocketAddr::new(ip, port))
718 } else {
719 Some(std::net::SocketAddr::new(
720 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
721 port,
722 ))
723 }
724 } else {
725 None
726 };
727 (v6_addr, v4_addr)
728 };
729
730 let dual_node = Arc::new(
731 DualStackNetworkNode::new(v6_opt, v4_opt)
732 .await
733 .map_err(|e| {
734 P2PError::Transport(crate::error::TransportError::SetupFailed(
735 format!("Failed to create dual-stack network nodes: {}", e).into(),
736 ))
737 })?,
738 );
739
740 let rate_limiter = Arc::new(RateLimiter::new(
742 crate::validation::RateLimitConfig::default(),
743 ));
744
745 let active_connections = Arc::new(RwLock::new(HashSet::new()));
747
748 let geo_provider = Arc::new(BgpGeoProvider::new());
750
751 let peers = Arc::new(RwLock::new(HashMap::new()));
753
754 let connection_monitor_handle = {
756 let active_conns = Arc::clone(&active_connections);
757 let peers_map = Arc::clone(&peers);
758 let event_tx_clone = event_tx.clone();
759 let dual_node_clone = Arc::clone(&dual_node);
760 let geo_provider_clone = Arc::clone(&geo_provider);
761 let peer_id_clone = peer_id.clone();
762
763 let handle = tokio::spawn(async move {
764 Self::connection_lifecycle_monitor(
765 dual_node_clone,
766 active_conns,
767 peers_map,
768 event_tx_clone,
769 geo_provider_clone,
770 peer_id_clone,
771 )
772 .await;
773 });
774
775 Arc::new(RwLock::new(Some(handle)))
776 };
777
778 let shutdown = Arc::new(AtomicBool::new(false));
780 let keepalive_handle = {
781 let active_conns = Arc::clone(&active_connections);
782 let dual_node_clone = Arc::clone(&dual_node);
783 let shutdown_clone = Arc::clone(&shutdown);
784
785 let handle = tokio::spawn(async move {
786 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
787 });
788
789 Arc::new(RwLock::new(Some(handle)))
790 };
791
792 let node = Self {
793 config,
794 peer_id,
795 peers,
796 event_tx,
797 listen_addrs: RwLock::new(Vec::new()),
798 start_time: Instant::now(),
799 running: RwLock::new(false),
800 dht,
801 resource_manager,
802 bootstrap_manager,
803 dual_node,
804 rate_limiter,
805 active_connections,
806 connection_monitor_handle,
807 keepalive_handle,
808 shutdown,
809 geo_provider,
810 };
811 info!("Created P2P node with peer ID: {}", node.peer_id);
812
813 node.start_network_listeners().await?;
815
816 node.start_connection_monitor().await;
818
819 Ok(node)
820 }
821
822 pub fn builder() -> NodeBuilder {
824 NodeBuilder::new()
825 }
826
827 pub fn peer_id(&self) -> &PeerId {
829 &self.peer_id
830 }
831
832 pub fn local_addr(&self) -> Option<String> {
833 self.listen_addrs
834 .try_read()
835 .ok()
836 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
837 }
838
839 pub async fn subscribe(&self, topic: &str) -> Result<()> {
840 info!("Subscribed to topic: {}", topic);
843 Ok(())
844 }
845
846 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
847 info!(
848 "Publishing message to topic: {} ({} bytes)",
849 topic,
850 data.len()
851 );
852
853 let peer_list: Vec<PeerId> = {
855 let peers_guard = self.peers.read().await;
856 peers_guard.keys().cloned().collect()
857 };
858
859 if peer_list.is_empty() {
860 debug!("No peers connected, message will only be sent to local subscribers");
861 } else {
862 let mut send_count = 0;
864 for peer_id in &peer_list {
865 match self.send_message(peer_id, topic, data.to_vec()).await {
866 Ok(_) => {
867 send_count += 1;
868 debug!("Sent message to peer: {}", peer_id);
869 }
870 Err(e) => {
871 warn!("Failed to send message to peer {}: {}", peer_id, e);
872 }
873 }
874 }
875 info!(
876 "Published message to {}/{} connected peers",
877 send_count,
878 peer_list.len()
879 );
880 }
881
882 let event = P2PEvent::Message {
884 topic: topic.to_string(),
885 source: self.peer_id.clone(),
886 data: data.to_vec(),
887 };
888 let _ = self.event_tx.send(event);
889
890 Ok(())
891 }
892
893 pub fn config(&self) -> &NodeConfig {
895 &self.config
896 }
897
898 pub async fn start(&self) -> Result<()> {
900 info!("Starting P2P node...");
901
902 if let Some(ref resource_manager) = self.resource_manager {
904 resource_manager.start().await.map_err(|e| {
905 P2PError::Network(crate::error::NetworkError::ProtocolError(
906 format!("Failed to start resource manager: {e}").into(),
907 ))
908 })?;
909 info!("Production resource manager started");
910 }
911
912 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
914 let mut manager = bootstrap_manager.write().await;
915 manager.start_background_tasks().await.map_err(|e| {
916 P2PError::Network(crate::error::NetworkError::ProtocolError(
917 format!("Failed to start bootstrap manager: {e}").into(),
918 ))
919 })?;
920 info!("Bootstrap cache manager started");
921 }
922
923 *self.running.write().await = true;
925
926 self.start_network_listeners().await?;
928
929 let listen_addrs = self.listen_addrs.read().await;
931 info!("P2P node started on addresses: {:?}", *listen_addrs);
932
933 self.start_message_receiving_system().await?;
937
938 self.connect_bootstrap_peers().await?;
940
941 Ok(())
942 }
943
944 async fn start_network_listeners(&self) -> Result<()> {
946 info!("Starting dual-stack listeners (ant-quic)...");
947 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
949 P2PError::Transport(crate::error::TransportError::SetupFailed(
950 format!("Failed to get local addresses: {}", e).into(),
951 ))
952 })?;
953 {
954 let mut la = self.listen_addrs.write().await;
955 *la = addrs.clone();
956 }
957
958 let event_tx = self.event_tx.clone();
960 let peers = self.peers.clone();
961 let active_connections = self.active_connections.clone();
962 let rate_limiter = self.rate_limiter.clone();
963 let dual = self.dual_node.clone();
964 tokio::spawn(async move {
965 loop {
966 match dual.accept_any().await {
967 Ok((ant_peer_id, remote_sock)) => {
968 let peer_id =
969 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
970 let remote_addr = NetworkAddress::from(remote_sock);
971 let _ = rate_limiter.check_ip(&remote_sock.ip());
973 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
974 register_new_peer(&peers, &peer_id, &remote_addr).await;
975 active_connections.write().await.insert(peer_id);
976 }
977 Err(e) => {
978 warn!("Accept failed: {}", e);
979 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
980 }
981 }
982 }
983 });
984
985 info!("Dual-stack listeners active on: {:?}", addrs);
986 Ok(())
987 }
988
989 #[allow(dead_code)]
991 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
992 warn!("QUIC transport temporarily disabled during ant-quic migration");
1031 Err(crate::P2PError::Transport(
1033 crate::error::TransportError::SetupFailed(
1034 format!(
1035 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1036 )
1037 .into(),
1038 ),
1039 ))
1040 }
1041
1042 #[allow(dead_code)] async fn start_connection_acceptor(
1045 &self,
1046 transport: Arc<dyn crate::transport::Transport>,
1047 addr: std::net::SocketAddr,
1048 transport_type: crate::transport::TransportType,
1049 ) -> Result<()> {
1050 info!(
1051 "Starting connection acceptor for {:?} on {}",
1052 transport_type, addr
1053 );
1054
1055 let event_tx = self.event_tx.clone();
1057 let _peer_id = self.peer_id.clone();
1058 let peers = Arc::clone(&self.peers);
1059 let rate_limiter = Arc::clone(&self.rate_limiter);
1062
1063 tokio::spawn(async move {
1065 loop {
1066 match transport.accept().await {
1067 Ok(connection) => {
1068 let remote_addr = connection.remote_addr();
1069 let connection_peer_id =
1070 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1071
1072 let socket_addr = remote_addr.socket_addr();
1074 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1075 continue;
1077 }
1078
1079 info!(
1080 "Accepted {:?} connection from {} (peer: {})",
1081 transport_type, remote_addr, connection_peer_id
1082 );
1083
1084 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1086
1087 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1089
1090 spawn_connection_handler(
1092 connection,
1093 connection_peer_id,
1094 event_tx.clone(),
1095 Arc::clone(&peers),
1096 );
1097 }
1098 Err(e) => {
1099 warn!(
1100 "Failed to accept {:?} connection on {}: {}",
1101 transport_type, addr, e
1102 );
1103
1104 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1106 }
1107 }
1108 }
1109 });
1110
1111 info!(
1112 "Connection acceptor background task started for {:?} on {}",
1113 transport_type, addr
1114 );
1115 Ok(())
1116 }
1117
1118 async fn start_message_receiving_system(&self) -> Result<()> {
1120 info!("Starting message receiving system");
1121 let dual = self.dual_node.clone();
1122 let event_tx = self.event_tx.clone();
1123
1124 tokio::spawn(async move {
1125 loop {
1126 match dual.receive_any().await {
1127 Ok((_peer_id, bytes)) => {
1128 #[allow(clippy::collapsible_if)]
1130 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1131 if let (Some(protocol), Some(data), Some(from)) = (
1132 value.get("protocol").and_then(|v| v.as_str()),
1133 value.get("data").and_then(|v| v.as_array()),
1134 value.get("from").and_then(|v| v.as_str()),
1135 ) {
1136 let payload: Vec<u8> = data
1137 .iter()
1138 .filter_map(|v| v.as_u64().map(|n| n as u8))
1139 .collect();
1140 let _ = event_tx.send(P2PEvent::Message {
1141 topic: protocol.to_string(),
1142 source: from.to_string(),
1143 data: payload,
1144 });
1145 }
1146 }
1147 }
1148 Err(e) => {
1149 warn!("Receive error: {}", e);
1150 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1151 }
1152 }
1153 }
1154 });
1155
1156 Ok(())
1157 }
1158
1159 #[allow(dead_code)]
1161 async fn handle_received_message(
1162 &self,
1163 message_data: Vec<u8>,
1164 peer_id: &PeerId,
1165 _protocol: &str,
1166 event_tx: &broadcast::Sender<P2PEvent>,
1167 ) -> Result<()> {
1168 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1172 Ok(message) => {
1173 if let (Some(protocol), Some(data), Some(from)) = (
1174 message.get("protocol").and_then(|v| v.as_str()),
1175 message.get("data").and_then(|v| v.as_array()),
1176 message.get("from").and_then(|v| v.as_str()),
1177 ) {
1178 let data_bytes: Vec<u8> = data
1180 .iter()
1181 .filter_map(|v| v.as_u64().map(|n| n as u8))
1182 .collect();
1183
1184 let event = P2PEvent::Message {
1186 topic: protocol.to_string(),
1187 source: from.to_string(),
1188 data: data_bytes,
1189 };
1190
1191 let _ = event_tx.send(event);
1192 debug!("Generated message event from peer: {}", peer_id);
1193 }
1194 }
1195 Err(e) => {
1196 warn!("Failed to parse received message from {}: {}", peer_id, e);
1197 }
1198 }
1199
1200 Ok(())
1201 }
1202
1203 pub async fn run(&self) -> Result<()> {
1209 if !*self.running.read().await {
1210 self.start().await?;
1211 }
1212
1213 info!("P2P node running...");
1214
1215 loop {
1217 if !*self.running.read().await {
1218 break;
1219 }
1220
1221 self.periodic_tasks().await?;
1223
1224 tokio::time::sleep(Duration::from_millis(100)).await;
1226 }
1227
1228 info!("P2P node stopped");
1229 Ok(())
1230 }
1231
1232 pub async fn stop(&self) -> Result<()> {
1234 info!("Stopping P2P node...");
1235
1236 *self.running.write().await = false;
1238
1239 self.disconnect_all_peers().await?;
1241
1242 if let Some(ref resource_manager) = self.resource_manager {
1244 resource_manager.shutdown().await.map_err(|e| {
1245 P2PError::Network(crate::error::NetworkError::ProtocolError(
1246 format!("Failed to shutdown resource manager: {e}").into(),
1247 ))
1248 })?;
1249 info!("Production resource manager stopped");
1250 }
1251
1252 info!("P2P node stopped");
1253 Ok(())
1254 }
1255
1256 pub async fn shutdown(&self) -> Result<()> {
1258 self.stop().await
1259 }
1260
1261 pub async fn is_running(&self) -> bool {
1263 *self.running.read().await
1264 }
1265
1266 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1268 self.listen_addrs.read().await.clone()
1269 }
1270
1271 pub async fn connected_peers(&self) -> Vec<PeerId> {
1273 self.peers.read().await.keys().cloned().collect()
1274 }
1275
1276 pub async fn peer_count(&self) -> usize {
1278 self.peers.read().await.len()
1279 }
1280
1281 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1283 self.peers.read().await.get(peer_id).cloned()
1284 }
1285
1286 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1298 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1300
1301 let peers = self.peers.read().await;
1302
1303 for (peer_id, peer_info) in peers.iter() {
1305 for peer_addr in &peer_info.addresses {
1307 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1308 && peer_socket == socket_addr
1309 {
1310 return Some(peer_id.clone());
1311 }
1312 }
1313 }
1314
1315 None
1316 }
1317
1318 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1324 let peers = self.peers.read().await;
1325
1326 peers
1327 .iter()
1328 .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1329 .collect()
1330 }
1331
1332 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1344 self.active_connections.write().await.remove(peer_id);
1346 self.peers.write().await.remove(peer_id).is_some()
1348 }
1349
1350 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1363 self.peers.read().await.contains_key(peer_id)
1364 }
1365
1366 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1368 info!("Connecting to peer at: {}", address);
1369
1370 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1372 Some(resource_manager.acquire_connection().await?)
1373 } else {
1374 None
1375 };
1376
1377 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1379 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1380 format!("{}: {}", address, e).into(),
1381 ))
1382 })?;
1383
1384 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1387 if normalized_addr != socket_addr {
1388 info!(
1389 "Normalized wildcard address {} to loopback {}",
1390 socket_addr, normalized_addr
1391 );
1392 }
1393
1394 let addr_list = vec![normalized_addr];
1396 let peer_id = match tokio::time::timeout(
1397 self.config.connection_timeout,
1398 self.dual_node.connect_happy_eyeballs(&addr_list),
1399 )
1400 .await
1401 {
1402 Ok(Ok(peer)) => {
1403 let connected_peer_id =
1404 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1405 info!("Successfully connected to peer: {}", connected_peer_id);
1406 connected_peer_id
1407 }
1408 Ok(Err(e)) => {
1409 warn!("Failed to connect to peer at {}: {}", address, e);
1410 let sanitized_address = address.replace(['/', ':'], "_");
1411 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1412 warn!(
1413 "Using demo peer ID: {} (transport connection failed)",
1414 demo_peer_id
1415 );
1416 demo_peer_id
1417 }
1418 Err(_) => {
1419 warn!(
1420 "Timed out connecting to peer at {} after {:?}",
1421 address, self.config.connection_timeout
1422 );
1423 let sanitized_address = address.replace(['/', ':'], "_");
1424 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1425 demo_peer_id
1426 }
1427 };
1428
1429 let peer_info = PeerInfo {
1431 peer_id: peer_id.clone(),
1432 addresses: vec![address.to_string()],
1433 connected_at: Instant::now(),
1434 last_seen: Instant::now(),
1435 status: ConnectionStatus::Connected,
1436 protocols: vec!["p2p-foundation/1.0".to_string()],
1437 heartbeat_count: 0,
1438 };
1439
1440 self.peers.write().await.insert(peer_id.clone(), peer_info);
1442
1443 self.active_connections
1446 .write()
1447 .await
1448 .insert(peer_id.clone());
1449
1450 if let Some(ref resource_manager) = self.resource_manager {
1452 resource_manager.record_bandwidth(0, 0); }
1454
1455 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1457
1458 info!("Connected to peer: {}", peer_id);
1459 Ok(peer_id)
1460 }
1461
1462 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1464 info!("Disconnecting from peer: {}", peer_id);
1465
1466 self.active_connections.write().await.remove(peer_id);
1468
1469 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1470 peer_info.status = ConnectionStatus::Disconnected;
1471
1472 let _ = self
1474 .event_tx
1475 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1476
1477 info!("Disconnected from peer: {}", peer_id);
1478 }
1479
1480 Ok(())
1481 }
1482
1483 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1485 self.active_connections.read().await.contains(peer_id)
1486 }
1487
1488 pub async fn send_message(
1490 &self,
1491 peer_id: &PeerId,
1492 protocol: &str,
1493 data: Vec<u8>,
1494 ) -> Result<()> {
1495 debug!(
1496 "Sending message to peer {} on protocol {}",
1497 peer_id, protocol
1498 );
1499
1500 if let Some(ref resource_manager) = self.resource_manager
1502 && !resource_manager
1503 .check_rate_limit(peer_id, "message")
1504 .await?
1505 {
1506 return Err(P2PError::ResourceExhausted(
1507 format!("Rate limit exceeded for peer {}", peer_id).into(),
1508 ));
1509 }
1510
1511 if !self.peers.read().await.contains_key(peer_id) {
1513 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1514 peer_id.to_string().into(),
1515 )));
1516 }
1517
1518 if !self.is_connection_active(peer_id).await {
1521 debug!(
1522 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1523 peer_id
1524 );
1525
1526 self.remove_peer(peer_id).await;
1528
1529 return Err(P2PError::Network(
1530 crate::error::NetworkError::ConnectionClosed {
1531 peer_id: peer_id.to_string().into(),
1532 },
1533 ));
1534 }
1535
1536 if let Some(ref resource_manager) = self.resource_manager {
1540 resource_manager.record_bandwidth(data.len() as u64, 0);
1541 }
1542
1543 let _message_data = self.create_protocol_message(protocol, data)?;
1545
1546 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1548 tokio::time::timeout(self.config.connection_timeout, send_fut)
1549 .await
1550 .map_err(|_| {
1551 P2PError::Transport(crate::error::TransportError::StreamError(
1552 "Timed out sending message".into(),
1553 ))
1554 })?
1555 .map_err(|e| {
1556 P2PError::Transport(crate::error::TransportError::StreamError(
1557 e.to_string().into(),
1558 ))
1559 })
1560 }
1561
1562 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1564 use serde_json::json;
1565
1566 let timestamp = std::time::SystemTime::now()
1567 .duration_since(std::time::UNIX_EPOCH)
1568 .map_err(|e| {
1569 P2PError::Network(NetworkError::ProtocolError(
1570 format!("System time error: {}", e).into(),
1571 ))
1572 })?
1573 .as_secs();
1574
1575 let message = json!({
1577 "protocol": protocol,
1578 "data": data,
1579 "from": self.peer_id,
1580 "timestamp": timestamp
1581 });
1582
1583 serde_json::to_vec(&message).map_err(|e| {
1584 P2PError::Transport(crate::error::TransportError::StreamError(
1585 format!("Failed to serialize message: {e}").into(),
1586 ))
1587 })
1588 }
1589
1590 }
1592
1593#[allow(dead_code)]
1595fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1596 use serde_json::json;
1597
1598 let timestamp = std::time::SystemTime::now()
1599 .duration_since(std::time::UNIX_EPOCH)
1600 .map_err(|e| {
1601 P2PError::Network(NetworkError::ProtocolError(
1602 format!("System time error: {}", e).into(),
1603 ))
1604 })?
1605 .as_secs();
1606
1607 let message = json!({
1609 "protocol": protocol,
1610 "data": data,
1611 "timestamp": timestamp
1612 });
1613
1614 serde_json::to_vec(&message).map_err(|e| {
1615 P2PError::Transport(crate::error::TransportError::StreamError(
1616 format!("Failed to serialize message: {e}").into(),
1617 ))
1618 })
1619}
1620
1621impl P2PNode {
1622 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1624 self.event_tx.subscribe()
1625 }
1626
1627 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1629 self.subscribe_events()
1630 }
1631
1632 pub fn uptime(&self) -> Duration {
1634 self.start_time.elapsed()
1635 }
1636
1637 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1647 if let Some(ref resource_manager) = self.resource_manager {
1648 Ok(resource_manager.get_metrics().await)
1649 } else {
1650 Err(P2PError::Network(
1651 crate::error::NetworkError::ProtocolError(
1652 "Production resource manager not enabled".to_string().into(),
1653 ),
1654 ))
1655 }
1656 }
1657
1658 async fn connection_lifecycle_monitor(
1661 dual_node: Arc<DualStackNetworkNode>,
1662 active_connections: Arc<RwLock<HashSet<String>>>,
1663 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1664 event_tx: broadcast::Sender<P2PEvent>,
1665 geo_provider: Arc<BgpGeoProvider>,
1666 local_peer_id: String,
1667 ) {
1668 use crate::transport::ant_quic_adapter::ConnectionEvent;
1669
1670 let mut event_rx = dual_node.subscribe_connection_events();
1671
1672 info!("Connection lifecycle monitor started");
1673
1674 loop {
1675 match event_rx.recv().await {
1676 Ok(event) => {
1677 match event {
1678 ConnectionEvent::Established {
1679 peer_id,
1680 remote_address,
1681 } => {
1682 let peer_id_str =
1683 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1684 debug!(
1685 "Connection established: peer={}, addr={}",
1686 peer_id_str, remote_address
1687 );
1688
1689 let ip = remote_address.ip();
1692 let is_rejected = match ip {
1693 std::net::IpAddr::V4(v4) => {
1694 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1696 geo_provider.is_hosting_asn(asn)
1697 || geo_provider.is_vpn_asn(asn)
1698 } else {
1699 false
1700 }
1701 }
1702 std::net::IpAddr::V6(v6) => {
1703 let info = geo_provider.lookup(v6);
1704 info.is_hosting_provider || info.is_vpn_provider
1705 }
1706 };
1707
1708 if is_rejected {
1709 info!(
1710 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1711 peer_id_str, remote_address
1712 );
1713
1714 let rejection = RejectionMessage {
1716 reason: RejectionReason::GeoIpPolicy,
1717 message:
1718 "Connection rejected: Hosting/VPN providers not allowed"
1719 .to_string(),
1720 suggested_target: None, };
1722
1723 if let Ok(data) = serde_json::to_vec(&rejection) {
1725 let timestamp = std::time::SystemTime::now()
1727 .duration_since(std::time::UNIX_EPOCH)
1728 .unwrap_or_default()
1729 .as_secs();
1730
1731 let message = serde_json::json!({
1732 "protocol": "control",
1733 "data": data,
1734 "from": local_peer_id,
1735 "timestamp": timestamp
1736 });
1737
1738 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1739 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1743
1744 tokio::task::yield_now().await;
1747 }
1748 }
1749
1750 continue;
1754 }
1755
1756 active_connections.write().await.insert(peer_id_str.clone());
1758
1759 let mut peers_lock = peers.write().await;
1761 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1762 peer_info.status = ConnectionStatus::Connected;
1763 peer_info.connected_at = Instant::now();
1764 } else {
1765 debug!("Registering new incoming peer: {}", peer_id_str);
1767 peers_lock.insert(
1768 peer_id_str.clone(),
1769 PeerInfo {
1770 peer_id: peer_id_str.clone(),
1771 addresses: vec![remote_address.to_string()],
1772 status: ConnectionStatus::Connected,
1773 last_seen: Instant::now(),
1774 connected_at: Instant::now(),
1775 protocols: Vec::new(),
1776 heartbeat_count: 0,
1777 },
1778 );
1779 }
1780
1781 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1783 }
1784 ConnectionEvent::Lost { peer_id, reason } => {
1785 let peer_id_str =
1786 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1787 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1788
1789 active_connections.write().await.remove(&peer_id_str);
1791
1792 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1794 peer_info.status = ConnectionStatus::Disconnected;
1795 peer_info.last_seen = Instant::now();
1796 }
1797
1798 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1800 }
1801 ConnectionEvent::Failed { peer_id, reason } => {
1802 let peer_id_str =
1803 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1804 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1805
1806 active_connections.write().await.remove(&peer_id_str);
1808
1809 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1811 peer_info.status = ConnectionStatus::Failed(reason.clone());
1812 }
1813
1814 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1816 }
1817 }
1818 }
1819 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1820 warn!(
1821 "Connection event monitor lagged, skipped {} events",
1822 skipped
1823 );
1824 continue;
1825 }
1826 Err(broadcast::error::RecvError::Closed) => {
1827 info!("Connection event channel closed, stopping monitor");
1828 break;
1829 }
1830 }
1831 }
1832
1833 info!("Connection lifecycle monitor stopped");
1834 }
1835
1836 async fn start_connection_monitor(&self) {
1838 debug!("Connection monitor already running from initialization");
1842 }
1843
1844 async fn keepalive_task(
1850 active_connections: Arc<RwLock<HashSet<String>>>,
1851 dual_node: Arc<DualStackNetworkNode>,
1852 shutdown: Arc<AtomicBool>,
1853 ) {
1854 use tokio::time::{Duration, interval};
1855
1856 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1860 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1861
1862 info!(
1863 "Keepalive task started (interval: {}s)",
1864 KEEPALIVE_INTERVAL_SECS
1865 );
1866
1867 loop {
1868 if shutdown.load(Ordering::Relaxed) {
1870 info!("Keepalive task shutting down");
1871 break;
1872 }
1873
1874 interval.tick().await;
1875
1876 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1878
1879 if peers.is_empty() {
1880 trace!("Keepalive: no active connections");
1881 continue;
1882 }
1883
1884 debug!("Sending keepalive to {} active connections", peers.len());
1885
1886 for peer_id in peers {
1888 match dual_node
1889 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1890 .await
1891 {
1892 Ok(_) => {
1893 trace!("Keepalive sent to peer: {}", peer_id);
1894 }
1895 Err(e) => {
1896 debug!(
1897 "Failed to send keepalive to peer {}: {} (connection may have closed)",
1898 peer_id, e
1899 );
1900 }
1902 }
1903 }
1904 }
1905
1906 info!("Keepalive task stopped");
1907 }
1908
1909 pub async fn health_check(&self) -> Result<()> {
1911 if let Some(ref resource_manager) = self.resource_manager {
1912 resource_manager.health_check().await
1913 } else {
1914 let peer_count = self.peer_count().await;
1916 if peer_count > self.config.max_connections {
1917 Err(P2PError::Network(
1918 crate::error::NetworkError::ProtocolError(
1919 format!("Too many connections: {peer_count}").into(),
1920 ),
1921 ))
1922 } else {
1923 Ok(())
1924 }
1925 }
1926 }
1927
1928 pub fn production_config(&self) -> Option<&ProductionConfig> {
1930 self.config.production_config.as_ref()
1931 }
1932
1933 pub fn is_production_mode(&self) -> bool {
1935 self.resource_manager.is_some()
1936 }
1937
1938 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1940 self.dht.as_ref()
1941 }
1942
1943 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1945 if let Some(ref dht) = self.dht {
1946 let mut dht_instance = dht.write().await;
1947 let dht_key = crate::dht::DhtKey::from_bytes(key);
1948 dht_instance
1949 .store(&dht_key, value.clone())
1950 .await
1951 .map_err(|e| {
1952 P2PError::Dht(crate::error::DhtError::StoreFailed(
1953 format!("{:?}: {e}", key).into(),
1954 ))
1955 })?;
1956
1957 Ok(())
1958 } else {
1959 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1960 "DHT not enabled".to_string().into(),
1961 )))
1962 }
1963 }
1964
1965 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1967 if let Some(ref dht) = self.dht {
1968 let dht_instance = dht.read().await;
1969 let dht_key = crate::dht::DhtKey::from_bytes(key);
1970 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1971 P2PError::Dht(crate::error::DhtError::StoreFailed(
1972 format!("Retrieve failed: {e}").into(),
1973 ))
1974 })?;
1975
1976 Ok(record_result)
1977 } else {
1978 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1979 "DHT not enabled".to_string().into(),
1980 )))
1981 }
1982 }
1983
1984 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1986 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1987 let mut manager = bootstrap_manager.write().await;
1988 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1989 .iter()
1990 .filter_map(|addr| addr.parse().ok())
1991 .collect();
1992 let contact = ContactEntry::new(peer_id, socket_addresses);
1993 manager.add_contact(contact).await.map_err(|e| {
1994 P2PError::Network(crate::error::NetworkError::ProtocolError(
1995 format!("Failed to add peer to bootstrap cache: {e}").into(),
1996 ))
1997 })?;
1998 }
1999 Ok(())
2000 }
2001
2002 pub async fn update_peer_metrics(
2004 &self,
2005 peer_id: &PeerId,
2006 success: bool,
2007 latency_ms: Option<u64>,
2008 _error: Option<String>,
2009 ) -> Result<()> {
2010 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2011 let mut manager = bootstrap_manager.write().await;
2012
2013 let metrics = QualityMetrics {
2015 success_rate: if success { 1.0 } else { 0.0 },
2016 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2017 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2019 last_successful_connection: if success {
2020 chrono::Utc::now()
2021 } else {
2022 chrono::Utc::now() - chrono::Duration::hours(1)
2023 },
2024 uptime_score: 0.5,
2025 };
2026
2027 manager
2028 .update_contact_metrics(peer_id, metrics)
2029 .await
2030 .map_err(|e| {
2031 P2PError::Network(crate::error::NetworkError::ProtocolError(
2032 format!("Failed to update peer metrics: {e}").into(),
2033 ))
2034 })?;
2035 }
2036 Ok(())
2037 }
2038
2039 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2041 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2042 let manager = bootstrap_manager.read().await;
2043 let stats = manager.get_stats().await.map_err(|e| {
2044 P2PError::Network(crate::error::NetworkError::ProtocolError(
2045 format!("Failed to get bootstrap stats: {e}").into(),
2046 ))
2047 })?;
2048 Ok(Some(stats))
2049 } else {
2050 Ok(None)
2051 }
2052 }
2053
2054 pub async fn cached_peer_count(&self) -> usize {
2056 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2057 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2058 {
2059 return stats.total_contacts;
2060 }
2061 0
2062 }
2063
    /// Connects to bootstrap peers taken from the configuration and, if a
    /// bootstrap manager is present, from its cache.
    ///
    /// Configured peers come first; cached contacts are appended with any
    /// already-seen addresses removed. Each contact's addresses are tried in
    /// order until one connects. Connection outcomes are written back to the
    /// bootstrap cache where noted below.
    ///
    /// # Errors
    ///
    /// Returns `NetworkError::ConnectionFailed` when at least one contact was
    /// available but none could be connected. Returns `Ok(())` when there are
    /// no contacts at all.
    async fn connect_bootstrap_peers(&self) -> Result<()> {
        let mut bootstrap_contacts = Vec::new();
        let mut used_cache = false;
        // Addresses already queued, used to de-duplicate cached contacts.
        let mut seen_addresses = std::collections::HashSet::new();

        // Prefer the string form of the configured peers; fall back to the
        // parsed socket addresses rendered as strings.
        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
            self.config.bootstrap_peers_str.clone()
        } else {
            self.config
                .bootstrap_peers
                .iter()
                .map(|addr| addr.to_string())
                .collect::<Vec<_>>()
        };

        if !cli_bootstrap_peers.is_empty() {
            info!(
                "Using {} CLI-provided bootstrap peers (priority)",
                cli_bootstrap_peers.len()
            );
            for addr in &cli_bootstrap_peers {
                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
                    seen_addresses.insert(socket_addr);
                    // Placeholder peer id built from the first 8 characters of
                    // the address string.
                    // NOTE(review): addresses sharing an 8-character prefix get
                    // the same placeholder id — confirm this is acceptable.
                    let contact = ContactEntry::new(
                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
                        vec![socket_addr],
                    );
                    bootstrap_contacts.push(contact);
                } else {
                    warn!("Invalid bootstrap address format: {}", addr);
                }
            }
        }

        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            let manager = bootstrap_manager.read().await;
            // Ask the cache for up to 20 contacts.
            match manager.get_bootstrap_peers(20).await {
                Ok(contacts) => {
                    if !contacts.is_empty() {
                        let mut added_from_cache = 0;
                        for contact in contacts {
                            // Keep only addresses not already queued.
                            let new_addresses: Vec<_> = contact
                                .addresses
                                .iter()
                                .filter(|addr| !seen_addresses.contains(addr))
                                .copied()
                                .collect();

                            if !new_addresses.is_empty() {
                                for addr in &new_addresses {
                                    seen_addresses.insert(*addr);
                                }
                                let mut contact = contact.clone();
                                contact.addresses = new_addresses;
                                bootstrap_contacts.push(contact);
                                added_from_cache += 1;
                            }
                        }
                        if added_from_cache > 0 {
                            info!(
                                "Added {} cached bootstrap peers (supplementing CLI peers)",
                                added_from_cache
                            );
                            used_cache = true;
                        }
                    }
                }
                Err(e) => {
                    // A cache failure is not fatal; continue with what we have.
                    warn!("Failed to get cached bootstrap peers: {}", e);
                }
            }
        }

        if bootstrap_contacts.is_empty() {
            info!("No bootstrap peers configured and no cached peers available");
            return Ok(());
        }

        let mut successful_connections = 0;
        for contact in bootstrap_contacts {
            for addr in &contact.addresses {
                match self.connect_peer(&addr.to_string()).await {
                    Ok(peer_id) => {
                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
                        successful_connections += 1;

                        // Record the success under the peer id returned by
                        // `connect_peer`.
                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.peer_id = peer_id.clone();
                            // NOTE(review): the latency argument is a fixed 100,
                            // not a measured value.
                            updated_contact.update_connection_result(true, Some(100), None);
                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                        // One working address per contact is enough.
                        break;
                    }
                    Err(e) => {
                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);

                        // Failures are written back only when cached contacts
                        // were used in this run.
                        // NOTE(review): this also applies to configured (CLI)
                        // contacts when `used_cache` is true — confirm intent.
                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.update_connection_result(
                                false,
                                None,
                                Some(e.to_string()),
                            );

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                    }
                }
            }
        }

        if successful_connections == 0 {
            if !used_cache {
                warn!("Failed to connect to any bootstrap peers");
            }
            return Err(P2PError::Network(NetworkError::ConnectionFailed {
                // Placeholder address: the failure is not tied to a single peer.
                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)),
                reason: "Failed to connect to any bootstrap peers".into(),
            }));
        }
        info!(
            "Successfully connected to {} bootstrap peers",
            successful_connections
        );

        Ok(())
    }
2208
2209 async fn disconnect_all_peers(&self) -> Result<()> {
2211 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2212
2213 for peer_id in peer_ids {
2214 self.disconnect_peer(&peer_id).await?;
2215 }
2216
2217 Ok(())
2218 }
2219
2220 async fn periodic_tasks(&self) -> Result<()> {
2222 Ok(())
2228 }
2229}
2230
/// Abstraction over something that can send protocol messages to peers on
/// behalf of a local peer.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to `peer_id`, tagged with `protocol`.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the id of the local peer this sender acts for.
    fn local_peer_id(&self) -> &PeerId;
}
2240
/// [`NetworkSender`] that forwards outgoing messages over an unbounded
/// channel rather than sending them itself.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Id of the local peer.
    peer_id: PeerId,
    /// Channel carrying `(target peer id, protocol, payload)` tuples.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
2248
2249impl P2PNetworkSender {
2250 pub fn new(
2251 peer_id: PeerId,
2252 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2253 ) -> Self {
2254 Self { peer_id, send_tx }
2255 }
2256}
2257
2258#[async_trait::async_trait]
2260impl NetworkSender for P2PNetworkSender {
2261 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2263 self.send_tx
2264 .send((peer_id.clone(), protocol.to_string(), data))
2265 .map_err(|_| {
2266 P2PError::Network(crate::error::NetworkError::ProtocolError(
2267 "Failed to send message via channel".to_string().into(),
2268 ))
2269 })?;
2270 Ok(())
2271 }
2272
2273 fn local_peer_id(&self) -> &PeerId {
2275 &self.peer_id
2276 }
2277}
2278
/// Builder that assembles a [`NodeConfig`] step by step and then constructs a
/// [`P2PNode`] from it.
pub struct NodeBuilder {
    /// Configuration accumulated by the builder methods.
    config: NodeConfig,
}
2283
2284impl Default for NodeBuilder {
2285 fn default() -> Self {
2286 Self::new()
2287 }
2288}
2289
2290impl NodeBuilder {
2291 pub fn new() -> Self {
2293 Self {
2294 config: NodeConfig::default(),
2295 }
2296 }
2297
2298 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2300 self.config.peer_id = Some(peer_id);
2301 self
2302 }
2303
2304 pub fn listen_on(mut self, addr: &str) -> Self {
2306 if let Ok(multiaddr) = addr.parse() {
2307 self.config.listen_addrs.push(multiaddr);
2308 }
2309 self
2310 }
2311
2312 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2314 if let Ok(multiaddr) = addr.parse() {
2315 self.config.bootstrap_peers.push(multiaddr);
2316 }
2317 self.config.bootstrap_peers_str.push(addr.to_string());
2318 self
2319 }
2320
2321 pub fn with_ipv6(mut self, enable: bool) -> Self {
2323 self.config.enable_ipv6 = enable;
2324 self
2325 }
2326
2327 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2331 self.config.connection_timeout = timeout;
2332 self
2333 }
2334
2335 pub fn with_max_connections(mut self, max: usize) -> Self {
2337 self.config.max_connections = max;
2338 self
2339 }
2340
2341 pub fn with_production_mode(mut self) -> Self {
2343 self.config.production_config = Some(ProductionConfig::default());
2344 self
2345 }
2346
2347 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2349 self.config.production_config = Some(production_config);
2350 self
2351 }
2352
2353 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2355 self.config.dht_config = dht_config;
2356 self
2357 }
2358
2359 pub fn with_default_dht(mut self) -> Self {
2361 self.config.dht_config = DHTConfig::default();
2362 self
2363 }
2364
2365 pub async fn build(self) -> Result<P2PNode> {
2367 P2PNode::new(self.config).await
2368 }
2369}
2370
2371#[allow(dead_code)] async fn handle_received_message_standalone(
2374 message_data: Vec<u8>,
2375 peer_id: &PeerId,
2376 _protocol: &str,
2377 event_tx: &broadcast::Sender<P2PEvent>,
2378) -> Result<()> {
2379 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2381 Ok(message) => {
2382 if let (Some(protocol), Some(data), Some(from)) = (
2383 message.get("protocol").and_then(|v| v.as_str()),
2384 message.get("data").and_then(|v| v.as_array()),
2385 message.get("from").and_then(|v| v.as_str()),
2386 ) {
2387 let data_bytes: Vec<u8> = data
2389 .iter()
2390 .filter_map(|v| v.as_u64().map(|n| n as u8))
2391 .collect();
2392
2393 let event = P2PEvent::Message {
2395 topic: protocol.to_string(),
2396 source: from.to_string(),
2397 data: data_bytes,
2398 };
2399
2400 let _ = event_tx.send(event);
2401 debug!("Generated message event from peer: {}", peer_id);
2402 }
2403 }
2404 Err(e) => {
2405 warn!("Failed to parse received message from {}: {}", peer_id, e);
2406 }
2407 }
2408
2409 Ok(())
2410}
2411
2412#[allow(dead_code)]
2416fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2417 match create_protocol_message_static(protocol, data) {
2418 Ok(msg) => Some(msg),
2419 Err(e) => {
2420 warn!("Failed to create protocol message: {}", e);
2421 None
2422 }
2423 }
2424}
2425
2426#[allow(dead_code)]
2428async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2429 match result {
2430 Ok(_) => {
2431 debug!("Message sent to peer {} via transport layer", peer_id);
2432 }
2433 Err(e) => {
2434 warn!("Failed to send message to peer {}: {}", peer_id, e);
2435 }
2436 }
2437}
2438
2439#[allow(dead_code)] fn check_rate_limit(
2442 rate_limiter: &RateLimiter,
2443 socket_addr: &std::net::SocketAddr,
2444 remote_addr: &NetworkAddress,
2445) -> Result<()> {
2446 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2447 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2448 e
2449 })
2450}
2451
2452#[allow(dead_code)] async fn register_new_peer(
2455 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2456 peer_id: &PeerId,
2457 remote_addr: &NetworkAddress,
2458) {
2459 let mut peers_guard = peers.write().await;
2460 let peer_info = PeerInfo {
2461 peer_id: peer_id.clone(),
2462 addresses: vec![remote_addr.to_string()],
2463 connected_at: tokio::time::Instant::now(),
2464 last_seen: tokio::time::Instant::now(),
2465 status: ConnectionStatus::Connected,
2466 protocols: vec!["p2p-chat/1.0.0".to_string()],
2467 heartbeat_count: 0,
2468 };
2469 peers_guard.insert(peer_id.clone(), peer_info);
2470}
2471
2472#[allow(dead_code)] fn spawn_connection_handler(
2475 connection: Box<dyn crate::transport::Connection>,
2476 peer_id: PeerId,
2477 event_tx: broadcast::Sender<P2PEvent>,
2478 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2479) {
2480 tokio::spawn(async move {
2481 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2482 });
2483}
2484
2485#[allow(dead_code)] async fn handle_peer_connection(
2488 mut connection: Box<dyn crate::transport::Connection>,
2489 peer_id: PeerId,
2490 event_tx: broadcast::Sender<P2PEvent>,
2491 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2492) {
2493 loop {
2494 match connection.receive().await {
2495 Ok(message_data) => {
2496 debug!(
2497 "Received {} bytes from peer: {}",
2498 message_data.len(),
2499 peer_id
2500 );
2501
2502 if let Err(e) = handle_received_message_standalone(
2504 message_data,
2505 &peer_id,
2506 "unknown", &event_tx,
2508 )
2509 .await
2510 {
2511 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2512 }
2513 }
2514 Err(e) => {
2515 warn!("Failed to receive message from {}: {}", peer_id, e);
2516
2517 if !connection.is_alive().await {
2519 info!("Connection to {} is dead, removing peer", peer_id);
2520
2521 remove_peer(&peers, &peer_id).await;
2523
2524 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2526
2527 break; }
2529
2530 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2532 }
2533 }
2534 }
2535}
2536
2537#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2540 let mut peers_guard = peers.write().await;
2541 peers_guard.remove(peer_id);
2542}
2543
2544#[allow(dead_code)]
2546async fn update_peer_heartbeat(
2547 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2548 peer_id: &PeerId,
2549) -> Result<()> {
2550 let mut peers_guard = peers.write().await;
2551 match peers_guard.get_mut(peer_id) {
2552 Some(peer_info) => {
2553 peer_info.last_seen = Instant::now();
2554 peer_info.heartbeat_count += 1;
2555 Ok(())
2556 }
2557 None => {
2558 warn!("Received heartbeat from unknown peer: {}", peer_id);
2559 Err(P2PError::Network(NetworkError::PeerNotFound(
2560 format!("Peer {} not found", peer_id).into(),
2561 )))
2562 }
2563 }
2564}
2565
2566#[allow(dead_code)]
2568async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2569 if let Some(manager) = resource_manager {
2570 let metrics = manager.get_metrics().await;
2571 (metrics.memory_used, metrics.cpu_usage)
2572 } else {
2573 (0, 0.0)
2574 }
2575}
2576
2577#[cfg(test)]
2578mod tests {
2579 use super::*;
2580 use std::time::Duration;
2582 use tokio::time::timeout;
2583
2584 fn create_test_node_config() -> NodeConfig {
2590 NodeConfig {
2591 peer_id: Some("test_peer_123".to_string()),
2592 listen_addrs: vec![
2593 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2594 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2595 ],
2596 listen_addr: std::net::SocketAddr::new(
2597 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2598 0,
2599 ),
2600 bootstrap_peers: vec![],
2601 bootstrap_peers_str: vec![],
2602 enable_ipv6: true,
2603
2604 connection_timeout: Duration::from_millis(300),
2605 keep_alive_interval: Duration::from_secs(30),
2606 max_connections: 100,
2607 max_incoming_connections: 50,
2608 dht_config: DHTConfig::default(),
2609 security_config: SecurityConfig::default(),
2610 production_config: None,
2611 bootstrap_cache_config: None,
2612 }
2614 }
2615
2616 #[tokio::test]
2620 async fn test_node_config_default() {
2621 let config = NodeConfig::default();
2622
2623 assert!(config.peer_id.is_none());
2624 assert_eq!(config.listen_addrs.len(), 2);
2625 assert!(config.enable_ipv6);
2626 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2628 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2629 }
2630
2631 #[tokio::test]
2632 async fn test_dht_config_default() {
2633 let config = DHTConfig::default();
2634
2635 assert_eq!(config.k_value, 20);
2636 assert_eq!(config.alpha_value, 5);
2637 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2638 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2639 }
2640
2641 #[tokio::test]
2642 async fn test_security_config_default() {
2643 let config = SecurityConfig::default();
2644
2645 assert!(config.enable_noise);
2646 assert!(config.enable_tls);
2647 assert_eq!(config.trust_level, TrustLevel::Basic);
2648 }
2649
2650 #[test]
2651 fn test_trust_level_variants() {
2652 let _none = TrustLevel::None;
2654 let _basic = TrustLevel::Basic;
2655 let _full = TrustLevel::Full;
2656
2657 assert_eq!(TrustLevel::None, TrustLevel::None);
2659 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2660 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2661 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2662 }
2663
2664 #[test]
2665 fn test_connection_status_variants() {
2666 let connecting = ConnectionStatus::Connecting;
2667 let connected = ConnectionStatus::Connected;
2668 let disconnecting = ConnectionStatus::Disconnecting;
2669 let disconnected = ConnectionStatus::Disconnected;
2670 let failed = ConnectionStatus::Failed("test error".to_string());
2671
2672 assert_eq!(connecting, ConnectionStatus::Connecting);
2673 assert_eq!(connected, ConnectionStatus::Connected);
2674 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2675 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2676 assert_ne!(connecting, connected);
2677
2678 if let ConnectionStatus::Failed(msg) = failed {
2679 assert_eq!(msg, "test error");
2680 } else {
2681 panic!("Expected Failed status");
2682 }
2683 }
2684
2685 #[tokio::test]
2686 async fn test_node_creation() -> Result<()> {
2687 let config = create_test_node_config();
2688 let node = P2PNode::new(config).await?;
2689
2690 assert_eq!(node.peer_id(), "test_peer_123");
2691 assert!(!node.is_running().await);
2692 assert_eq!(node.peer_count().await, 0);
2693 assert!(node.connected_peers().await.is_empty());
2694
2695 Ok(())
2696 }
2697
2698 #[tokio::test]
2699 async fn test_node_creation_without_peer_id() -> Result<()> {
2700 let mut config = create_test_node_config();
2701 config.peer_id = None;
2702
2703 let node = P2PNode::new(config).await?;
2704
2705 assert!(node.peer_id().starts_with("peer_"));
2707 assert!(!node.is_running().await);
2708
2709 Ok(())
2710 }
2711
2712 #[tokio::test]
2713 async fn test_node_lifecycle() -> Result<()> {
2714 let config = create_test_node_config();
2715 let node = P2PNode::new(config).await?;
2716
2717 assert!(!node.is_running().await);
2719
2720 node.start().await?;
2722 assert!(node.is_running().await);
2723
2724 let listen_addrs = node.listen_addrs().await;
2726 assert!(
2727 !listen_addrs.is_empty(),
2728 "Expected at least one listening address"
2729 );
2730
2731 node.stop().await?;
2733 assert!(!node.is_running().await);
2734
2735 Ok(())
2736 }
2737
2738 #[tokio::test]
2739 async fn test_peer_connection() -> Result<()> {
2740 let config = create_test_node_config();
2741 let node = P2PNode::new(config).await?;
2742
2743 let peer_addr = "127.0.0.1:0";
2744
2745 let peer_id = node.connect_peer(peer_addr).await?;
2747 assert!(peer_id.starts_with("peer_from_"));
2748
2749 assert_eq!(node.peer_count().await, 1);
2751
2752 let connected_peers = node.connected_peers().await;
2754 assert_eq!(connected_peers.len(), 1);
2755 assert_eq!(connected_peers[0], peer_id);
2756
2757 let peer_info = node.peer_info(&peer_id).await;
2759 assert!(peer_info.is_some());
2760 let info = peer_info.expect("Peer info should exist after adding peer");
2761 assert_eq!(info.peer_id, peer_id);
2762 assert_eq!(info.status, ConnectionStatus::Connected);
2763 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2764
2765 node.disconnect_peer(&peer_id).await?;
2767 assert_eq!(node.peer_count().await, 0);
2768
2769 Ok(())
2770 }
2771
2772 #[tokio::test]
2773 async fn test_event_subscription() -> Result<()> {
2774 let config = create_test_node_config();
2775 let node = P2PNode::new(config).await?;
2776
2777 let mut events = node.subscribe_events();
2778 let peer_addr = "127.0.0.1:0";
2779
2780 let peer_id = node.connect_peer(peer_addr).await?;
2782
2783 let event = timeout(Duration::from_millis(100), events.recv()).await;
2785 assert!(event.is_ok());
2786
2787 let event_result = event
2788 .expect("Should receive event")
2789 .expect("Event should not be error");
2790 match event_result {
2791 P2PEvent::PeerConnected(event_peer_id) => {
2792 assert_eq!(event_peer_id, peer_id);
2793 }
2794 _ => panic!("Expected PeerConnected event"),
2795 }
2796
2797 node.disconnect_peer(&peer_id).await?;
2799
2800 let event = timeout(Duration::from_millis(100), events.recv()).await;
2802 assert!(event.is_ok());
2803
2804 let event_result = event
2805 .expect("Should receive event")
2806 .expect("Event should not be error");
2807 match event_result {
2808 P2PEvent::PeerDisconnected(event_peer_id) => {
2809 assert_eq!(event_peer_id, peer_id);
2810 }
2811 _ => panic!("Expected PeerDisconnected event"),
2812 }
2813
2814 Ok(())
2815 }
2816
2817 #[tokio::test]
2818 async fn test_message_sending() -> Result<()> {
2819 let mut config1 = create_test_node_config();
2821 config1.listen_addr =
2822 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2823 let node1 = P2PNode::new(config1).await?;
2824 node1.start().await?;
2825
2826 let mut config2 = create_test_node_config();
2827 config2.listen_addr =
2828 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2829 let node2 = P2PNode::new(config2).await?;
2830 node2.start().await?;
2831
2832 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2834
2835 let node2_addr = node2.local_addr().ok_or_else(|| {
2837 P2PError::Network(crate::error::NetworkError::ProtocolError(
2838 "No listening address".to_string().into(),
2839 ))
2840 })?;
2841
2842 let peer_id =
2844 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2845 Ok(res) => res?,
2846 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2847 };
2848
2849 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2851
2852 let message_data = b"Hello, peer!".to_vec();
2854 let result = match timeout(
2855 Duration::from_millis(500),
2856 node1.send_message(&peer_id, "test-protocol", message_data),
2857 )
2858 .await
2859 {
2860 Ok(res) => res,
2861 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2862 };
2863 if let Err(e) = &result {
2866 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2867 }
2868
2869 let non_existent_peer = "non_existent_peer".to_string();
2871 let result = node1
2872 .send_message(&non_existent_peer, "test-protocol", vec![])
2873 .await;
2874 assert!(result.is_err(), "Sending to non-existent peer should fail");
2875
2876 Ok(())
2877 }
2878
2879 #[tokio::test]
2880 async fn test_remote_mcp_operations() -> Result<()> {
2881 let config = create_test_node_config();
2882 let node = P2PNode::new(config).await?;
2883
2884 node.start().await?;
2886 node.stop().await?;
2887 Ok(())
2888 }
2889
2890 #[tokio::test]
2891 async fn test_health_check() -> Result<()> {
2892 let config = create_test_node_config();
2893 let node = P2PNode::new(config).await?;
2894
2895 let result = node.health_check().await;
2897 assert!(result.is_ok());
2898
2899 Ok(())
2904 }
2905
2906 #[tokio::test]
2907 async fn test_node_uptime() -> Result<()> {
2908 let config = create_test_node_config();
2909 let node = P2PNode::new(config).await?;
2910
2911 let uptime1 = node.uptime();
2912 assert!(uptime1 >= Duration::from_secs(0));
2913
2914 tokio::time::sleep(Duration::from_millis(10)).await;
2916
2917 let uptime2 = node.uptime();
2918 assert!(uptime2 > uptime1);
2919
2920 Ok(())
2921 }
2922
2923 #[tokio::test]
2924 async fn test_node_config_access() -> Result<()> {
2925 let config = create_test_node_config();
2926 let expected_peer_id = config.peer_id.clone();
2927 let node = P2PNode::new(config).await?;
2928
2929 let node_config = node.config();
2930 assert_eq!(node_config.peer_id, expected_peer_id);
2931 assert_eq!(node_config.max_connections, 100);
2932 Ok(())
2935 }
2936
2937 #[tokio::test]
2938 async fn test_mcp_server_access() -> Result<()> {
2939 let config = create_test_node_config();
2940 let _node = P2PNode::new(config).await?;
2941
2942 Ok(())
2944 }
2945
2946 #[tokio::test]
2947 async fn test_dht_access() -> Result<()> {
2948 let config = create_test_node_config();
2949 let node = P2PNode::new(config).await?;
2950
2951 assert!(node.dht().is_some());
2953
2954 Ok(())
2955 }
2956
2957 #[tokio::test]
2958 async fn test_node_builder() -> Result<()> {
2959 let builder = P2PNode::builder()
2961 .with_peer_id("builder_test_peer".to_string())
2962 .listen_on("/ip4/127.0.0.1/tcp/0")
2963 .listen_on("/ip6/::1/tcp/0")
2964 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
2966 .with_connection_timeout(Duration::from_secs(15))
2967 .with_max_connections(200);
2968
2969 let config = builder.config;
2971 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2972 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
2975 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2976 assert_eq!(config.max_connections, 200);
2977
2978 Ok(())
2979 }
2980
2981 #[tokio::test]
2982 async fn test_bootstrap_peers() -> Result<()> {
2983 let mut config = create_test_node_config();
2984 config.bootstrap_peers = vec![
2985 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2986 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2987 ];
2988
2989 let node = P2PNode::new(config).await?;
2990
2991 node.start().await?;
2993
2994 let _peer_count = node.peer_count().await;
2998
2999 node.stop().await?;
3000 Ok(())
3001 }
3002
3003 #[tokio::test]
3004 async fn test_production_mode_disabled() -> Result<()> {
3005 let config = create_test_node_config();
3006 let node = P2PNode::new(config).await?;
3007
3008 assert!(!node.is_production_mode());
3009 assert!(node.production_config().is_none());
3010
3011 let result = node.resource_metrics().await;
3013 assert!(result.is_err());
3014 assert!(result.unwrap_err().to_string().contains("not enabled"));
3015
3016 Ok(())
3017 }
3018
3019 #[tokio::test]
3020 async fn test_network_event_variants() {
3021 let peer_id = "test_peer".to_string();
3023 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3024
3025 let _peer_connected = NetworkEvent::PeerConnected {
3026 peer_id: peer_id.clone(),
3027 addresses: vec![address.clone()],
3028 };
3029
3030 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3031 peer_id: peer_id.clone(),
3032 reason: "test disconnect".to_string(),
3033 };
3034
3035 let _message_received = NetworkEvent::MessageReceived {
3036 peer_id: peer_id.clone(),
3037 protocol: "test-protocol".to_string(),
3038 data: vec![1, 2, 3],
3039 };
3040
3041 let _connection_failed = NetworkEvent::ConnectionFailed {
3042 peer_id: Some(peer_id.clone()),
3043 address: address.clone(),
3044 error: "connection refused".to_string(),
3045 };
3046
3047 let _dht_stored = NetworkEvent::DHTRecordStored {
3048 key: vec![1, 2, 3],
3049 value: vec![4, 5, 6],
3050 };
3051
3052 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3053 key: vec![1, 2, 3],
3054 value: Some(vec![4, 5, 6]),
3055 };
3056 }
3057
3058 #[tokio::test]
3059 async fn test_peer_info_structure() {
3060 let peer_info = PeerInfo {
3061 peer_id: "test_peer".to_string(),
3062 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3063 connected_at: Instant::now(),
3064 last_seen: Instant::now(),
3065 status: ConnectionStatus::Connected,
3066 protocols: vec!["test-protocol".to_string()],
3067 heartbeat_count: 0,
3068 };
3069
3070 assert_eq!(peer_info.peer_id, "test_peer");
3071 assert_eq!(peer_info.addresses.len(), 1);
3072 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3073 assert_eq!(peer_info.protocols.len(), 1);
3074 }
3075
3076 #[tokio::test]
3077 async fn test_serialization() -> Result<()> {
3078 let config = create_test_node_config();
3080 let serialized = serde_json::to_string(&config)?;
3081 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3082
3083 assert_eq!(config.peer_id, deserialized.peer_id);
3084 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3085 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3086
3087 Ok(())
3088 }
3089
3090 #[tokio::test]
3091 async fn test_get_peer_id_by_address_found() -> Result<()> {
3092 let config = create_test_node_config();
3093 let node = P2PNode::new(config).await?;
3094
3095 let test_peer_id = "peer_test_123".to_string();
3097 let test_address = "192.168.1.100:9000".to_string();
3098
3099 let peer_info = PeerInfo {
3100 peer_id: test_peer_id.clone(),
3101 addresses: vec![test_address.clone()],
3102 connected_at: Instant::now(),
3103 last_seen: Instant::now(),
3104 status: ConnectionStatus::Connected,
3105 protocols: vec!["test-protocol".to_string()],
3106 heartbeat_count: 0,
3107 };
3108
3109 node.peers
3110 .write()
3111 .await
3112 .insert(test_peer_id.clone(), peer_info);
3113
3114 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3116 assert_eq!(found_peer_id, Some(test_peer_id));
3117
3118 Ok(())
3119 }
3120
3121 #[tokio::test]
3122 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3123 let config = create_test_node_config();
3124 let node = P2PNode::new(config).await?;
3125
3126 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3128 assert_eq!(result, None);
3129
3130 Ok(())
3131 }
3132
3133 #[tokio::test]
3134 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3135 let config = create_test_node_config();
3136 let node = P2PNode::new(config).await?;
3137
3138 let result = node.get_peer_id_by_address("invalid-address").await;
3140 assert_eq!(result, None);
3141
3142 Ok(())
3143 }
3144
3145 #[tokio::test]
3146 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3147 let config = create_test_node_config();
3148 let node = P2PNode::new(config).await?;
3149
3150 let peer1_id = "peer_1".to_string();
3152 let peer1_addr = "192.168.1.101:9001".to_string();
3153
3154 let peer2_id = "peer_2".to_string();
3155 let peer2_addr = "192.168.1.102:9002".to_string();
3156
3157 let peer1_info = PeerInfo {
3158 peer_id: peer1_id.clone(),
3159 addresses: vec![peer1_addr.clone()],
3160 connected_at: Instant::now(),
3161 last_seen: Instant::now(),
3162 status: ConnectionStatus::Connected,
3163 protocols: vec!["test-protocol".to_string()],
3164 heartbeat_count: 0,
3165 };
3166
3167 let peer2_info = PeerInfo {
3168 peer_id: peer2_id.clone(),
3169 addresses: vec![peer2_addr.clone()],
3170 connected_at: Instant::now(),
3171 last_seen: Instant::now(),
3172 status: ConnectionStatus::Connected,
3173 protocols: vec!["test-protocol".to_string()],
3174 heartbeat_count: 0,
3175 };
3176
3177 node.peers
3178 .write()
3179 .await
3180 .insert(peer1_id.clone(), peer1_info);
3181 node.peers
3182 .write()
3183 .await
3184 .insert(peer2_id.clone(), peer2_info);
3185
3186 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3188 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3189
3190 assert_eq!(found_peer1, Some(peer1_id));
3191 assert_eq!(found_peer2, Some(peer2_id));
3192
3193 Ok(())
3194 }
3195
3196 #[tokio::test]
3197 async fn test_list_active_connections_empty() -> Result<()> {
3198 let config = create_test_node_config();
3199 let node = P2PNode::new(config).await?;
3200
3201 let connections = node.list_active_connections().await;
3203 assert!(connections.is_empty());
3204
3205 Ok(())
3206 }
3207
3208 #[tokio::test]
3209 async fn test_list_active_connections_with_peers() -> Result<()> {
3210 let config = create_test_node_config();
3211 let node = P2PNode::new(config).await?;
3212
3213 let peer1_id = "peer_1".to_string();
3215 let peer1_addrs = vec![
3216 "192.168.1.101:9001".to_string(),
3217 "192.168.1.101:9002".to_string(),
3218 ];
3219
3220 let peer2_id = "peer_2".to_string();
3221 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3222
3223 let peer1_info = PeerInfo {
3224 peer_id: peer1_id.clone(),
3225 addresses: peer1_addrs.clone(),
3226 connected_at: Instant::now(),
3227 last_seen: Instant::now(),
3228 status: ConnectionStatus::Connected,
3229 protocols: vec!["test-protocol".to_string()],
3230 heartbeat_count: 0,
3231 };
3232
3233 let peer2_info = PeerInfo {
3234 peer_id: peer2_id.clone(),
3235 addresses: peer2_addrs.clone(),
3236 connected_at: Instant::now(),
3237 last_seen: Instant::now(),
3238 status: ConnectionStatus::Connected,
3239 protocols: vec!["test-protocol".to_string()],
3240 heartbeat_count: 0,
3241 };
3242
3243 node.peers
3244 .write()
3245 .await
3246 .insert(peer1_id.clone(), peer1_info);
3247 node.peers
3248 .write()
3249 .await
3250 .insert(peer2_id.clone(), peer2_info);
3251
3252 let connections = node.list_active_connections().await;
3254 assert_eq!(connections.len(), 2);
3255
3256 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3258 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3259
3260 assert!(peer1_conn.is_some());
3261 assert!(peer2_conn.is_some());
3262
3263 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3265 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3266
3267 Ok(())
3268 }
3269
3270 #[tokio::test]
3271 async fn test_remove_peer_success() -> Result<()> {
3272 let config = create_test_node_config();
3273 let node = P2PNode::new(config).await?;
3274
3275 let peer_id = "peer_to_remove".to_string();
3277 let peer_info = PeerInfo {
3278 peer_id: peer_id.clone(),
3279 addresses: vec!["192.168.1.100:9000".to_string()],
3280 connected_at: Instant::now(),
3281 last_seen: Instant::now(),
3282 status: ConnectionStatus::Connected,
3283 protocols: vec!["test-protocol".to_string()],
3284 heartbeat_count: 0,
3285 };
3286
3287 node.peers.write().await.insert(peer_id.clone(), peer_info);
3288
3289 assert!(node.is_peer_connected(&peer_id).await);
3291
3292 let removed = node.remove_peer(&peer_id).await;
3294 assert!(removed);
3295
3296 assert!(!node.is_peer_connected(&peer_id).await);
3298
3299 Ok(())
3300 }
3301
3302 #[tokio::test]
3303 async fn test_remove_peer_nonexistent() -> Result<()> {
3304 let config = create_test_node_config();
3305 let node = P2PNode::new(config).await?;
3306
3307 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3309 assert!(!removed);
3310
3311 Ok(())
3312 }
3313
3314 #[tokio::test]
3315 async fn test_is_peer_connected() -> Result<()> {
3316 let config = create_test_node_config();
3317 let node = P2PNode::new(config).await?;
3318
3319 let peer_id = "test_peer".to_string();
3320
3321 assert!(!node.is_peer_connected(&peer_id).await);
3323
3324 let peer_info = PeerInfo {
3326 peer_id: peer_id.clone(),
3327 addresses: vec!["192.168.1.100:9000".to_string()],
3328 connected_at: Instant::now(),
3329 last_seen: Instant::now(),
3330 status: ConnectionStatus::Connected,
3331 protocols: vec!["test-protocol".to_string()],
3332 heartbeat_count: 0,
3333 };
3334
3335 node.peers.write().await.insert(peer_id.clone(), peer_info);
3336
3337 assert!(node.is_peer_connected(&peer_id).await);
3339
3340 node.remove_peer(&peer_id).await;
3342
3343 assert!(!node.is_peer_connected(&peer_id).await);
3345
3346 Ok(())
3347 }
3348
3349 #[test]
3350 fn test_normalize_ipv6_wildcard() {
3351 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3352
3353 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3354 let normalized = normalize_wildcard_to_loopback(wildcard);
3355
3356 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3357 assert_eq!(normalized.port(), 8080);
3358 }
3359
3360 #[test]
3361 fn test_normalize_ipv4_wildcard() {
3362 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3363
3364 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3365 let normalized = normalize_wildcard_to_loopback(wildcard);
3366
3367 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3368 assert_eq!(normalized.port(), 9000);
3369 }
3370
3371 #[test]
3372 fn test_normalize_specific_address_unchanged() {
3373 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3374 let normalized = normalize_wildcard_to_loopback(specific);
3375
3376 assert_eq!(normalized, specific);
3377 }
3378
3379 #[test]
3380 fn test_normalize_loopback_unchanged() {
3381 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3382
3383 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3384 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3385 assert_eq!(normalized_v6, loopback_v6);
3386
3387 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3388 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3389 assert_eq!(normalized_v4, loopback_v4);
3390 }
3391}