1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::{HashMap, HashSet};
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, Ordering};
36use std::time::Duration;
37use tokio::sync::{RwLock, broadcast};
38use tokio::time::Instant;
39use tracing::{debug, info, trace, warn};
40
/// Configuration used to construct a `P2PNode`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Identifier for this node. When `None`, `P2PNode::new` generates a
    /// `peer_<8 chars>` identifier from a random UUID.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on. `P2PNode::new` binds the first IPv6 and the
    /// first IPv4 entry it finds; when the list is empty it falls back to
    /// wildcard addresses on `listen_addr`'s port.
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address; its port is reused for the wildcard entries
    /// built by the constructors.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers as parsed socket addresses.
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers in textual form (copied from
    /// `Config::network.bootstrap_nodes` by `new` / `from_config`).
    pub bootstrap_peers_str: Vec<String>,

    /// Whether IPv6 is enabled; gates the IPv6 wildcard listener when
    /// `listen_addrs` is empty.
    pub enable_ipv6: bool,

    /// Timeout applied by `P2PNode::connect_peer` and `P2PNode::send_message`.
    pub connection_timeout: Duration,

    /// Keep-alive interval.
    /// NOTE(review): the consumer of this value is not visible in this section.
    pub keep_alive_interval: Duration,

    /// Maximum number of connections (taken from
    /// `Config::network.max_connections` by the constructors).
    pub max_connections: usize,

    /// Maximum number of incoming connections (taken from
    /// `Config::security.connection_limit` by the constructors).
    pub max_incoming_connections: usize,

    /// DHT tuning parameters.
    pub dht_config: DHTConfig,

    /// Transport security settings.
    pub security_config: SecurityConfig,

    /// When `Some`, `P2PNode::new` creates a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// When `Some`, handed to `BootstrapManager::with_config`; otherwise
    /// `BootstrapManager::new` is used.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
}
87
/// DHT tuning parameters carried inside `NodeConfig`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// The `k` parameter (presumably the Kademlia bucket size / replication
    /// factor — TODO confirm against the DHT implementation).
    pub k_value: usize,

    /// The `alpha` parameter (presumably lookup parallelism — TODO confirm).
    pub alpha_value: usize,

    /// Time-to-live for stored records.
    pub record_ttl: Duration,

    /// Interval between refresh passes.
    pub refresh_interval: Duration,
}
103
/// Transport security settings carried inside `NodeConfig`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Whether Noise is enabled (defaults to `true`).
    pub enable_noise: bool,

    /// Whether TLS is enabled (defaults to `true`).
    pub enable_tls: bool,

    /// Trust level applied to peers (defaults to `TrustLevel::Basic`).
    pub trust_level: TrustLevel,
}
116
/// Trust level selectable in `SecurityConfig`.
///
/// NOTE(review): the exact checks each level implies are not visible in this
/// section; the variant descriptions below are inferred from their names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No trust verification (presumably).
    None,
    /// Basic verification; the default used by `SecurityConfig::default`.
    Basic,
    /// Full verification (presumably).
    Full,
}
127
128impl NodeConfig {
129 pub fn new() -> Result<Self> {
135 let config = Config::default();
137
138 let listen_addr = config.listen_socket_addr()?;
140
141 let mut listen_addrs = vec![];
143
144 if config.network.ipv6_enabled {
146 let ipv6_addr = std::net::SocketAddr::new(
147 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
148 listen_addr.port(),
149 );
150 listen_addrs.push(ipv6_addr);
151 }
152
153 let ipv4_addr = std::net::SocketAddr::new(
155 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
156 listen_addr.port(),
157 );
158 listen_addrs.push(ipv4_addr);
159
160 Ok(Self {
161 peer_id: None,
162 listen_addrs,
163 listen_addr,
164 bootstrap_peers: Vec::new(),
165 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
166 enable_ipv6: config.network.ipv6_enabled,
167
168 connection_timeout: Duration::from_secs(config.network.connection_timeout),
169 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
170 max_connections: config.network.max_connections,
171 max_incoming_connections: config.security.connection_limit as usize,
172 dht_config: DHTConfig::default(),
173 security_config: SecurityConfig::default(),
174 production_config: None,
175 bootstrap_cache_config: None,
176 })
178 }
179}
180
181impl Default for NodeConfig {
182 fn default() -> Self {
183 let config = Config::default();
185
186 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
188 std::net::SocketAddr::new(
189 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
190 9000,
191 )
192 });
193
194 Self {
195 peer_id: None,
196 listen_addrs: vec![
197 std::net::SocketAddr::new(
198 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
199 listen_addr.port(),
200 ),
201 std::net::SocketAddr::new(
202 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
203 listen_addr.port(),
204 ),
205 ],
206 listen_addr,
207 bootstrap_peers: Vec::new(),
208 bootstrap_peers_str: Vec::new(),
209 enable_ipv6: config.network.ipv6_enabled,
210
211 connection_timeout: Duration::from_secs(config.network.connection_timeout),
212 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
213 max_connections: config.network.max_connections,
214 max_incoming_connections: config.security.connection_limit as usize,
215 dht_config: DHTConfig::default(),
216 security_config: SecurityConfig::default(),
217 production_config: None, bootstrap_cache_config: None,
219 }
221 }
222}
223
224impl NodeConfig {
225 pub fn from_config(config: &Config) -> Result<Self> {
227 let listen_addr = config.listen_socket_addr()?;
228 let bootstrap_addrs = config.bootstrap_addrs()?;
229
230 let mut node_config = Self {
231 peer_id: None,
232 listen_addrs: vec![listen_addr],
233 listen_addr,
234 bootstrap_peers: bootstrap_addrs
235 .iter()
236 .map(|addr| addr.socket_addr())
237 .collect(),
238 bootstrap_peers_str: config
239 .network
240 .bootstrap_nodes
241 .iter()
242 .map(|addr| addr.to_string())
243 .collect(),
244 enable_ipv6: config.network.ipv6_enabled,
245
246 connection_timeout: Duration::from_secs(config.network.connection_timeout),
247 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
248 max_connections: config.network.max_connections,
249 max_incoming_connections: config.security.connection_limit as usize,
250 dht_config: DHTConfig {
251 k_value: 20,
252 alpha_value: 3,
253 record_ttl: Duration::from_secs(3600),
254 refresh_interval: Duration::from_secs(900),
255 },
256 security_config: SecurityConfig {
257 enable_noise: true,
258 enable_tls: true,
259 trust_level: TrustLevel::Basic,
260 },
261 production_config: Some(ProductionConfig {
262 max_connections: config.network.max_connections,
263 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
266 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
267 health_check_interval: Duration::from_secs(30),
268 metrics_interval: Duration::from_secs(60),
269 enable_performance_tracking: true,
270 enable_auto_cleanup: true,
271 shutdown_timeout: Duration::from_secs(30),
272 rate_limits: crate::production::RateLimitConfig::default(),
273 }),
274 bootstrap_cache_config: None,
275 };
280
281 if config.network.ipv6_enabled {
283 node_config.listen_addrs.push(std::net::SocketAddr::new(
284 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
285 listen_addr.port(),
286 ));
287 }
288
289 Ok(node_config)
290 }
291
292 pub fn with_listen_addr(addr: &str) -> Result<Self> {
294 let listen_addr: std::net::SocketAddr = addr
295 .parse()
296 .map_err(|e: std::net::AddrParseError| {
297 NetworkError::InvalidAddress(e.to_string().into())
298 })
299 .map_err(P2PError::Network)?;
300 let cfg = NodeConfig {
301 listen_addr,
302 listen_addrs: vec![listen_addr],
303 ..Default::default()
304 };
305 Ok(cfg)
306 }
307}
308
309impl Default for DHTConfig {
310 fn default() -> Self {
311 Self {
312 k_value: 20,
313 alpha_value: 5,
314 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
317 }
318}
319
320impl Default for SecurityConfig {
321 fn default() -> Self {
322 Self {
323 enable_noise: true,
324 enable_tls: true,
325 trust_level: TrustLevel::Basic,
326 }
327 }
328}
329
/// Bookkeeping record kept for each peer in the node's peer table.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Identifier of the peer.
    pub peer_id: PeerId,

    /// Addresses of the peer in textual form; `connect_peer` stores the
    /// address string it was given.
    pub addresses: Vec<String>,

    /// When the connection was established.
    pub connected_at: Instant,

    /// When the peer was last seen (initialised to the connection time).
    pub last_seen: Instant,

    /// Current connection status.
    pub status: ConnectionStatus,

    /// Protocol identifiers associated with the peer.
    pub protocols: Vec<String>,

    /// Heartbeat counter; starts at 0 in `connect_peer`.
    pub heartbeat_count: u64,
}
354
/// Connection state recorded in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection attempt in progress.
    Connecting,
    /// Connection established; the state `connect_peer` records.
    Connected,
    /// Disconnect in progress.
    Disconnecting,
    /// Connection closed.
    Disconnected,
    /// Connection failed; carries a description of the failure.
    Failed(String),
}
369
/// Network-level event type.
///
/// NOTE(review): nothing in this section constructs these variants; the
/// descriptions below are inferred from variant and field names.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A peer connected.
    PeerConnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Addresses of the peer.
        addresses: Vec<String>,
    },

    /// A peer disconnected.
    PeerDisconnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Reason for the disconnect.
        reason: String,
    },

    /// A message arrived from a peer.
    MessageReceived {
        /// Identifier of the sending peer.
        peer_id: PeerId,
        /// Protocol the message belongs to.
        protocol: String,
        /// Raw message payload.
        data: Vec<u8>,
    },

    /// A connection attempt failed.
    ConnectionFailed {
        /// Identifier of the peer, when known.
        peer_id: Option<PeerId>,
        /// Address the attempt targeted.
        address: String,
        /// Description of the error.
        error: String,
    },

    /// A DHT record was stored.
    DHTRecordStored {
        /// Record key.
        key: Vec<u8>,
        /// Record value.
        value: Vec<u8>,
    },

    /// A DHT lookup completed.
    DHTRecordRetrieved {
        /// Record key.
        key: Vec<u8>,
        /// Record value, if one was found.
        value: Option<Vec<u8>>,
    },
}
425
/// Events broadcast by `P2PNode` over its event channel.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// A message was received from the network or published locally.
    Message {
        /// Topic (the `protocol` field of the wire message).
        topic: String,
        /// Sender as declared in the message's `from` field; for local
        /// publishes this is the node's own peer ID.
        source: PeerId,
        /// Message payload.
        data: Vec<u8>,
    },
    /// A peer connected (outgoing or accepted connection).
    PeerConnected(PeerId),
    /// A peer was disconnected.
    PeerDisconnected(PeerId),
}
446
/// A peer-to-peer node: owns the transport, the peer table, the event channel
/// and the optional DHT / resource / bootstrap subsystems.
pub struct P2PNode {
    /// Configuration the node was created with.
    config: NodeConfig,

    /// This node's peer identifier.
    peer_id: PeerId,

    /// Known peers keyed by peer ID; filled by `connect_peer` and the accept
    /// loop, emptied by `disconnect_peer` / `remove_peer`.
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Broadcast sender for `P2PEvent`s; receivers come from `subscribe_events`.
    event_tx: broadcast::Sender<P2PEvent>,

    /// Addresses reported by the transport; written by `start_network_listeners`.
    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,

    /// Instant at which the node was constructed.
    start_time: Instant,

    /// Running flag: set by `start`, cleared by `stop`, polled by `run`.
    running: RwLock<bool>,

    /// DHT instance, when one was created.
    dht: Option<Arc<RwLock<DHT>>>,

    /// Resource manager; present only when `production_config` was set.
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager; `None` when initialisation failed.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Dual-stack transport used to accept, connect, send and receive.
    dual_node: Arc<DualStackNetworkNode>,

    /// Rate limiter consulted when accepting incoming connections.
    #[allow(dead_code)]
    rate_limiter: Arc<RateLimiter>,

    /// Peers considered to have a live transport connection; `send_message`
    /// refuses to send to peers missing from this set.
    active_connections: Arc<RwLock<HashSet<PeerId>>>,

    /// Join handle of the connection lifecycle monitor task spawned in `new`.
    #[allow(dead_code)]
    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Join handle of the keepalive task spawned in `new`.
    #[allow(dead_code)]
    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Flag shared with the keepalive task (presumably its stop signal —
    /// TODO confirm against `keepalive_task`).
    #[allow(dead_code)]
    shutdown: Arc<AtomicBool>,
}
510
/// Maps a wildcard socket address to the loopback address of the same family.
///
/// `0.0.0.0:port` becomes `127.0.0.1:port` and `[::]:port` becomes
/// `[::1]:port`; every other address is returned unchanged.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    // Concrete (non-wildcard) addresses pass through untouched.
    if !addr.ip().is_unspecified() {
        return addr;
    }

    let loopback = match addr.ip() {
        IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST),
        IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST),
    };
    SocketAddr::new(loopback, addr.port())
}
541
542impl P2PNode {
543 pub fn new_for_tests() -> Result<Self> {
545 let (event_tx, _) = broadcast::channel(16);
546 Ok(Self {
547 config: NodeConfig::default(),
548 peer_id: "test_peer".to_string(),
549 peers: Arc::new(RwLock::new(HashMap::new())),
550 event_tx,
551 listen_addrs: RwLock::new(Vec::new()),
552 start_time: Instant::now(),
553 running: RwLock::new(false),
554 dht: None,
555 resource_manager: None,
556 bootstrap_manager: None,
557 dual_node: {
558 let v6: Option<std::net::SocketAddr> = "[::1]:0"
560 .parse()
561 .ok()
562 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
563 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
564 let handle = tokio::runtime::Handle::current();
565 let dual_attempt = handle.block_on(
566 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
567 );
568 let dual = match dual_attempt {
569 Ok(d) => d,
570 Err(_e1) => {
571 let fallback = handle.block_on(
573 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
574 None,
575 "127.0.0.1:0".parse().ok(),
576 ),
577 );
578 match fallback {
579 Ok(d) => d,
580 Err(e2) => {
581 return Err(P2PError::Network(NetworkError::BindError(
582 format!("Failed to create dual-stack network node: {}", e2)
583 .into(),
584 )));
585 }
586 }
587 }
588 };
589 Arc::new(dual)
590 },
591 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
592 max_requests: 100,
593 burst_size: 100,
594 window: std::time::Duration::from_secs(1),
595 ..Default::default()
596 })),
597 active_connections: Arc::new(RwLock::new(HashSet::new())),
598 connection_monitor_handle: Arc::new(RwLock::new(None)),
599 keepalive_handle: Arc::new(RwLock::new(None)),
600 shutdown: Arc::new(AtomicBool::new(false)),
601 })
602 }
603 pub async fn new(config: NodeConfig) -> Result<Self> {
605 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
606 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
608 });
609
610 let (event_tx, _) = broadcast::channel(1000);
611
612 {
615 use blake3::Hasher;
616 let mut hasher = Hasher::new();
617 hasher.update(peer_id.as_bytes());
618 let digest = hasher.finalize();
619 let mut nid = [0u8; 32];
620 nid.copy_from_slice(digest.as_bytes());
621 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
622 crate::identity::node_identity::NodeId::from_bytes(nid),
623 ));
624 }
627
628 let dht = if true {
630 let _dht_config = crate::dht::DHTConfig {
632 replication_factor: config.dht_config.k_value,
633 bucket_size: config.dht_config.k_value,
634 alpha: config.dht_config.alpha_value,
635 record_ttl: config.dht_config.record_ttl,
636 bucket_refresh_interval: config.dht_config.refresh_interval,
637 republish_interval: config.dht_config.refresh_interval,
638 max_distance: 160, };
640 let peer_bytes = peer_id.as_bytes();
642 let mut node_id_bytes = [0u8; 32];
643 let len = peer_bytes.len().min(32);
644 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
645 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
646 let dht_instance = DHT::new(node_id).map_err(|e| {
647 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
648 e.to_string().into(),
649 ))
650 })?;
651 Some(Arc::new(RwLock::new(dht_instance)))
652 } else {
653 None
654 };
655
656 let resource_manager = config
660 .production_config
661 .clone()
662 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
663
664 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
666 match BootstrapManager::with_config(cache_config.clone()).await {
667 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
668 Err(e) => {
669 warn!(
670 "Failed to initialize bootstrap manager: {}, continuing without cache",
671 e
672 );
673 None
674 }
675 }
676 } else {
677 match BootstrapManager::new().await {
678 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
679 Err(e) => {
680 warn!(
681 "Failed to initialize bootstrap manager: {}, continuing without cache",
682 e
683 );
684 None
685 }
686 }
687 };
688
689 let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
691 let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
692 let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
693 (v6_addr, v4_addr)
694 } else {
695 let v4_addr = Some(std::net::SocketAddr::new(
697 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
698 config.listen_addr.port(),
699 ));
700 let v6_addr = if config.enable_ipv6 {
701 Some(std::net::SocketAddr::new(
702 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
703 config.listen_addr.port(),
704 ))
705 } else {
706 None
707 };
708 (v6_addr, v4_addr)
709 };
710
711 let dual_node = Arc::new(
712 DualStackNetworkNode::new(v6_opt, v4_opt)
713 .await
714 .map_err(|e| {
715 P2PError::Transport(crate::error::TransportError::SetupFailed(
716 format!("Failed to create dual-stack network nodes: {}", e).into(),
717 ))
718 })?,
719 );
720
721 let rate_limiter = Arc::new(RateLimiter::new(
723 crate::validation::RateLimitConfig::default(),
724 ));
725
726 let active_connections = Arc::new(RwLock::new(HashSet::new()));
728
729 let connection_monitor_handle = {
731 let active_conns = Arc::clone(&active_connections);
732 let peers_map = Arc::new(RwLock::new(HashMap::new())); let event_tx_clone = event_tx.clone();
734 let dual_node_clone = Arc::clone(&dual_node);
735
736 let handle = tokio::spawn(async move {
737 Self::connection_lifecycle_monitor(
738 dual_node_clone,
739 active_conns,
740 peers_map,
741 event_tx_clone,
742 )
743 .await;
744 });
745
746 Arc::new(RwLock::new(Some(handle)))
747 };
748
749 let shutdown = Arc::new(AtomicBool::new(false));
751 let keepalive_handle = {
752 let active_conns = Arc::clone(&active_connections);
753 let dual_node_clone = Arc::clone(&dual_node);
754 let shutdown_clone = Arc::clone(&shutdown);
755
756 let handle = tokio::spawn(async move {
757 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
758 });
759
760 Arc::new(RwLock::new(Some(handle)))
761 };
762
763 let node = Self {
764 config,
765 peer_id,
766 peers: Arc::new(RwLock::new(HashMap::new())),
767 event_tx,
768 listen_addrs: RwLock::new(Vec::new()),
769 start_time: Instant::now(),
770 running: RwLock::new(false),
771 dht,
772 resource_manager,
773 bootstrap_manager,
774 dual_node,
775 rate_limiter,
776 active_connections,
777 connection_monitor_handle,
778 keepalive_handle,
779 shutdown,
780 };
781 info!("Created P2P node with peer ID: {}", node.peer_id);
782
783 node.start_network_listeners().await?;
785
786 node.start_connection_monitor().await;
788
789 Ok(node)
790 }
791
792 pub fn builder() -> NodeBuilder {
794 NodeBuilder::new()
795 }
796
797 pub fn peer_id(&self) -> &PeerId {
799 &self.peer_id
800 }
801
802 pub fn local_addr(&self) -> Option<String> {
803 self.listen_addrs
804 .try_read()
805 .ok()
806 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
807 }
808
809 pub async fn subscribe(&self, topic: &str) -> Result<()> {
810 info!("Subscribed to topic: {}", topic);
813 Ok(())
814 }
815
816 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
817 info!(
818 "Publishing message to topic: {} ({} bytes)",
819 topic,
820 data.len()
821 );
822
823 let peer_list: Vec<PeerId> = {
825 let peers_guard = self.peers.read().await;
826 peers_guard.keys().cloned().collect()
827 };
828
829 if peer_list.is_empty() {
830 debug!("No peers connected, message will only be sent to local subscribers");
831 } else {
832 let mut send_count = 0;
834 for peer_id in &peer_list {
835 match self.send_message(peer_id, topic, data.to_vec()).await {
836 Ok(_) => {
837 send_count += 1;
838 debug!("Sent message to peer: {}", peer_id);
839 }
840 Err(e) => {
841 warn!("Failed to send message to peer {}: {}", peer_id, e);
842 }
843 }
844 }
845 info!(
846 "Published message to {}/{} connected peers",
847 send_count,
848 peer_list.len()
849 );
850 }
851
852 let event = P2PEvent::Message {
854 topic: topic.to_string(),
855 source: self.peer_id.clone(),
856 data: data.to_vec(),
857 };
858 let _ = self.event_tx.send(event);
859
860 Ok(())
861 }
862
863 pub fn config(&self) -> &NodeConfig {
865 &self.config
866 }
867
868 pub async fn start(&self) -> Result<()> {
870 info!("Starting P2P node...");
871
872 if let Some(ref resource_manager) = self.resource_manager {
874 resource_manager.start().await.map_err(|e| {
875 P2PError::Network(crate::error::NetworkError::ProtocolError(
876 format!("Failed to start resource manager: {e}").into(),
877 ))
878 })?;
879 info!("Production resource manager started");
880 }
881
882 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
884 let mut manager = bootstrap_manager.write().await;
885 manager.start_background_tasks().await.map_err(|e| {
886 P2PError::Network(crate::error::NetworkError::ProtocolError(
887 format!("Failed to start bootstrap manager: {e}").into(),
888 ))
889 })?;
890 info!("Bootstrap cache manager started");
891 }
892
893 *self.running.write().await = true;
895
896 self.start_network_listeners().await?;
898
899 let listen_addrs = self.listen_addrs.read().await;
901 info!("P2P node started on addresses: {:?}", *listen_addrs);
902
903 self.start_message_receiving_system().await?;
907
908 self.connect_bootstrap_peers().await?;
910
911 Ok(())
912 }
913
914 async fn start_network_listeners(&self) -> Result<()> {
916 info!("Starting dual-stack listeners (ant-quic)...");
917 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
919 P2PError::Transport(crate::error::TransportError::SetupFailed(
920 format!("Failed to get local addresses: {}", e).into(),
921 ))
922 })?;
923 {
924 let mut la = self.listen_addrs.write().await;
925 *la = addrs.clone();
926 }
927
928 let event_tx = self.event_tx.clone();
930 let peers = self.peers.clone();
931 let rate_limiter = self.rate_limiter.clone();
932 let dual = self.dual_node.clone();
933 tokio::spawn(async move {
934 loop {
935 match dual.accept_any().await {
936 Ok((ant_peer_id, remote_sock)) => {
937 let peer_id =
938 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
939 let remote_addr = NetworkAddress::from(remote_sock);
940 let _ = rate_limiter.check_ip(&remote_sock.ip());
942 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
943 register_new_peer(&peers, &peer_id, &remote_addr).await;
944 }
945 Err(e) => {
946 warn!("Accept failed: {}", e);
947 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
948 }
949 }
950 }
951 });
952
953 info!("Dual-stack listeners active on: {:?}", addrs);
954 Ok(())
955 }
956
957 #[allow(dead_code)]
959 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
960 warn!("QUIC transport temporarily disabled during ant-quic migration");
999 Err(crate::P2PError::Transport(
1001 crate::error::TransportError::SetupFailed(
1002 format!(
1003 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1004 )
1005 .into(),
1006 ),
1007 ))
1008 }
1009
1010 #[allow(dead_code)] async fn start_connection_acceptor(
1013 &self,
1014 transport: Arc<dyn crate::transport::Transport>,
1015 addr: std::net::SocketAddr,
1016 transport_type: crate::transport::TransportType,
1017 ) -> Result<()> {
1018 info!(
1019 "Starting connection acceptor for {:?} on {}",
1020 transport_type, addr
1021 );
1022
1023 let event_tx = self.event_tx.clone();
1025 let _peer_id = self.peer_id.clone();
1026 let peers = Arc::clone(&self.peers);
1027 let rate_limiter = Arc::clone(&self.rate_limiter);
1030
1031 tokio::spawn(async move {
1033 loop {
1034 match transport.accept().await {
1035 Ok(connection) => {
1036 let remote_addr = connection.remote_addr();
1037 let connection_peer_id =
1038 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1039
1040 let socket_addr = remote_addr.socket_addr();
1042 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1043 continue;
1045 }
1046
1047 info!(
1048 "Accepted {:?} connection from {} (peer: {})",
1049 transport_type, remote_addr, connection_peer_id
1050 );
1051
1052 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1054
1055 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1057
1058 spawn_connection_handler(
1060 connection,
1061 connection_peer_id,
1062 event_tx.clone(),
1063 Arc::clone(&peers),
1064 );
1065 }
1066 Err(e) => {
1067 warn!(
1068 "Failed to accept {:?} connection on {}: {}",
1069 transport_type, addr, e
1070 );
1071
1072 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1074 }
1075 }
1076 }
1077 });
1078
1079 info!(
1080 "Connection acceptor background task started for {:?} on {}",
1081 transport_type, addr
1082 );
1083 Ok(())
1084 }
1085
1086 async fn start_message_receiving_system(&self) -> Result<()> {
1088 info!("Starting message receiving system");
1089 let dual = self.dual_node.clone();
1090 let event_tx = self.event_tx.clone();
1091
1092 tokio::spawn(async move {
1093 loop {
1094 match dual.receive_any().await {
1095 Ok((_peer_id, bytes)) => {
1096 #[allow(clippy::collapsible_if)]
1098 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1099 if let (Some(protocol), Some(data), Some(from)) = (
1100 value.get("protocol").and_then(|v| v.as_str()),
1101 value.get("data").and_then(|v| v.as_array()),
1102 value.get("from").and_then(|v| v.as_str()),
1103 ) {
1104 let payload: Vec<u8> = data
1105 .iter()
1106 .filter_map(|v| v.as_u64().map(|n| n as u8))
1107 .collect();
1108 let _ = event_tx.send(P2PEvent::Message {
1109 topic: protocol.to_string(),
1110 source: from.to_string(),
1111 data: payload,
1112 });
1113 }
1114 }
1115 }
1116 Err(e) => {
1117 warn!("Receive error: {}", e);
1118 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1119 }
1120 }
1121 }
1122 });
1123
1124 Ok(())
1125 }
1126
1127 #[allow(dead_code)]
1129 async fn handle_received_message(
1130 &self,
1131 message_data: Vec<u8>,
1132 peer_id: &PeerId,
1133 _protocol: &str,
1134 event_tx: &broadcast::Sender<P2PEvent>,
1135 ) -> Result<()> {
1136 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1140 Ok(message) => {
1141 if let (Some(protocol), Some(data), Some(from)) = (
1142 message.get("protocol").and_then(|v| v.as_str()),
1143 message.get("data").and_then(|v| v.as_array()),
1144 message.get("from").and_then(|v| v.as_str()),
1145 ) {
1146 let data_bytes: Vec<u8> = data
1148 .iter()
1149 .filter_map(|v| v.as_u64().map(|n| n as u8))
1150 .collect();
1151
1152 let event = P2PEvent::Message {
1154 topic: protocol.to_string(),
1155 source: from.to_string(),
1156 data: data_bytes,
1157 };
1158
1159 let _ = event_tx.send(event);
1160 debug!("Generated message event from peer: {}", peer_id);
1161 }
1162 }
1163 Err(e) => {
1164 warn!("Failed to parse received message from {}: {}", peer_id, e);
1165 }
1166 }
1167
1168 Ok(())
1169 }
1170
1171 pub async fn run(&self) -> Result<()> {
1177 if !*self.running.read().await {
1178 self.start().await?;
1179 }
1180
1181 info!("P2P node running...");
1182
1183 loop {
1185 if !*self.running.read().await {
1186 break;
1187 }
1188
1189 self.periodic_tasks().await?;
1191
1192 tokio::time::sleep(Duration::from_millis(100)).await;
1194 }
1195
1196 info!("P2P node stopped");
1197 Ok(())
1198 }
1199
1200 pub async fn stop(&self) -> Result<()> {
1202 info!("Stopping P2P node...");
1203
1204 *self.running.write().await = false;
1206
1207 self.disconnect_all_peers().await?;
1209
1210 if let Some(ref resource_manager) = self.resource_manager {
1212 resource_manager.shutdown().await.map_err(|e| {
1213 P2PError::Network(crate::error::NetworkError::ProtocolError(
1214 format!("Failed to shutdown resource manager: {e}").into(),
1215 ))
1216 })?;
1217 info!("Production resource manager stopped");
1218 }
1219
1220 info!("P2P node stopped");
1221 Ok(())
1222 }
1223
1224 pub async fn shutdown(&self) -> Result<()> {
1226 self.stop().await
1227 }
1228
1229 pub async fn is_running(&self) -> bool {
1231 *self.running.read().await
1232 }
1233
1234 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1236 self.listen_addrs.read().await.clone()
1237 }
1238
1239 pub async fn connected_peers(&self) -> Vec<PeerId> {
1241 self.peers.read().await.keys().cloned().collect()
1242 }
1243
1244 pub async fn peer_count(&self) -> usize {
1246 self.peers.read().await.len()
1247 }
1248
1249 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1251 self.peers.read().await.get(peer_id).cloned()
1252 }
1253
1254 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1266 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1268
1269 let peers = self.peers.read().await;
1270
1271 for (peer_id, peer_info) in peers.iter() {
1273 for peer_addr in &peer_info.addresses {
1275 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1276 && peer_socket == socket_addr
1277 {
1278 return Some(peer_id.clone());
1279 }
1280 }
1281 }
1282
1283 None
1284 }
1285
1286 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1292 let peers = self.peers.read().await;
1293
1294 peers
1295 .iter()
1296 .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1297 .collect()
1298 }
1299
1300 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1312 self.active_connections.write().await.remove(peer_id);
1314 self.peers.write().await.remove(peer_id).is_some()
1316 }
1317
1318 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1331 self.peers.read().await.contains_key(peer_id)
1332 }
1333
1334 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1336 info!("Connecting to peer at: {}", address);
1337
1338 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1340 Some(resource_manager.acquire_connection().await?)
1341 } else {
1342 None
1343 };
1344
1345 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1347 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1348 format!("{}: {}", address, e).into(),
1349 ))
1350 })?;
1351
1352 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1355 if normalized_addr != socket_addr {
1356 info!(
1357 "Normalized wildcard address {} to loopback {}",
1358 socket_addr, normalized_addr
1359 );
1360 }
1361
1362 let addr_list = vec![normalized_addr];
1364 let peer_id = match tokio::time::timeout(
1365 self.config.connection_timeout,
1366 self.dual_node.connect_happy_eyeballs(&addr_list),
1367 )
1368 .await
1369 {
1370 Ok(Ok(peer)) => {
1371 let connected_peer_id =
1372 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1373 info!("Successfully connected to peer: {}", connected_peer_id);
1374 connected_peer_id
1375 }
1376 Ok(Err(e)) => {
1377 warn!("Failed to connect to peer at {}: {}", address, e);
1378 let sanitized_address = address.replace(['/', ':'], "_");
1379 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1380 warn!(
1381 "Using demo peer ID: {} (transport connection failed)",
1382 demo_peer_id
1383 );
1384 demo_peer_id
1385 }
1386 Err(_) => {
1387 warn!(
1388 "Timed out connecting to peer at {} after {:?}",
1389 address, self.config.connection_timeout
1390 );
1391 let sanitized_address = address.replace(['/', ':'], "_");
1392 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1393 demo_peer_id
1394 }
1395 };
1396
1397 let peer_info = PeerInfo {
1399 peer_id: peer_id.clone(),
1400 addresses: vec![address.to_string()],
1401 connected_at: Instant::now(),
1402 last_seen: Instant::now(),
1403 status: ConnectionStatus::Connected,
1404 protocols: vec!["p2p-foundation/1.0".to_string()],
1405 heartbeat_count: 0,
1406 };
1407
1408 self.peers.write().await.insert(peer_id.clone(), peer_info);
1410
1411 self.active_connections
1414 .write()
1415 .await
1416 .insert(peer_id.clone());
1417
1418 if let Some(ref resource_manager) = self.resource_manager {
1420 resource_manager.record_bandwidth(0, 0); }
1422
1423 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1425
1426 info!("Connected to peer: {}", peer_id);
1427 Ok(peer_id)
1428 }
1429
1430 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1432 info!("Disconnecting from peer: {}", peer_id);
1433
1434 self.active_connections.write().await.remove(peer_id);
1436
1437 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1438 peer_info.status = ConnectionStatus::Disconnected;
1439
1440 let _ = self
1442 .event_tx
1443 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1444
1445 info!("Disconnected from peer: {}", peer_id);
1446 }
1447
1448 Ok(())
1449 }
1450
1451 pub async fn is_connection_active(&self, peer_id: &PeerId) -> bool {
1453 self.active_connections.read().await.contains(peer_id)
1454 }
1455
1456 pub async fn send_message(
1458 &self,
1459 peer_id: &PeerId,
1460 protocol: &str,
1461 data: Vec<u8>,
1462 ) -> Result<()> {
1463 debug!(
1464 "Sending message to peer {} on protocol {}",
1465 peer_id, protocol
1466 );
1467
1468 if let Some(ref resource_manager) = self.resource_manager
1470 && !resource_manager
1471 .check_rate_limit(peer_id, "message")
1472 .await?
1473 {
1474 return Err(P2PError::ResourceExhausted(
1475 format!("Rate limit exceeded for peer {}", peer_id).into(),
1476 ));
1477 }
1478
1479 if !self.peers.read().await.contains_key(peer_id) {
1481 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1482 peer_id.to_string().into(),
1483 )));
1484 }
1485
1486 if !self.is_connection_active(peer_id).await {
1489 debug!(
1490 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1491 peer_id
1492 );
1493
1494 self.remove_peer(peer_id).await;
1496
1497 return Err(P2PError::Network(
1498 crate::error::NetworkError::ConnectionClosed {
1499 peer_id: peer_id.to_string().into(),
1500 },
1501 ));
1502 }
1503
1504 if let Some(ref resource_manager) = self.resource_manager {
1508 resource_manager.record_bandwidth(data.len() as u64, 0);
1509 }
1510
1511 let _message_data = self.create_protocol_message(protocol, data)?;
1513
1514 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1516 tokio::time::timeout(self.config.connection_timeout, send_fut)
1517 .await
1518 .map_err(|_| {
1519 P2PError::Transport(crate::error::TransportError::StreamError(
1520 "Timed out sending message".into(),
1521 ))
1522 })?
1523 .map_err(|e| {
1524 P2PError::Transport(crate::error::TransportError::StreamError(
1525 e.to_string().into(),
1526 ))
1527 })
1528 }
1529
1530 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1532 use serde_json::json;
1533
1534 let timestamp = std::time::SystemTime::now()
1535 .duration_since(std::time::UNIX_EPOCH)
1536 .map_err(|e| {
1537 P2PError::Network(NetworkError::ProtocolError(
1538 format!("System time error: {}", e).into(),
1539 ))
1540 })?
1541 .as_secs();
1542
1543 let message = json!({
1545 "protocol": protocol,
1546 "data": data,
1547 "from": self.peer_id,
1548 "timestamp": timestamp
1549 });
1550
1551 serde_json::to_vec(&message).map_err(|e| {
1552 P2PError::Transport(crate::error::TransportError::StreamError(
1553 format!("Failed to serialize message: {e}").into(),
1554 ))
1555 })
1556 }
1557
1558 }
1560
1561#[allow(dead_code)]
1563fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1564 use serde_json::json;
1565
1566 let timestamp = std::time::SystemTime::now()
1567 .duration_since(std::time::UNIX_EPOCH)
1568 .map_err(|e| {
1569 P2PError::Network(NetworkError::ProtocolError(
1570 format!("System time error: {}", e).into(),
1571 ))
1572 })?
1573 .as_secs();
1574
1575 let message = json!({
1577 "protocol": protocol,
1578 "data": data,
1579 "timestamp": timestamp
1580 });
1581
1582 serde_json::to_vec(&message).map_err(|e| {
1583 P2PError::Transport(crate::error::TransportError::StreamError(
1584 format!("Failed to serialize message: {e}").into(),
1585 ))
1586 })
1587}
1588
1589impl P2PNode {
1590 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1592 self.event_tx.subscribe()
1593 }
1594
1595 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1597 self.subscribe_events()
1598 }
1599
1600 pub fn uptime(&self) -> Duration {
1602 self.start_time.elapsed()
1603 }
1604
1605 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1615 if let Some(ref resource_manager) = self.resource_manager {
1616 Ok(resource_manager.get_metrics().await)
1617 } else {
1618 Err(P2PError::Network(
1619 crate::error::NetworkError::ProtocolError(
1620 "Production resource manager not enabled".to_string().into(),
1621 ),
1622 ))
1623 }
1624 }
1625
1626 async fn connection_lifecycle_monitor(
1629 dual_node: Arc<DualStackNetworkNode>,
1630 active_connections: Arc<RwLock<HashSet<String>>>,
1631 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1632 event_tx: broadcast::Sender<P2PEvent>,
1633 ) {
1634 use crate::transport::ant_quic_adapter::ConnectionEvent;
1635
1636 let mut event_rx = dual_node.subscribe_connection_events();
1637
1638 info!("Connection lifecycle monitor started");
1639
1640 loop {
1641 match event_rx.recv().await {
1642 Ok(event) => {
1643 match event {
1644 ConnectionEvent::Established {
1645 peer_id,
1646 remote_address,
1647 } => {
1648 let peer_id_str =
1649 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1650 debug!(
1651 "Connection established: peer={}, addr={}",
1652 peer_id_str, remote_address
1653 );
1654
1655 active_connections.write().await.insert(peer_id_str.clone());
1657
1658 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1660 peer_info.status = ConnectionStatus::Connected;
1661 peer_info.connected_at = Instant::now();
1662 }
1663
1664 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1666 }
1667 ConnectionEvent::Lost { peer_id, reason } => {
1668 let peer_id_str =
1669 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1670 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1671
1672 active_connections.write().await.remove(&peer_id_str);
1674
1675 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1677 peer_info.status = ConnectionStatus::Disconnected;
1678 peer_info.last_seen = Instant::now();
1679 }
1680
1681 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1683 }
1684 ConnectionEvent::Failed { peer_id, reason } => {
1685 let peer_id_str =
1686 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1687 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1688
1689 active_connections.write().await.remove(&peer_id_str);
1691
1692 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1694 peer_info.status = ConnectionStatus::Failed(reason.clone());
1695 }
1696
1697 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1699 }
1700 }
1701 }
1702 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1703 warn!(
1704 "Connection event monitor lagged, skipped {} events",
1705 skipped
1706 );
1707 continue;
1708 }
1709 Err(broadcast::error::RecvError::Closed) => {
1710 info!("Connection event channel closed, stopping monitor");
1711 break;
1712 }
1713 }
1714 }
1715
1716 info!("Connection lifecycle monitor stopped");
1717 }
1718
1719 async fn start_connection_monitor(&self) {
1721 debug!("Connection monitor already running from initialization");
1725 }
1726
1727 async fn keepalive_task(
1733 active_connections: Arc<RwLock<HashSet<String>>>,
1734 dual_node: Arc<DualStackNetworkNode>,
1735 shutdown: Arc<AtomicBool>,
1736 ) {
1737 use tokio::time::{Duration, interval};
1738
1739 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1743 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1744
1745 info!(
1746 "Keepalive task started (interval: {}s)",
1747 KEEPALIVE_INTERVAL_SECS
1748 );
1749
1750 loop {
1751 if shutdown.load(Ordering::Relaxed) {
1753 info!("Keepalive task shutting down");
1754 break;
1755 }
1756
1757 interval.tick().await;
1758
1759 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1761
1762 if peers.is_empty() {
1763 trace!("Keepalive: no active connections");
1764 continue;
1765 }
1766
1767 debug!("Sending keepalive to {} active connections", peers.len());
1768
1769 for peer_id in peers {
1771 match dual_node
1772 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1773 .await
1774 {
1775 Ok(_) => {
1776 trace!("Keepalive sent to peer: {}", peer_id);
1777 }
1778 Err(e) => {
1779 debug!(
1780 "Failed to send keepalive to peer {}: {} (connection may have closed)",
1781 peer_id, e
1782 );
1783 }
1785 }
1786 }
1787 }
1788
1789 info!("Keepalive task stopped");
1790 }
1791
1792 pub async fn health_check(&self) -> Result<()> {
1794 if let Some(ref resource_manager) = self.resource_manager {
1795 resource_manager.health_check().await
1796 } else {
1797 let peer_count = self.peer_count().await;
1799 if peer_count > self.config.max_connections {
1800 Err(P2PError::Network(
1801 crate::error::NetworkError::ProtocolError(
1802 format!("Too many connections: {peer_count}").into(),
1803 ),
1804 ))
1805 } else {
1806 Ok(())
1807 }
1808 }
1809 }
1810
1811 pub fn production_config(&self) -> Option<&ProductionConfig> {
1813 self.config.production_config.as_ref()
1814 }
1815
1816 pub fn is_production_mode(&self) -> bool {
1818 self.resource_manager.is_some()
1819 }
1820
1821 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1823 self.dht.as_ref()
1824 }
1825
1826 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1828 if let Some(ref dht) = self.dht {
1829 let mut dht_instance = dht.write().await;
1830 let dht_key = crate::dht::DhtKey::from_bytes(key);
1831 dht_instance
1832 .store(&dht_key, value.clone())
1833 .await
1834 .map_err(|e| {
1835 P2PError::Dht(crate::error::DhtError::StoreFailed(
1836 format!("{:?}: {e}", key).into(),
1837 ))
1838 })?;
1839
1840 Ok(())
1841 } else {
1842 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1843 "DHT not enabled".to_string().into(),
1844 )))
1845 }
1846 }
1847
1848 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1850 if let Some(ref dht) = self.dht {
1851 let dht_instance = dht.read().await;
1852 let dht_key = crate::dht::DhtKey::from_bytes(key);
1853 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1854 P2PError::Dht(crate::error::DhtError::StoreFailed(
1855 format!("Retrieve failed: {e}").into(),
1856 ))
1857 })?;
1858
1859 Ok(record_result)
1860 } else {
1861 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1862 "DHT not enabled".to_string().into(),
1863 )))
1864 }
1865 }
1866
1867 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1869 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1870 let mut manager = bootstrap_manager.write().await;
1871 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1872 .iter()
1873 .filter_map(|addr| addr.parse().ok())
1874 .collect();
1875 let contact = ContactEntry::new(peer_id, socket_addresses);
1876 manager.add_contact(contact).await.map_err(|e| {
1877 P2PError::Network(crate::error::NetworkError::ProtocolError(
1878 format!("Failed to add peer to bootstrap cache: {e}").into(),
1879 ))
1880 })?;
1881 }
1882 Ok(())
1883 }
1884
1885 pub async fn update_peer_metrics(
1887 &self,
1888 peer_id: &PeerId,
1889 success: bool,
1890 latency_ms: Option<u64>,
1891 _error: Option<String>,
1892 ) -> Result<()> {
1893 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1894 let mut manager = bootstrap_manager.write().await;
1895
1896 let metrics = QualityMetrics {
1898 success_rate: if success { 1.0 } else { 0.0 },
1899 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1900 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1902 last_successful_connection: if success {
1903 chrono::Utc::now()
1904 } else {
1905 chrono::Utc::now() - chrono::Duration::hours(1)
1906 },
1907 uptime_score: 0.5,
1908 };
1909
1910 manager
1911 .update_contact_metrics(peer_id, metrics)
1912 .await
1913 .map_err(|e| {
1914 P2PError::Network(crate::error::NetworkError::ProtocolError(
1915 format!("Failed to update peer metrics: {e}").into(),
1916 ))
1917 })?;
1918 }
1919 Ok(())
1920 }
1921
1922 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1924 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1925 let manager = bootstrap_manager.read().await;
1926 let stats = manager.get_stats().await.map_err(|e| {
1927 P2PError::Network(crate::error::NetworkError::ProtocolError(
1928 format!("Failed to get bootstrap stats: {e}").into(),
1929 ))
1930 })?;
1931 Ok(Some(stats))
1932 } else {
1933 Ok(None)
1934 }
1935 }
1936
1937 pub async fn cached_peer_count(&self) -> usize {
1939 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1940 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1941 {
1942 return stats.total_contacts;
1943 }
1944 0
1945 }
1946
1947 async fn connect_bootstrap_peers(&self) -> Result<()> {
1949 let mut bootstrap_contacts = Vec::new();
1950 let mut used_cache = false;
1951
1952 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1954 let manager = bootstrap_manager.read().await;
1955 match manager.get_bootstrap_peers(20).await {
1956 Ok(contacts) => {
1958 if !contacts.is_empty() {
1959 info!("Using {} cached bootstrap peers", contacts.len());
1960 bootstrap_contacts = contacts;
1961 used_cache = true;
1962 }
1963 }
1964 Err(e) => {
1965 warn!("Failed to get cached bootstrap peers: {}", e);
1966 }
1967 }
1968 }
1969
1970 if bootstrap_contacts.is_empty() {
1972 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1973 &self.config.bootstrap_peers_str
1974 } else {
1975 &self
1977 .config
1978 .bootstrap_peers
1979 .iter()
1980 .map(|addr| addr.to_string())
1981 .collect::<Vec<_>>()
1982 };
1983
1984 if bootstrap_peers.is_empty() {
1985 info!("No bootstrap peers configured and no cached peers available");
1986 return Ok(());
1987 }
1988
1989 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1990
1991 for addr in bootstrap_peers {
1992 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1993 let contact = ContactEntry::new(
1994 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1995 vec![socket_addr],
1996 );
1997 bootstrap_contacts.push(contact);
1998 } else {
1999 warn!("Invalid bootstrap address format: {}", addr);
2000 }
2001 }
2002 }
2003
2004 let mut successful_connections = 0;
2006 for contact in bootstrap_contacts {
2007 for addr in &contact.addresses {
2008 match self.connect_peer(&addr.to_string()).await {
2009 Ok(peer_id) => {
2010 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2011 successful_connections += 1;
2012
2013 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2015 let mut manager = bootstrap_manager.write().await;
2016 let mut updated_contact = contact.clone();
2017 updated_contact.peer_id = peer_id.clone();
2018 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2021 warn!("Failed to update bootstrap cache: {}", e);
2022 }
2023 }
2024 break; }
2026 Err(e) => {
2027 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2028
2029 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2031 let mut manager = bootstrap_manager.write().await;
2032 let mut updated_contact = contact.clone();
2033 updated_contact.update_connection_result(
2034 false,
2035 None,
2036 Some(e.to_string()),
2037 );
2038
2039 if let Err(e) = manager.add_contact(updated_contact).await {
2040 warn!("Failed to update bootstrap cache: {}", e);
2041 }
2042 }
2043 }
2044 }
2045 }
2046 }
2047
2048 if successful_connections == 0 {
2049 if !used_cache {
2050 warn!("Failed to connect to any bootstrap peers");
2051 }
2052 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2053 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2055 }));
2056 }
2057 info!(
2058 "Successfully connected to {} bootstrap peers",
2059 successful_connections
2060 );
2061
2062 Ok(())
2063 }
2064
2065 async fn disconnect_all_peers(&self) -> Result<()> {
2067 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2068
2069 for peer_id in peer_ids {
2070 self.disconnect_peer(&peer_id).await?;
2071 }
2072
2073 Ok(())
2074 }
2075
2076 async fn periodic_tasks(&self) -> Result<()> {
2078 Ok(())
2084 }
2085}
2086
2087#[async_trait::async_trait]
2089pub trait NetworkSender: Send + Sync {
2090 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2092
2093 fn local_peer_id(&self) -> &PeerId;
2095}
2096
2097#[derive(Clone)]
2099pub struct P2PNetworkSender {
2100 peer_id: PeerId,
2101 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2103}
2104
2105impl P2PNetworkSender {
2106 pub fn new(
2107 peer_id: PeerId,
2108 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2109 ) -> Self {
2110 Self { peer_id, send_tx }
2111 }
2112}
2113
2114#[async_trait::async_trait]
2116impl NetworkSender for P2PNetworkSender {
2117 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2119 self.send_tx
2120 .send((peer_id.clone(), protocol.to_string(), data))
2121 .map_err(|_| {
2122 P2PError::Network(crate::error::NetworkError::ProtocolError(
2123 "Failed to send message via channel".to_string().into(),
2124 ))
2125 })?;
2126 Ok(())
2127 }
2128
2129 fn local_peer_id(&self) -> &PeerId {
2131 &self.peer_id
2132 }
2133}
2134
2135pub struct NodeBuilder {
2137 config: NodeConfig,
2138}
2139
2140impl Default for NodeBuilder {
2141 fn default() -> Self {
2142 Self::new()
2143 }
2144}
2145
2146impl NodeBuilder {
2147 pub fn new() -> Self {
2149 Self {
2150 config: NodeConfig::default(),
2151 }
2152 }
2153
2154 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2156 self.config.peer_id = Some(peer_id);
2157 self
2158 }
2159
2160 pub fn listen_on(mut self, addr: &str) -> Self {
2162 if let Ok(multiaddr) = addr.parse() {
2163 self.config.listen_addrs.push(multiaddr);
2164 }
2165 self
2166 }
2167
2168 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2170 if let Ok(multiaddr) = addr.parse() {
2171 self.config.bootstrap_peers.push(multiaddr);
2172 }
2173 self.config.bootstrap_peers_str.push(addr.to_string());
2174 self
2175 }
2176
2177 pub fn with_ipv6(mut self, enable: bool) -> Self {
2179 self.config.enable_ipv6 = enable;
2180 self
2181 }
2182
2183 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2187 self.config.connection_timeout = timeout;
2188 self
2189 }
2190
2191 pub fn with_max_connections(mut self, max: usize) -> Self {
2193 self.config.max_connections = max;
2194 self
2195 }
2196
2197 pub fn with_production_mode(mut self) -> Self {
2199 self.config.production_config = Some(ProductionConfig::default());
2200 self
2201 }
2202
2203 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2205 self.config.production_config = Some(production_config);
2206 self
2207 }
2208
2209 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2211 self.config.dht_config = dht_config;
2212 self
2213 }
2214
2215 pub fn with_default_dht(mut self) -> Self {
2217 self.config.dht_config = DHTConfig::default();
2218 self
2219 }
2220
2221 pub async fn build(self) -> Result<P2PNode> {
2223 P2PNode::new(self.config).await
2224 }
2225}
2226
2227#[allow(dead_code)] async fn handle_received_message_standalone(
2230 message_data: Vec<u8>,
2231 peer_id: &PeerId,
2232 _protocol: &str,
2233 event_tx: &broadcast::Sender<P2PEvent>,
2234) -> Result<()> {
2235 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2237 Ok(message) => {
2238 if let (Some(protocol), Some(data), Some(from)) = (
2239 message.get("protocol").and_then(|v| v.as_str()),
2240 message.get("data").and_then(|v| v.as_array()),
2241 message.get("from").and_then(|v| v.as_str()),
2242 ) {
2243 let data_bytes: Vec<u8> = data
2245 .iter()
2246 .filter_map(|v| v.as_u64().map(|n| n as u8))
2247 .collect();
2248
2249 let event = P2PEvent::Message {
2251 topic: protocol.to_string(),
2252 source: from.to_string(),
2253 data: data_bytes,
2254 };
2255
2256 let _ = event_tx.send(event);
2257 debug!("Generated message event from peer: {}", peer_id);
2258 }
2259 }
2260 Err(e) => {
2261 warn!("Failed to parse received message from {}: {}", peer_id, e);
2262 }
2263 }
2264
2265 Ok(())
2266}
2267
2268#[allow(dead_code)]
2272fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2273 match create_protocol_message_static(protocol, data) {
2274 Ok(msg) => Some(msg),
2275 Err(e) => {
2276 warn!("Failed to create protocol message: {}", e);
2277 None
2278 }
2279 }
2280}
2281
2282#[allow(dead_code)]
2284async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2285 match result {
2286 Ok(_) => {
2287 debug!("Message sent to peer {} via transport layer", peer_id);
2288 }
2289 Err(e) => {
2290 warn!("Failed to send message to peer {}: {}", peer_id, e);
2291 }
2292 }
2293}
2294
2295#[allow(dead_code)] fn check_rate_limit(
2298 rate_limiter: &RateLimiter,
2299 socket_addr: &std::net::SocketAddr,
2300 remote_addr: &NetworkAddress,
2301) -> Result<()> {
2302 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2303 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2304 e
2305 })
2306}
2307
2308#[allow(dead_code)] async fn register_new_peer(
2311 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2312 peer_id: &PeerId,
2313 remote_addr: &NetworkAddress,
2314) {
2315 let mut peers_guard = peers.write().await;
2316 let peer_info = PeerInfo {
2317 peer_id: peer_id.clone(),
2318 addresses: vec![remote_addr.to_string()],
2319 connected_at: tokio::time::Instant::now(),
2320 last_seen: tokio::time::Instant::now(),
2321 status: ConnectionStatus::Connected,
2322 protocols: vec!["p2p-chat/1.0.0".to_string()],
2323 heartbeat_count: 0,
2324 };
2325 peers_guard.insert(peer_id.clone(), peer_info);
2326}
2327
2328#[allow(dead_code)] fn spawn_connection_handler(
2331 connection: Box<dyn crate::transport::Connection>,
2332 peer_id: PeerId,
2333 event_tx: broadcast::Sender<P2PEvent>,
2334 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2335) {
2336 tokio::spawn(async move {
2337 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2338 });
2339}
2340
2341#[allow(dead_code)] async fn handle_peer_connection(
2344 mut connection: Box<dyn crate::transport::Connection>,
2345 peer_id: PeerId,
2346 event_tx: broadcast::Sender<P2PEvent>,
2347 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2348) {
2349 loop {
2350 match connection.receive().await {
2351 Ok(message_data) => {
2352 debug!(
2353 "Received {} bytes from peer: {}",
2354 message_data.len(),
2355 peer_id
2356 );
2357
2358 if let Err(e) = handle_received_message_standalone(
2360 message_data,
2361 &peer_id,
2362 "unknown", &event_tx,
2364 )
2365 .await
2366 {
2367 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2368 }
2369 }
2370 Err(e) => {
2371 warn!("Failed to receive message from {}: {}", peer_id, e);
2372
2373 if !connection.is_alive().await {
2375 info!("Connection to {} is dead, removing peer", peer_id);
2376
2377 remove_peer(&peers, &peer_id).await;
2379
2380 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2382
2383 break; }
2385
2386 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2388 }
2389 }
2390 }
2391}
2392
2393#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2396 let mut peers_guard = peers.write().await;
2397 peers_guard.remove(peer_id);
2398}
2399
2400#[allow(dead_code)]
2402async fn update_peer_heartbeat(
2403 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2404 peer_id: &PeerId,
2405) -> Result<()> {
2406 let mut peers_guard = peers.write().await;
2407 match peers_guard.get_mut(peer_id) {
2408 Some(peer_info) => {
2409 peer_info.last_seen = Instant::now();
2410 peer_info.heartbeat_count += 1;
2411 Ok(())
2412 }
2413 None => {
2414 warn!("Received heartbeat from unknown peer: {}", peer_id);
2415 Err(P2PError::Network(NetworkError::PeerNotFound(
2416 format!("Peer {} not found", peer_id).into(),
2417 )))
2418 }
2419 }
2420}
2421
2422#[allow(dead_code)]
2424async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2425 if let Some(manager) = resource_manager {
2426 let metrics = manager.get_metrics().await;
2427 (metrics.memory_used, metrics.cpu_usage)
2428 } else {
2429 (0, 0.0)
2430 }
2431}
2432
2433#[cfg(test)]
2434mod tests {
2435 use super::*;
2436 use std::time::Duration;
2438 use tokio::time::timeout;
2439
2440 fn create_test_node_config() -> NodeConfig {
2446 NodeConfig {
2447 peer_id: Some("test_peer_123".to_string()),
2448 listen_addrs: vec![
2449 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2450 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2451 ],
2452 listen_addr: std::net::SocketAddr::new(
2453 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2454 0,
2455 ),
2456 bootstrap_peers: vec![],
2457 bootstrap_peers_str: vec![],
2458 enable_ipv6: true,
2459
2460 connection_timeout: Duration::from_millis(300),
2461 keep_alive_interval: Duration::from_secs(30),
2462 max_connections: 100,
2463 max_incoming_connections: 50,
2464 dht_config: DHTConfig::default(),
2465 security_config: SecurityConfig::default(),
2466 production_config: None,
2467 bootstrap_cache_config: None,
2468 }
2470 }
2471
2472 #[tokio::test]
2476 async fn test_node_config_default() {
2477 let config = NodeConfig::default();
2478
2479 assert!(config.peer_id.is_none());
2480 assert_eq!(config.listen_addrs.len(), 2);
2481 assert!(config.enable_ipv6);
2482 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2484 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2485 }
2486
2487 #[tokio::test]
2488 async fn test_dht_config_default() {
2489 let config = DHTConfig::default();
2490
2491 assert_eq!(config.k_value, 20);
2492 assert_eq!(config.alpha_value, 5);
2493 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2494 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2495 }
2496
2497 #[tokio::test]
2498 async fn test_security_config_default() {
2499 let config = SecurityConfig::default();
2500
2501 assert!(config.enable_noise);
2502 assert!(config.enable_tls);
2503 assert_eq!(config.trust_level, TrustLevel::Basic);
2504 }
2505
/// Every `TrustLevel` variant can be constructed, equals itself, and
/// differs from another variant.
#[test]
fn test_trust_level_variants() {
    // Construction alone is part of the check: it fails to compile if a
    // variant is removed or renamed.
    let _none = TrustLevel::None;
    let _basic = TrustLevel::Basic;
    let _full = TrustLevel::Full;

    assert_eq!(TrustLevel::None, TrustLevel::None);
    assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
    assert_eq!(TrustLevel::Full, TrustLevel::Full);
    assert_ne!(TrustLevel::None, TrustLevel::Basic);
}
2519
/// Every `ConnectionStatus` variant can be constructed and compared, and
/// `Failed` carries its message.
#[test]
fn test_connection_status_variants() {
    let connecting = ConnectionStatus::Connecting;
    let connected = ConnectionStatus::Connected;
    let disconnecting = ConnectionStatus::Disconnecting;
    let disconnected = ConnectionStatus::Disconnected;
    let failed = ConnectionStatus::Failed("test error".to_string());

    assert_eq!(connecting, ConnectionStatus::Connecting);
    assert_eq!(connected, ConnectionStatus::Connected);
    assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
    assert_eq!(disconnected, ConnectionStatus::Disconnected);
    assert_ne!(connecting, connected);

    match failed {
        ConnectionStatus::Failed(msg) => assert_eq!(msg, "test error"),
        _ => panic!("Expected Failed status"),
    }
}
2540
2541 #[tokio::test]
2542 async fn test_node_creation() -> Result<()> {
2543 let config = create_test_node_config();
2544 let node = P2PNode::new(config).await?;
2545
2546 assert_eq!(node.peer_id(), "test_peer_123");
2547 assert!(!node.is_running().await);
2548 assert_eq!(node.peer_count().await, 0);
2549 assert!(node.connected_peers().await.is_empty());
2550
2551 Ok(())
2552 }
2553
2554 #[tokio::test]
2555 async fn test_node_creation_without_peer_id() -> Result<()> {
2556 let mut config = create_test_node_config();
2557 config.peer_id = None;
2558
2559 let node = P2PNode::new(config).await?;
2560
2561 assert!(node.peer_id().starts_with("peer_"));
2563 assert!(!node.is_running().await);
2564
2565 Ok(())
2566 }
2567
2568 #[tokio::test]
2569 async fn test_node_lifecycle() -> Result<()> {
2570 let config = create_test_node_config();
2571 let node = P2PNode::new(config).await?;
2572
2573 assert!(!node.is_running().await);
2575
2576 node.start().await?;
2578 assert!(node.is_running().await);
2579
2580 let listen_addrs = node.listen_addrs().await;
2582 assert!(
2583 !listen_addrs.is_empty(),
2584 "Expected at least one listening address"
2585 );
2586
2587 node.stop().await?;
2589 assert!(!node.is_running().await);
2590
2591 Ok(())
2592 }
2593
2594 #[tokio::test]
2595 async fn test_peer_connection() -> Result<()> {
2596 let config = create_test_node_config();
2597 let node = P2PNode::new(config).await?;
2598
2599 let peer_addr = "127.0.0.1:0";
2600
2601 let peer_id = node.connect_peer(peer_addr).await?;
2603 assert!(peer_id.starts_with("peer_from_"));
2604
2605 assert_eq!(node.peer_count().await, 1);
2607
2608 let connected_peers = node.connected_peers().await;
2610 assert_eq!(connected_peers.len(), 1);
2611 assert_eq!(connected_peers[0], peer_id);
2612
2613 let peer_info = node.peer_info(&peer_id).await;
2615 assert!(peer_info.is_some());
2616 let info = peer_info.expect("Peer info should exist after adding peer");
2617 assert_eq!(info.peer_id, peer_id);
2618 assert_eq!(info.status, ConnectionStatus::Connected);
2619 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2620
2621 node.disconnect_peer(&peer_id).await?;
2623 assert_eq!(node.peer_count().await, 0);
2624
2625 Ok(())
2626 }
2627
2628 #[tokio::test]
2629 async fn test_event_subscription() -> Result<()> {
2630 let config = create_test_node_config();
2631 let node = P2PNode::new(config).await?;
2632
2633 let mut events = node.subscribe_events();
2634 let peer_addr = "127.0.0.1:0";
2635
2636 let peer_id = node.connect_peer(peer_addr).await?;
2638
2639 let event = timeout(Duration::from_millis(100), events.recv()).await;
2641 assert!(event.is_ok());
2642
2643 let event_result = event
2644 .expect("Should receive event")
2645 .expect("Event should not be error");
2646 match event_result {
2647 P2PEvent::PeerConnected(event_peer_id) => {
2648 assert_eq!(event_peer_id, peer_id);
2649 }
2650 _ => panic!("Expected PeerConnected event"),
2651 }
2652
2653 node.disconnect_peer(&peer_id).await?;
2655
2656 let event = timeout(Duration::from_millis(100), events.recv()).await;
2658 assert!(event.is_ok());
2659
2660 let event_result = event
2661 .expect("Should receive event")
2662 .expect("Event should not be error");
2663 match event_result {
2664 P2PEvent::PeerDisconnected(event_peer_id) => {
2665 assert_eq!(event_peer_id, peer_id);
2666 }
2667 _ => panic!("Expected PeerDisconnected event"),
2668 }
2669
2670 Ok(())
2671 }
2672
2673 #[tokio::test]
2674 async fn test_message_sending() -> Result<()> {
2675 let mut config1 = create_test_node_config();
2677 config1.listen_addr =
2678 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2679 let node1 = P2PNode::new(config1).await?;
2680 node1.start().await?;
2681
2682 let mut config2 = create_test_node_config();
2683 config2.listen_addr =
2684 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2685 let node2 = P2PNode::new(config2).await?;
2686 node2.start().await?;
2687
2688 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2690
2691 let node2_addr = node2.local_addr().ok_or_else(|| {
2693 P2PError::Network(crate::error::NetworkError::ProtocolError(
2694 "No listening address".to_string().into(),
2695 ))
2696 })?;
2697
2698 let peer_id =
2700 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2701 Ok(res) => res?,
2702 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2703 };
2704
2705 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2707
2708 let message_data = b"Hello, peer!".to_vec();
2710 let result = match timeout(
2711 Duration::from_millis(500),
2712 node1.send_message(&peer_id, "test-protocol", message_data),
2713 )
2714 .await
2715 {
2716 Ok(res) => res,
2717 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2718 };
2719 if let Err(e) = &result {
2722 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2723 }
2724
2725 let non_existent_peer = "non_existent_peer".to_string();
2727 let result = node1
2728 .send_message(&non_existent_peer, "test-protocol", vec![])
2729 .await;
2730 assert!(result.is_err(), "Sending to non-existent peer should fail");
2731
2732 Ok(())
2733 }
2734
2735 #[tokio::test]
2736 async fn test_remote_mcp_operations() -> Result<()> {
2737 let config = create_test_node_config();
2738 let node = P2PNode::new(config).await?;
2739
2740 node.start().await?;
2742 node.stop().await?;
2743 Ok(())
2744 }
2745
2746 #[tokio::test]
2747 async fn test_health_check() -> Result<()> {
2748 let config = create_test_node_config();
2749 let node = P2PNode::new(config).await?;
2750
2751 let result = node.health_check().await;
2753 assert!(result.is_ok());
2754
2755 Ok(())
2760 }
2761
2762 #[tokio::test]
2763 async fn test_node_uptime() -> Result<()> {
2764 let config = create_test_node_config();
2765 let node = P2PNode::new(config).await?;
2766
2767 let uptime1 = node.uptime();
2768 assert!(uptime1 >= Duration::from_secs(0));
2769
2770 tokio::time::sleep(Duration::from_millis(10)).await;
2772
2773 let uptime2 = node.uptime();
2774 assert!(uptime2 > uptime1);
2775
2776 Ok(())
2777 }
2778
2779 #[tokio::test]
2780 async fn test_node_config_access() -> Result<()> {
2781 let config = create_test_node_config();
2782 let expected_peer_id = config.peer_id.clone();
2783 let node = P2PNode::new(config).await?;
2784
2785 let node_config = node.config();
2786 assert_eq!(node_config.peer_id, expected_peer_id);
2787 assert_eq!(node_config.max_connections, 100);
2788 Ok(())
2791 }
2792
2793 #[tokio::test]
2794 async fn test_mcp_server_access() -> Result<()> {
2795 let config = create_test_node_config();
2796 let _node = P2PNode::new(config).await?;
2797
2798 Ok(())
2800 }
2801
2802 #[tokio::test]
2803 async fn test_dht_access() -> Result<()> {
2804 let config = create_test_node_config();
2805 let node = P2PNode::new(config).await?;
2806
2807 assert!(node.dht().is_some());
2809
2810 Ok(())
2811 }
2812
2813 #[tokio::test]
2814 async fn test_node_builder() -> Result<()> {
2815 let builder = P2PNode::builder()
2817 .with_peer_id("builder_test_peer".to_string())
2818 .listen_on("/ip4/127.0.0.1/tcp/0")
2819 .listen_on("/ip6/::1/tcp/0")
2820 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
2822 .with_connection_timeout(Duration::from_secs(15))
2823 .with_max_connections(200);
2824
2825 let config = builder.config;
2827 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2828 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
2831 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2832 assert_eq!(config.max_connections, 200);
2833
2834 Ok(())
2835 }
2836
2837 #[tokio::test]
2838 async fn test_bootstrap_peers() -> Result<()> {
2839 let mut config = create_test_node_config();
2840 config.bootstrap_peers = vec![
2841 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2842 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2843 ];
2844
2845 let node = P2PNode::new(config).await?;
2846
2847 node.start().await?;
2849
2850 let peer_count = node.peer_count().await;
2853 assert!(
2854 peer_count <= 2,
2855 "Peer count should not exceed bootstrap peer count"
2856 );
2857
2858 node.stop().await?;
2859 Ok(())
2860 }
2861
2862 #[tokio::test]
2863 async fn test_production_mode_disabled() -> Result<()> {
2864 let config = create_test_node_config();
2865 let node = P2PNode::new(config).await?;
2866
2867 assert!(!node.is_production_mode());
2868 assert!(node.production_config().is_none());
2869
2870 let result = node.resource_metrics().await;
2872 assert!(result.is_err());
2873 assert!(result.unwrap_err().to_string().contains("not enabled"));
2874
2875 Ok(())
2876 }
2877
2878 #[tokio::test]
2879 async fn test_network_event_variants() {
2880 let peer_id = "test_peer".to_string();
2882 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2883
2884 let _peer_connected = NetworkEvent::PeerConnected {
2885 peer_id: peer_id.clone(),
2886 addresses: vec![address.clone()],
2887 };
2888
2889 let _peer_disconnected = NetworkEvent::PeerDisconnected {
2890 peer_id: peer_id.clone(),
2891 reason: "test disconnect".to_string(),
2892 };
2893
2894 let _message_received = NetworkEvent::MessageReceived {
2895 peer_id: peer_id.clone(),
2896 protocol: "test-protocol".to_string(),
2897 data: vec![1, 2, 3],
2898 };
2899
2900 let _connection_failed = NetworkEvent::ConnectionFailed {
2901 peer_id: Some(peer_id.clone()),
2902 address: address.clone(),
2903 error: "connection refused".to_string(),
2904 };
2905
2906 let _dht_stored = NetworkEvent::DHTRecordStored {
2907 key: vec![1, 2, 3],
2908 value: vec![4, 5, 6],
2909 };
2910
2911 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2912 key: vec![1, 2, 3],
2913 value: Some(vec![4, 5, 6]),
2914 };
2915 }
2916
2917 #[tokio::test]
2918 async fn test_peer_info_structure() {
2919 let peer_info = PeerInfo {
2920 peer_id: "test_peer".to_string(),
2921 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2922 connected_at: Instant::now(),
2923 last_seen: Instant::now(),
2924 status: ConnectionStatus::Connected,
2925 protocols: vec!["test-protocol".to_string()],
2926 heartbeat_count: 0,
2927 };
2928
2929 assert_eq!(peer_info.peer_id, "test_peer");
2930 assert_eq!(peer_info.addresses.len(), 1);
2931 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2932 assert_eq!(peer_info.protocols.len(), 1);
2933 }
2934
2935 #[tokio::test]
2936 async fn test_serialization() -> Result<()> {
2937 let config = create_test_node_config();
2939 let serialized = serde_json::to_string(&config)?;
2940 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2941
2942 assert_eq!(config.peer_id, deserialized.peer_id);
2943 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2944 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2945
2946 Ok(())
2947 }
2948
2949 #[tokio::test]
2950 async fn test_get_peer_id_by_address_found() -> Result<()> {
2951 let config = create_test_node_config();
2952 let node = P2PNode::new(config).await?;
2953
2954 let test_peer_id = "peer_test_123".to_string();
2956 let test_address = "192.168.1.100:9000".to_string();
2957
2958 let peer_info = PeerInfo {
2959 peer_id: test_peer_id.clone(),
2960 addresses: vec![test_address.clone()],
2961 connected_at: Instant::now(),
2962 last_seen: Instant::now(),
2963 status: ConnectionStatus::Connected,
2964 protocols: vec!["test-protocol".to_string()],
2965 heartbeat_count: 0,
2966 };
2967
2968 node.peers
2969 .write()
2970 .await
2971 .insert(test_peer_id.clone(), peer_info);
2972
2973 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
2975 assert_eq!(found_peer_id, Some(test_peer_id));
2976
2977 Ok(())
2978 }
2979
2980 #[tokio::test]
2981 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
2982 let config = create_test_node_config();
2983 let node = P2PNode::new(config).await?;
2984
2985 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
2987 assert_eq!(result, None);
2988
2989 Ok(())
2990 }
2991
2992 #[tokio::test]
2993 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
2994 let config = create_test_node_config();
2995 let node = P2PNode::new(config).await?;
2996
2997 let result = node.get_peer_id_by_address("invalid-address").await;
2999 assert_eq!(result, None);
3000
3001 Ok(())
3002 }
3003
3004 #[tokio::test]
3005 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3006 let config = create_test_node_config();
3007 let node = P2PNode::new(config).await?;
3008
3009 let peer1_id = "peer_1".to_string();
3011 let peer1_addr = "192.168.1.101:9001".to_string();
3012
3013 let peer2_id = "peer_2".to_string();
3014 let peer2_addr = "192.168.1.102:9002".to_string();
3015
3016 let peer1_info = PeerInfo {
3017 peer_id: peer1_id.clone(),
3018 addresses: vec![peer1_addr.clone()],
3019 connected_at: Instant::now(),
3020 last_seen: Instant::now(),
3021 status: ConnectionStatus::Connected,
3022 protocols: vec!["test-protocol".to_string()],
3023 heartbeat_count: 0,
3024 };
3025
3026 let peer2_info = PeerInfo {
3027 peer_id: peer2_id.clone(),
3028 addresses: vec![peer2_addr.clone()],
3029 connected_at: Instant::now(),
3030 last_seen: Instant::now(),
3031 status: ConnectionStatus::Connected,
3032 protocols: vec!["test-protocol".to_string()],
3033 heartbeat_count: 0,
3034 };
3035
3036 node.peers
3037 .write()
3038 .await
3039 .insert(peer1_id.clone(), peer1_info);
3040 node.peers
3041 .write()
3042 .await
3043 .insert(peer2_id.clone(), peer2_info);
3044
3045 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3047 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3048
3049 assert_eq!(found_peer1, Some(peer1_id));
3050 assert_eq!(found_peer2, Some(peer2_id));
3051
3052 Ok(())
3053 }
3054
3055 #[tokio::test]
3056 async fn test_list_active_connections_empty() -> Result<()> {
3057 let config = create_test_node_config();
3058 let node = P2PNode::new(config).await?;
3059
3060 let connections = node.list_active_connections().await;
3062 assert!(connections.is_empty());
3063
3064 Ok(())
3065 }
3066
3067 #[tokio::test]
3068 async fn test_list_active_connections_with_peers() -> Result<()> {
3069 let config = create_test_node_config();
3070 let node = P2PNode::new(config).await?;
3071
3072 let peer1_id = "peer_1".to_string();
3074 let peer1_addrs = vec![
3075 "192.168.1.101:9001".to_string(),
3076 "192.168.1.101:9002".to_string(),
3077 ];
3078
3079 let peer2_id = "peer_2".to_string();
3080 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3081
3082 let peer1_info = PeerInfo {
3083 peer_id: peer1_id.clone(),
3084 addresses: peer1_addrs.clone(),
3085 connected_at: Instant::now(),
3086 last_seen: Instant::now(),
3087 status: ConnectionStatus::Connected,
3088 protocols: vec!["test-protocol".to_string()],
3089 heartbeat_count: 0,
3090 };
3091
3092 let peer2_info = PeerInfo {
3093 peer_id: peer2_id.clone(),
3094 addresses: peer2_addrs.clone(),
3095 connected_at: Instant::now(),
3096 last_seen: Instant::now(),
3097 status: ConnectionStatus::Connected,
3098 protocols: vec!["test-protocol".to_string()],
3099 heartbeat_count: 0,
3100 };
3101
3102 node.peers
3103 .write()
3104 .await
3105 .insert(peer1_id.clone(), peer1_info);
3106 node.peers
3107 .write()
3108 .await
3109 .insert(peer2_id.clone(), peer2_info);
3110
3111 let connections = node.list_active_connections().await;
3113 assert_eq!(connections.len(), 2);
3114
3115 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3117 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3118
3119 assert!(peer1_conn.is_some());
3120 assert!(peer2_conn.is_some());
3121
3122 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3124 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3125
3126 Ok(())
3127 }
3128
3129 #[tokio::test]
3130 async fn test_remove_peer_success() -> Result<()> {
3131 let config = create_test_node_config();
3132 let node = P2PNode::new(config).await?;
3133
3134 let peer_id = "peer_to_remove".to_string();
3136 let peer_info = PeerInfo {
3137 peer_id: peer_id.clone(),
3138 addresses: vec!["192.168.1.100:9000".to_string()],
3139 connected_at: Instant::now(),
3140 last_seen: Instant::now(),
3141 status: ConnectionStatus::Connected,
3142 protocols: vec!["test-protocol".to_string()],
3143 heartbeat_count: 0,
3144 };
3145
3146 node.peers.write().await.insert(peer_id.clone(), peer_info);
3147
3148 assert!(node.is_peer_connected(&peer_id).await);
3150
3151 let removed = node.remove_peer(&peer_id).await;
3153 assert!(removed);
3154
3155 assert!(!node.is_peer_connected(&peer_id).await);
3157
3158 Ok(())
3159 }
3160
3161 #[tokio::test]
3162 async fn test_remove_peer_nonexistent() -> Result<()> {
3163 let config = create_test_node_config();
3164 let node = P2PNode::new(config).await?;
3165
3166 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3168 assert!(!removed);
3169
3170 Ok(())
3171 }
3172
3173 #[tokio::test]
3174 async fn test_is_peer_connected() -> Result<()> {
3175 let config = create_test_node_config();
3176 let node = P2PNode::new(config).await?;
3177
3178 let peer_id = "test_peer".to_string();
3179
3180 assert!(!node.is_peer_connected(&peer_id).await);
3182
3183 let peer_info = PeerInfo {
3185 peer_id: peer_id.clone(),
3186 addresses: vec!["192.168.1.100:9000".to_string()],
3187 connected_at: Instant::now(),
3188 last_seen: Instant::now(),
3189 status: ConnectionStatus::Connected,
3190 protocols: vec!["test-protocol".to_string()],
3191 heartbeat_count: 0,
3192 };
3193
3194 node.peers.write().await.insert(peer_id.clone(), peer_info);
3195
3196 assert!(node.is_peer_connected(&peer_id).await);
3198
3199 node.remove_peer(&peer_id).await;
3201
3202 assert!(!node.is_peer_connected(&peer_id).await);
3204
3205 Ok(())
3206 }
3207
3208 #[test]
3209 fn test_normalize_ipv6_wildcard() {
3210 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3211
3212 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3213 let normalized = normalize_wildcard_to_loopback(wildcard);
3214
3215 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3216 assert_eq!(normalized.port(), 8080);
3217 }
3218
3219 #[test]
3220 fn test_normalize_ipv4_wildcard() {
3221 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3222
3223 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3224 let normalized = normalize_wildcard_to_loopback(wildcard);
3225
3226 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3227 assert_eq!(normalized.port(), 9000);
3228 }
3229
3230 #[test]
3231 fn test_normalize_specific_address_unchanged() {
3232 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3233 let normalized = normalize_wildcard_to_loopback(specific);
3234
3235 assert_eq!(normalized, specific);
3236 }
3237
3238 #[test]
3239 fn test_normalize_loopback_unchanged() {
3240 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3241
3242 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3243 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3244 assert_eq!(normalized_v6, loopback_v6);
3245
3246 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3247 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3248 assert_eq!(normalized_v4, loopback_v4);
3249 }
3250}