saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] // Temporarily unused during migration
31use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, error, info, trace, warn};
43
44/// Configuration for a P2P node
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47    /// Local peer ID for this node
48    pub peer_id: Option<PeerId>,
49
50    /// Addresses to listen on for incoming connections
51    pub listen_addrs: Vec<std::net::SocketAddr>,
52
53    /// Primary listen address (for compatibility)
54    pub listen_addr: std::net::SocketAddr,
55
56    /// Bootstrap peers to connect to on startup (legacy)
57    pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59    /// Bootstrap peers as strings (for integration tests)
60    pub bootstrap_peers_str: Vec<String>,
61
62    /// Enable IPv6 support
63    pub enable_ipv6: bool,
64
65    // MCP removed; will be redesigned later
66    /// Connection timeout duration
67    pub connection_timeout: Duration,
68
69    /// Keep-alive interval for connections
70    pub keep_alive_interval: Duration,
71
72    /// Maximum number of concurrent connections
73    pub max_connections: usize,
74
75    /// Maximum number of incoming connections
76    pub max_incoming_connections: usize,
77
78    /// DHT configuration
79    pub dht_config: DHTConfig,
80
81    /// Security configuration
82    pub security_config: SecurityConfig,
83
84    /// Production hardening configuration
85    pub production_config: Option<ProductionConfig>,
86
87    /// Bootstrap cache configuration
88    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
89
90    /// Optional IP diversity configuration for Sybil protection tuning.
91    ///
92    /// When set, this configuration is used by bootstrap peer discovery and
93    /// other diversity-enforcing subsystems. If `None`, defaults are used.
94    pub diversity_config: Option<crate::security::IPDiversityConfig>,
95
96    /// Attestation configuration for software integrity verification.
97    ///
98    /// Controls how nodes verify each other's software attestation during handshake.
99    /// In Phase 1, this is used for "soft enforcement" (logging only).
100    #[serde(default)]
101    pub attestation_config: crate::attestation::AttestationConfig,
102}
103
104/// DHT-specific configuration
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct DHTConfig {
107    /// Kademlia K parameter (bucket size)
108    pub k_value: usize,
109
110    /// Kademlia alpha parameter (parallelism)
111    pub alpha_value: usize,
112
113    /// DHT record TTL
114    pub record_ttl: Duration,
115
116    /// DHT refresh interval
117    pub refresh_interval: Duration,
118}
119
120/// Security configuration
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SecurityConfig {
123    /// Enable noise protocol for encryption
124    pub enable_noise: bool,
125
126    /// Enable TLS for secure transport
127    pub enable_tls: bool,
128
129    /// Trust level for peer verification
130    pub trust_level: TrustLevel,
131}
132
133/// Trust level for peer verification
134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
135pub enum TrustLevel {
136    /// No verification required
137    None,
138    /// Basic peer ID verification
139    Basic,
140    /// Full cryptographic verification
141    Full,
142}
143
144impl NodeConfig {
145    /// Create a new NodeConfig with default values
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if default addresses cannot be parsed
150    pub fn new() -> Result<Self> {
151        // Load config and use its defaults
152        let config = Config::default();
153
154        // Parse the default listen address
155        let listen_addr = config.listen_socket_addr()?;
156
157        // Create listen addresses based on config
158        let mut listen_addrs = vec![];
159
160        // Add IPv6 address if enabled
161        if config.network.ipv6_enabled {
162            let ipv6_addr = std::net::SocketAddr::new(
163                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
164                listen_addr.port(),
165            );
166            listen_addrs.push(ipv6_addr);
167        }
168
169        // Always add IPv4
170        let ipv4_addr = std::net::SocketAddr::new(
171            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
172            listen_addr.port(),
173        );
174        listen_addrs.push(ipv4_addr);
175
176        Ok(Self {
177            peer_id: None,
178            listen_addrs,
179            listen_addr,
180            bootstrap_peers: Vec::new(),
181            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
182            enable_ipv6: config.network.ipv6_enabled,
183
184            connection_timeout: Duration::from_secs(config.network.connection_timeout),
185            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
186            max_connections: config.network.max_connections,
187            max_incoming_connections: config.security.connection_limit as usize,
188            dht_config: DHTConfig::default(),
189            security_config: SecurityConfig::default(),
190            production_config: None,
191            bootstrap_cache_config: None,
192            diversity_config: None,
193            attestation_config: config.attestation.clone(),
194        })
195    }
196}
197
198impl Default for NodeConfig {
199    fn default() -> Self {
200        // Use config defaults for network settings
201        let config = Config::default();
202
203        // Parse the default listen address - use safe fallback if parsing fails
204        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
205            std::net::SocketAddr::new(
206                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
207                9000,
208            )
209        });
210
211        Self {
212            peer_id: None,
213            listen_addrs: vec![
214                std::net::SocketAddr::new(
215                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
216                    listen_addr.port(),
217                ),
218                std::net::SocketAddr::new(
219                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
220                    listen_addr.port(),
221                ),
222            ],
223            listen_addr,
224            bootstrap_peers: Vec::new(),
225            bootstrap_peers_str: Vec::new(),
226            enable_ipv6: config.network.ipv6_enabled,
227
228            connection_timeout: Duration::from_secs(config.network.connection_timeout),
229            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
230            max_connections: config.network.max_connections,
231            max_incoming_connections: config.security.connection_limit as usize,
232            dht_config: DHTConfig::default(),
233            security_config: SecurityConfig::default(),
234            production_config: None, // Use default production config if enabled
235            bootstrap_cache_config: None,
236            diversity_config: None,
237            attestation_config: config.attestation.clone(),
238        }
239    }
240}
241
242impl NodeConfig {
243    /// Create NodeConfig from Config
244    pub fn from_config(config: &Config) -> Result<Self> {
245        let listen_addr = config.listen_socket_addr()?;
246        let bootstrap_addrs = config.bootstrap_addrs()?;
247
248        let mut node_config = Self {
249            peer_id: None,
250            listen_addrs: vec![listen_addr],
251            listen_addr,
252            bootstrap_peers: bootstrap_addrs
253                .iter()
254                .map(|addr| addr.socket_addr())
255                .collect(),
256            bootstrap_peers_str: config
257                .network
258                .bootstrap_nodes
259                .iter()
260                .map(|addr| addr.to_string())
261                .collect(),
262            enable_ipv6: config.network.ipv6_enabled,
263
264            connection_timeout: Duration::from_secs(config.network.connection_timeout),
265            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266            max_connections: config.network.max_connections,
267            max_incoming_connections: config.security.connection_limit as usize,
268            dht_config: DHTConfig {
269                k_value: 20,
270                alpha_value: 3,
271                record_ttl: Duration::from_secs(3600),
272                refresh_interval: Duration::from_secs(900),
273            },
274            security_config: SecurityConfig {
275                enable_noise: true,
276                enable_tls: true,
277                trust_level: TrustLevel::Basic,
278            },
279            production_config: Some(ProductionConfig {
280                max_connections: config.network.max_connections,
281                max_memory_bytes: 0,  // unlimited
282                max_bandwidth_bps: 0, // unlimited
283                connection_timeout: Duration::from_secs(config.network.connection_timeout),
284                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
285                health_check_interval: Duration::from_secs(30),
286                metrics_interval: Duration::from_secs(60),
287                enable_performance_tracking: true,
288                enable_auto_cleanup: true,
289                shutdown_timeout: Duration::from_secs(30),
290                rate_limits: crate::production::RateLimitConfig::default(),
291            }),
292            bootstrap_cache_config: None,
293            diversity_config: None,
294            attestation_config: config.attestation.clone(),
295        };
296
297        // Add IPv6 listen address if enabled
298        if config.network.ipv6_enabled {
299            node_config.listen_addrs.push(std::net::SocketAddr::new(
300                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
301                listen_addr.port(),
302            ));
303        }
304
305        Ok(node_config)
306    }
307
308    /// Try to build a NodeConfig from a listen address string
309    pub fn with_listen_addr(addr: &str) -> Result<Self> {
310        let listen_addr: std::net::SocketAddr = addr
311            .parse()
312            .map_err(|e: std::net::AddrParseError| {
313                NetworkError::InvalidAddress(e.to_string().into())
314            })
315            .map_err(P2PError::Network)?;
316        let cfg = NodeConfig {
317            listen_addr,
318            listen_addrs: vec![listen_addr],
319            diversity_config: None,
320            ..Default::default()
321        };
322        Ok(cfg)
323    }
324}
325
326impl Default for DHTConfig {
327    fn default() -> Self {
328        Self {
329            k_value: 20,
330            alpha_value: 5,
331            record_ttl: Duration::from_secs(3600), // 1 hour
332            refresh_interval: Duration::from_secs(600), // 10 minutes
333        }
334    }
335}
336
337impl Default for SecurityConfig {
338    fn default() -> Self {
339        Self {
340            enable_noise: true,
341            enable_tls: true,
342            trust_level: TrustLevel::Basic,
343        }
344    }
345}
346
347/// Information about a connected peer
348#[derive(Debug, Clone)]
349pub struct PeerInfo {
350    /// Peer identifier
351    pub peer_id: PeerId,
352
353    /// Peer's addresses
354    pub addresses: Vec<String>,
355
356    /// Connection timestamp
357    pub connected_at: Instant,
358
359    /// Last seen timestamp
360    pub last_seen: Instant,
361
362    /// Connection status
363    pub status: ConnectionStatus,
364
365    /// Supported protocols
366    pub protocols: Vec<String>,
367
368    /// Number of heartbeats received
369    pub heartbeat_count: u64,
370}
371
/// Lifecycle state of the connection to a peer.
///
/// Equality is total for every variant (the only payload is a `String`),
/// so the type derives `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed; the payload carries the failure description
    Failed(String),
}
386
387/// Network events that can occur
388#[derive(Debug, Clone)]
389pub enum NetworkEvent {
390    /// A new peer has connected
391    PeerConnected {
392        /// The identifier of the newly connected peer
393        peer_id: PeerId,
394        /// The network addresses where the peer can be reached
395        addresses: Vec<String>,
396    },
397
398    /// A peer has disconnected
399    PeerDisconnected {
400        /// The identifier of the disconnected peer
401        peer_id: PeerId,
402        /// The reason for the disconnection
403        reason: String,
404    },
405
406    /// A message was received from a peer
407    MessageReceived {
408        /// The identifier of the sending peer
409        peer_id: PeerId,
410        /// The protocol used for the message
411        protocol: String,
412        /// The raw message data
413        data: Vec<u8>,
414    },
415
416    /// A connection attempt failed
417    ConnectionFailed {
418        /// The identifier of the peer (if known)
419        peer_id: Option<PeerId>,
420        /// The address where connection was attempted
421        address: String,
422        /// The error message describing the failure
423        error: String,
424    },
425
426    /// DHT record was stored
427    DHTRecordStored {
428        /// The DHT key where the record was stored
429        key: Vec<u8>,
430        /// The value that was stored
431        value: Vec<u8>,
432    },
433
434    /// DHT record was retrieved
435    DHTRecordRetrieved {
436        /// The DHT key that was queried
437        key: Vec<u8>,
438        /// The retrieved value, if found
439        value: Option<Vec<u8>>,
440    },
441}
442
443/// Network events that can occur in the P2P system
444///
445/// Events are broadcast to all listeners and provide real-time
446/// notifications of network state changes and message arrivals.
447#[derive(Debug, Clone)]
448pub enum P2PEvent {
449    /// Message received from a peer on a specific topic
450    Message {
451        /// Topic or channel the message was sent on
452        topic: String,
453        /// Peer ID of the message sender
454        source: PeerId,
455        /// Raw message data payload
456        data: Vec<u8>,
457    },
458    /// A new peer has connected to the network
459    PeerConnected(PeerId),
460    /// A peer has disconnected from the network
461    PeerDisconnected(PeerId),
462}
463
464/// Main P2P node structure
465/// Main P2P network node that manages connections, routing, and communication
466///
467/// This struct represents a complete P2P network participant that can:
468/// - Connect to other peers via QUIC transport
469/// - Participate in distributed hash table (DHT) operations
470/// - Send and receive messages through various protocols
471/// - Handle network events and peer lifecycle
472/// - Provide MCP (Model Context Protocol) services
473pub struct P2PNode {
474    /// Node configuration
475    config: NodeConfig,
476
477    /// Our peer ID
478    peer_id: PeerId,
479
480    /// Connected peers
481    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
482
483    /// Network event broadcaster
484    event_tx: broadcast::Sender<P2PEvent>,
485
486    /// Listen addresses
487    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
488
489    /// Node start time
490    start_time: Instant,
491
492    /// Running state
493    running: RwLock<bool>,
494
495    /// DHT instance (optional)
496    dht: Option<Arc<RwLock<DHT>>>,
497
498    /// Production resource manager (optional)
499    resource_manager: Option<Arc<ResourceManager>>,
500
501    /// Bootstrap cache manager for peer discovery
502    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
503
504    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
505    dual_node: Arc<DualStackNetworkNode>,
506
507    /// Rate limiter for connection and request throttling
508    #[allow(dead_code)]
509    rate_limiter: Arc<RateLimiter>,
510
511    /// Active connections (tracked by peer_id)
512    /// This set is synchronized with ant-quic's connection state via event monitoring
513    active_connections: Arc<RwLock<HashSet<PeerId>>>,
514
515    /// Security dashboard for monitoring
516    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
517
518    /// Connection lifecycle monitor task handle
519    #[allow(dead_code)]
520    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
521
522    /// Keepalive task handle
523    #[allow(dead_code)]
524    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
525
526    /// Shutdown flag for background tasks
527    #[allow(dead_code)]
528    shutdown: Arc<AtomicBool>,
529
530    /// GeoIP provider for connection validation
531    #[allow(dead_code)]
532    geo_provider: Arc<BgpGeoProvider>,
533
534    /// This node's entangled identity (derived from public key + binary hash + nonce).
535    /// Used for software attestation verification during peer handshake.
536    entangled_id: Option<crate::attestation::EntangledId>,
537
538    /// BLAKE3 hash of the running binary for attestation.
539    /// In production, this is computed at startup from the executable.
540    binary_hash: [u8; 32],
541}
542
/// Map a wildcard bind address to the loopback address of the same family.
///
/// Unspecified addresses (`0.0.0.0` and `[::]`) are only meaningful for
/// binding; ant-quic rejects them as connection targets. For local
/// connections they are rewritten as follows, keeping the port:
/// - IPv4 `0.0.0.0:port` → `127.0.0.1:port`
/// - IPv6 `[::]:port` → `[::1]:port`
/// - any other address is returned unchanged
///
/// # Arguments
/// * `addr` - The socket address to normalize
///
/// # Returns
/// * A socket address that can be used as a connection target
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let port = addr.port();
    match addr.ip() {
        IpAddr::V4(v4) if v4.is_unspecified() => {
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port) // 127.0.0.1
        }
        IpAddr::V6(v6) if v6.is_unspecified() => {
            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port) // ::1
        }
        // Anything that is not a wildcard is already a usable target.
        _ => addr,
    }
}
573
574impl P2PNode {
575    /// Minimal constructor for tests that avoids real networking
576    pub fn new_for_tests() -> Result<Self> {
577        let (event_tx, _) = broadcast::channel(16);
578        Ok(Self {
579            config: NodeConfig::default(),
580            peer_id: "test_peer".to_string(),
581            peers: Arc::new(RwLock::new(HashMap::new())),
582            event_tx,
583            listen_addrs: RwLock::new(Vec::new()),
584            start_time: Instant::now(),
585            running: RwLock::new(false),
586            dht: None,
587            resource_manager: None,
588            bootstrap_manager: None,
589            dual_node: {
590                // Bind dual-stack nodes on ephemeral ports for tests
591                let v6: Option<std::net::SocketAddr> = "[::1]:0"
592                    .parse()
593                    .ok()
594                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
595                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
596                let handle = tokio::runtime::Handle::current();
597                let dual_attempt = handle.block_on(
598                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
599                );
600                let dual = match dual_attempt {
601                    Ok(d) => d,
602                    Err(_e1) => {
603                        // Fallback to IPv4-only ephemeral bind
604                        let fallback = handle.block_on(
605                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
606                                None,
607                                "127.0.0.1:0".parse().ok(),
608                            ),
609                        );
610                        match fallback {
611                            Ok(d) => d,
612                            Err(e2) => {
613                                return Err(P2PError::Network(NetworkError::BindError(
614                                    format!("Failed to create dual-stack network node: {}", e2)
615                                        .into(),
616                                )));
617                            }
618                        }
619                    }
620                };
621                Arc::new(dual)
622            },
623            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
624                max_requests: 100,
625                burst_size: 100,
626                window: std::time::Duration::from_secs(1),
627                ..Default::default()
628            })),
629            active_connections: Arc::new(RwLock::new(HashSet::new())),
630            connection_monitor_handle: Arc::new(RwLock::new(None)),
631            keepalive_handle: Arc::new(RwLock::new(None)),
632            shutdown: Arc::new(AtomicBool::new(false)),
633            geo_provider: Arc::new(BgpGeoProvider::new()),
634            security_dashboard: None,
635            // Attestation fields - use dummy values for tests
636            entangled_id: None,
637            binary_hash: [0u8; 32],
638        })
639    }
640    /// Create a new P2P node with the given configuration
641    pub async fn new(config: NodeConfig) -> Result<Self> {
642        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
643            // Generate a random peer ID for now
644            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
645        });
646
647        let (event_tx, _) = broadcast::channel(1000);
648
649        // Initialize and register a TrustWeightedKademlia DHT for the global API
650        // Use a deterministic local NodeId derived from the peer_id
651        {
652            use blake3::Hasher;
653            let mut hasher = Hasher::new();
654            hasher.update(peer_id.as_bytes());
655            let digest = hasher.finalize();
656            let mut nid = [0u8; 32];
657            nid.copy_from_slice(digest.as_bytes());
658            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
659                crate::identity::node_identity::NodeId::from_bytes(nid),
660            ));
661            // TODO: Update to use new clean API
662            // let _ = crate::api::set_dht_instance(twdht);
663        }
664
665        // Initialize DHT if needed
666        let (dht, security_dashboard) = if true {
667            // Assuming DHT is always enabled for now, or check config
668            let _dht_config = crate::dht::DHTConfig {
669                replication_factor: config.dht_config.k_value,
670                bucket_size: config.dht_config.k_value,
671                alpha: config.dht_config.alpha_value,
672                record_ttl: config.dht_config.record_ttl,
673                bucket_refresh_interval: config.dht_config.refresh_interval,
674                republish_interval: config.dht_config.refresh_interval,
675                max_distance: 160,
676            };
677            // Convert peer_id String to NodeId
678            let peer_bytes = peer_id.as_bytes();
679            let mut node_id_bytes = [0u8; 32];
680            let len = peer_bytes.len().min(32);
681            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
682            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
683            let dht_instance = DHT::new(node_id).map_err(|e| {
684                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
685                    e.to_string().into(),
686                ))
687            })?;
688            dht_instance.start_maintenance_tasks();
689
690            // Create Security Dashboard
691            let security_metrics = dht_instance.security_metrics();
692            let dashboard = crate::dht::metrics::SecurityDashboard::new(
693                security_metrics,
694                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
695                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
696                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
697            );
698
699            (
700                Some(Arc::new(RwLock::new(dht_instance))),
701                Some(Arc::new(dashboard)),
702            )
703        } else {
704            (None, None)
705        };
706
707        // MCP removed
708
709        // Initialize production resource manager if configured
710        let resource_manager = config
711            .production_config
712            .clone()
713            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
714
715        // Initialize bootstrap cache manager
716        let diversity_config = config.diversity_config.clone().unwrap_or_default();
717        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
718            match BootstrapManager::with_full_config(
719                cache_config.clone(),
720                crate::rate_limit::JoinRateLimiterConfig::default(),
721                diversity_config.clone(),
722            )
723            .await
724            {
725                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
726                Err(e) => {
727                    warn!(
728                        "Failed to initialize bootstrap manager: {}, continuing without cache",
729                        e
730                    );
731                    None
732                }
733            }
734        } else {
735            match BootstrapManager::with_full_config(
736                crate::bootstrap::CacheConfig::default(),
737                crate::rate_limit::JoinRateLimiterConfig::default(),
738                diversity_config,
739            )
740            .await
741            {
742                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
743                Err(e) => {
744                    warn!(
745                        "Failed to initialize bootstrap manager: {}, continuing without cache",
746                        e
747                    );
748                    None
749                }
750            }
751        };
752
753        // Initialize dual-stack ant-quic nodes
754        // Determine bind addresses
755        let (v6_opt, v4_opt) = {
756            let port = config.listen_addr.port();
757            let ip = config.listen_addr.ip();
758
759            let v4_addr = if ip.is_ipv4() {
760                Some(std::net::SocketAddr::new(ip, port))
761            } else {
762                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
763                // But for now let's just stick to defaults if not specified
764                Some(std::net::SocketAddr::new(
765                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
766                    port,
767                ))
768            };
769
770            let v6_addr = if config.enable_ipv6 {
771                if ip.is_ipv6() {
772                    Some(std::net::SocketAddr::new(ip, port))
773                } else {
774                    Some(std::net::SocketAddr::new(
775                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
776                        port,
777                    ))
778                }
779            } else {
780                None
781            };
782            (v6_addr, v4_addr)
783        };
784
785        let dual_node = Arc::new(
786            DualStackNetworkNode::new(v6_opt, v4_opt)
787                .await
788                .map_err(|e| {
789                    P2PError::Transport(crate::error::TransportError::SetupFailed(
790                        format!("Failed to create dual-stack network nodes: {}", e).into(),
791                    ))
792                })?,
793        );
794
795        // Initialize rate limiter with default config
796        let rate_limiter = Arc::new(RateLimiter::new(
797            crate::validation::RateLimitConfig::default(),
798        ));
799
800        // Create active connections tracker
801        let active_connections = Arc::new(RwLock::new(HashSet::new()));
802
803        // Initialize GeoIP provider
804        let geo_provider = Arc::new(BgpGeoProvider::new());
805
806        // Create peers map
807        let peers = Arc::new(RwLock::new(HashMap::new()));
808
809        // Start connection lifecycle monitor
810        let connection_monitor_handle = {
811            let active_conns = Arc::clone(&active_connections);
812            let peers_map = Arc::clone(&peers);
813            let event_tx_clone = event_tx.clone();
814            let dual_node_clone = Arc::clone(&dual_node);
815            let geo_provider_clone = Arc::clone(&geo_provider);
816            let peer_id_clone = peer_id.clone();
817
818            let handle = tokio::spawn(async move {
819                Self::connection_lifecycle_monitor(
820                    dual_node_clone,
821                    active_conns,
822                    peers_map,
823                    event_tx_clone,
824                    geo_provider_clone,
825                    peer_id_clone,
826                )
827                .await;
828            });
829
830            Arc::new(RwLock::new(Some(handle)))
831        };
832
833        // Spawn keepalive task
834        let shutdown = Arc::new(AtomicBool::new(false));
835        let keepalive_handle = {
836            let active_conns = Arc::clone(&active_connections);
837            let dual_node_clone = Arc::clone(&dual_node);
838            let shutdown_clone = Arc::clone(&shutdown);
839
840            let handle = tokio::spawn(async move {
841                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
842            });
843
844            Arc::new(RwLock::new(Some(handle)))
845        };
846
847        // Compute binary hash for attestation (in production, this would be the actual binary)
848        // For now, we use a placeholder that will be replaced during node initialization
849        let binary_hash = Self::compute_binary_hash();
850
851        let node = Self {
852            config,
853            peer_id,
854            peers,
855            event_tx,
856            listen_addrs: RwLock::new(Vec::new()),
857            start_time: Instant::now(),
858            running: RwLock::new(false),
859            dht,
860            resource_manager,
861            bootstrap_manager,
862            dual_node,
863            rate_limiter,
864            active_connections,
865            security_dashboard,
866            connection_monitor_handle,
867            keepalive_handle,
868            shutdown,
869            geo_provider,
870            // Attestation - EntangledId will be derived later when NodeIdentity is available
871            entangled_id: None,
872            binary_hash,
873        };
874        info!("Created P2P node with peer ID: {}", node.peer_id);
875
876        // Start the network listeners to populate listen addresses
877        node.start_network_listeners().await?;
878
879        // Update the connection monitor with actual peers reference
880        node.start_connection_monitor().await;
881
882        Ok(node)
883    }
884
885    /// Create a new node builder
886    pub fn builder() -> NodeBuilder {
887        NodeBuilder::new()
888    }
889
890    /// Get the peer ID of this node
891    pub fn peer_id(&self) -> &PeerId {
892        &self.peer_id
893    }
894
895    pub fn local_addr(&self) -> Option<String> {
896        self.listen_addrs
897            .try_read()
898            .ok()
899            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
900    }
901
902    pub async fn subscribe(&self, topic: &str) -> Result<()> {
903        // In a real implementation, this would register the topic with the pubsub mechanism.
904        // For now, we just log it.
905        info!("Subscribed to topic: {}", topic);
906        Ok(())
907    }
908
909    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
910        info!(
911            "Publishing message to topic: {} ({} bytes)",
912            topic,
913            data.len()
914        );
915
916        // Get list of connected peers
917        let peer_list: Vec<PeerId> = {
918            let peers_guard = self.peers.read().await;
919            peers_guard.keys().cloned().collect()
920        };
921
922        if peer_list.is_empty() {
923            debug!("No peers connected, message will only be sent to local subscribers");
924        } else {
925            // Send message to all connected peers
926            let mut send_count = 0;
927            for peer_id in &peer_list {
928                match self.send_message(peer_id, topic, data.to_vec()).await {
929                    Ok(_) => {
930                        send_count += 1;
931                        debug!("Sent message to peer: {}", peer_id);
932                    }
933                    Err(e) => {
934                        warn!("Failed to send message to peer {}: {}", peer_id, e);
935                    }
936                }
937            }
938            info!(
939                "Published message to {}/{} connected peers",
940                send_count,
941                peer_list.len()
942            );
943        }
944
945        // Also send to local subscribers (for local echo and testing)
946        let event = P2PEvent::Message {
947            topic: topic.to_string(),
948            source: self.peer_id.clone(),
949            data: data.to_vec(),
950        };
951        let _ = self.event_tx.send(event);
952
953        Ok(())
954    }
955
956    /// Get the node configuration
957    pub fn config(&self) -> &NodeConfig {
958        &self.config
959    }
960
961    /// Start the P2P node
962    pub async fn start(&self) -> Result<()> {
963        info!("Starting P2P node...");
964
965        // Start production resource manager if configured
966        if let Some(ref resource_manager) = self.resource_manager {
967            resource_manager.start().await.map_err(|e| {
968                P2PError::Network(crate::error::NetworkError::ProtocolError(
969                    format!("Failed to start resource manager: {e}").into(),
970                ))
971            })?;
972            info!("Production resource manager started");
973        }
974
975        // Start bootstrap manager background tasks
976        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
977            let mut manager = bootstrap_manager.write().await;
978            manager.start_background_tasks().await.map_err(|e| {
979                P2PError::Network(crate::error::NetworkError::ProtocolError(
980                    format!("Failed to start bootstrap manager: {e}").into(),
981                ))
982            })?;
983            info!("Bootstrap cache manager started");
984        }
985
986        // Set running state
987        *self.running.write().await = true;
988
989        // Start listening on configured addresses using transport layer
990        self.start_network_listeners().await?;
991
992        // Log current listen addresses
993        let listen_addrs = self.listen_addrs.read().await;
994        info!("P2P node started on addresses: {:?}", *listen_addrs);
995
996        // MCP removed
997
998        // Start message receiving system
999        self.start_message_receiving_system().await?;
1000
1001        // Connect to bootstrap peers
1002        self.connect_bootstrap_peers().await?;
1003
1004        Ok(())
1005    }
1006
1007    /// Start network listeners on configured addresses
1008    async fn start_network_listeners(&self) -> Result<()> {
1009        info!("Starting dual-stack listeners (ant-quic)...");
1010        // Update our listen_addrs from the dual node bindings
1011        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1012            P2PError::Transport(crate::error::TransportError::SetupFailed(
1013                format!("Failed to get local addresses: {}", e).into(),
1014            ))
1015        })?;
1016        {
1017            let mut la = self.listen_addrs.write().await;
1018            *la = addrs.clone();
1019        }
1020
1021        // Spawn a background accept loop that handles incoming connections from either stack
1022        let event_tx = self.event_tx.clone();
1023        let peers = self.peers.clone();
1024        let active_connections = self.active_connections.clone();
1025        let rate_limiter = self.rate_limiter.clone();
1026        let dual = self.dual_node.clone();
1027        tokio::spawn(async move {
1028            loop {
1029                match dual.accept_any().await {
1030                    Ok((ant_peer_id, remote_sock)) => {
1031                        let peer_id =
1032                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1033                        let remote_addr = NetworkAddress::from(remote_sock);
1034                        // Optional: basic IP rate limiting
1035                        let _ = rate_limiter.check_ip(&remote_sock.ip());
1036                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1037                        register_new_peer(&peers, &peer_id, &remote_addr).await;
1038                        active_connections.write().await.insert(peer_id);
1039                    }
1040                    Err(e) => {
1041                        warn!("Accept failed: {}", e);
1042                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1043                    }
1044                }
1045            }
1046        });
1047
1048        info!("Dual-stack listeners active on: {:?}", addrs);
1049        Ok(())
1050    }
1051
1052    /// Start a listener on a specific socket address
1053    #[allow(dead_code)]
1054    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1055        // use crate::transport::{Transport}; // Unused during migration
1056
1057        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
1058        /*
1059        // Try QUIC first (preferred transport)
1060        match crate::transport::QuicTransport::new(Default::default()) {
1061            Ok(quic_transport) => {
1062                match quic_transport.listen(NetworkAddress::new(addr)).await {
1063                    Ok(listen_addrs) => {
1064                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
1065
1066                        // Store the actual listening addresses in the node
1067                        {
1068                            let mut node_listen_addrs = self.listen_addrs.write().await;
1069                            // Don't clear - accumulate addresses from multiple listeners
1070                            node_listen_addrs.push(listen_addrs.socket_addr());
1071                        }
1072
1073                        // Start accepting connections in background
1074                        self.start_connection_acceptor(
1075                            Arc::new(quic_transport),
1076                            addr,
1077                            crate::transport::TransportType::QUIC
1078                        ).await?;
1079
1080                        return Ok(());
1081                    }
1082                    Err(e) => {
1083                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
1084                    }
1085                }
1086            }
1087            Err(e) => {
1088                warn!("Failed to create QUIC transport for listening: {}", e);
1089            }
1090        }
1091        */
1092
1093        warn!("QUIC transport temporarily disabled during ant-quic migration");
1094        // No TCP fallback - QUIC only
1095        Err(crate::P2PError::Transport(
1096            crate::error::TransportError::SetupFailed(
1097                format!(
1098                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1099                )
1100                .into(),
1101            ),
1102        ))
1103    }
1104
1105    /// Start connection acceptor background task
1106    #[allow(dead_code)] // Deprecated during ant-quic migration
1107    async fn start_connection_acceptor(
1108        &self,
1109        transport: Arc<dyn crate::transport::Transport>,
1110        addr: std::net::SocketAddr,
1111        transport_type: crate::transport::TransportType,
1112    ) -> Result<()> {
1113        info!(
1114            "Starting connection acceptor for {:?} on {}",
1115            transport_type, addr
1116        );
1117
1118        // Clone necessary data for the background task
1119        let event_tx = self.event_tx.clone();
1120        let _peer_id = self.peer_id.clone();
1121        let peers = Arc::clone(&self.peers);
1122        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1123
1124        let rate_limiter = Arc::clone(&self.rate_limiter);
1125
1126        // Spawn background task to accept incoming connections
1127        tokio::spawn(async move {
1128            loop {
1129                match transport.accept().await {
1130                    Ok(connection) => {
1131                        let remote_addr = connection.remote_addr();
1132                        let connection_peer_id =
1133                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1134
1135                        // Apply rate limiting for incoming connections
1136                        let socket_addr = remote_addr.socket_addr();
1137                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1138                            // Connection dropped automatically when it goes out of scope
1139                            continue;
1140                        }
1141
1142                        info!(
1143                            "Accepted {:?} connection from {} (peer: {})",
1144                            transport_type, remote_addr, connection_peer_id
1145                        );
1146
1147                        // Generate peer connected event
1148                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1149
1150                        // Store the peer connection
1151                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1152
1153                        // Spawn task to handle this specific connection's messages
1154                        spawn_connection_handler(
1155                            connection,
1156                            connection_peer_id,
1157                            event_tx.clone(),
1158                            Arc::clone(&peers),
1159                        );
1160                    }
1161                    Err(e) => {
1162                        warn!(
1163                            "Failed to accept {:?} connection on {}: {}",
1164                            transport_type, addr, e
1165                        );
1166
1167                        // Brief pause before retrying to avoid busy loop
1168                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1169                    }
1170                }
1171            }
1172        });
1173
1174        info!(
1175            "Connection acceptor background task started for {:?} on {}",
1176            transport_type, addr
1177        );
1178        Ok(())
1179    }
1180
1181    /// Start the message receiving system with background tasks
1182    async fn start_message_receiving_system(&self) -> Result<()> {
1183        info!("Starting message receiving system");
1184        let dual = self.dual_node.clone();
1185        let event_tx = self.event_tx.clone();
1186
1187        tokio::spawn(async move {
1188            loop {
1189                match dual.receive_any().await {
1190                    Ok((_peer_id, bytes)) => {
1191                        // Expect the JSON message wrapper from create_protocol_message
1192                        #[allow(clippy::collapsible_if)]
1193                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1194                            if let (Some(protocol), Some(data), Some(from)) = (
1195                                value.get("protocol").and_then(|v| v.as_str()),
1196                                value.get("data").and_then(|v| v.as_array()),
1197                                value.get("from").and_then(|v| v.as_str()),
1198                            ) {
1199                                let payload: Vec<u8> = data
1200                                    .iter()
1201                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1202                                    .collect();
1203                                let _ = event_tx.send(P2PEvent::Message {
1204                                    topic: protocol.to_string(),
1205                                    source: from.to_string(),
1206                                    data: payload,
1207                                });
1208                            }
1209                        }
1210                    }
1211                    Err(e) => {
1212                        warn!("Receive error: {}", e);
1213                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1214                    }
1215                }
1216            }
1217        });
1218
1219        Ok(())
1220    }
1221
1222    /// Handle a received message and generate appropriate events
1223    #[allow(dead_code)]
1224    async fn handle_received_message(
1225        &self,
1226        message_data: Vec<u8>,
1227        peer_id: &PeerId,
1228        _protocol: &str,
1229        event_tx: &broadcast::Sender<P2PEvent>,
1230    ) -> Result<()> {
1231        // MCP removed: no special protocol handling
1232
1233        // Parse the message format we created in create_protocol_message
1234        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1235            Ok(message) => {
1236                if let (Some(protocol), Some(data), Some(from)) = (
1237                    message.get("protocol").and_then(|v| v.as_str()),
1238                    message.get("data").and_then(|v| v.as_array()),
1239                    message.get("from").and_then(|v| v.as_str()),
1240                ) {
1241                    // Convert data array back to bytes
1242                    let data_bytes: Vec<u8> = data
1243                        .iter()
1244                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1245                        .collect();
1246
1247                    // Generate message event
1248                    let event = P2PEvent::Message {
1249                        topic: protocol.to_string(),
1250                        source: from.to_string(),
1251                        data: data_bytes,
1252                    };
1253
1254                    let _ = event_tx.send(event);
1255                    debug!("Generated message event from peer: {}", peer_id);
1256                }
1257            }
1258            Err(e) => {
1259                warn!("Failed to parse received message from {}: {}", peer_id, e);
1260            }
1261        }
1262
1263        Ok(())
1264    }
1265
1266    // MCP removed
1267
1268    // MCP removed
1269
1270    /// Run the P2P node (blocks until shutdown)
1271    pub async fn run(&self) -> Result<()> {
1272        if !*self.running.read().await {
1273            self.start().await?;
1274        }
1275
1276        info!("P2P node running...");
1277
1278        // Main event loop
1279        loop {
1280            if !*self.running.read().await {
1281                break;
1282            }
1283
1284            // Perform periodic tasks
1285            self.periodic_tasks().await?;
1286
1287            // Sleep for a short interval
1288            tokio::time::sleep(Duration::from_millis(100)).await;
1289        }
1290
1291        info!("P2P node stopped");
1292        Ok(())
1293    }
1294
1295    /// Stop the P2P node
1296    pub async fn stop(&self) -> Result<()> {
1297        info!("Stopping P2P node...");
1298
1299        // Set running state to false
1300        *self.running.write().await = false;
1301
1302        // Disconnect all peers
1303        self.disconnect_all_peers().await?;
1304
1305        // Shutdown production resource manager if configured
1306        if let Some(ref resource_manager) = self.resource_manager {
1307            resource_manager.shutdown().await.map_err(|e| {
1308                P2PError::Network(crate::error::NetworkError::ProtocolError(
1309                    format!("Failed to shutdown resource manager: {e}").into(),
1310                ))
1311            })?;
1312            info!("Production resource manager stopped");
1313        }
1314
1315        info!("P2P node stopped");
1316        Ok(())
1317    }
1318
1319    /// Graceful shutdown alias for tests
1320    pub async fn shutdown(&self) -> Result<()> {
1321        self.stop().await
1322    }
1323
1324    /// Check if the node is running
1325    pub async fn is_running(&self) -> bool {
1326        *self.running.read().await
1327    }
1328
1329    /// Get the current listen addresses
1330    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1331        self.listen_addrs.read().await.clone()
1332    }
1333
1334    /// Get connected peers
1335    pub async fn connected_peers(&self) -> Vec<PeerId> {
1336        // "Connected" is defined as currently active at the transport layer.
1337        // The peers map may contain historical peers with Disconnected/Failed status.
1338        self.active_connections
1339            .read()
1340            .await
1341            .iter()
1342            .cloned()
1343            .collect()
1344    }
1345
1346    /// Get peer count
1347    pub async fn peer_count(&self) -> usize {
1348        self.active_connections.read().await.len()
1349    }
1350
1351    /// Get peer info
1352    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1353        self.peers.read().await.get(peer_id).cloned()
1354    }
1355
1356    /// Get the peer ID for a given socket address, if connected
1357    ///
1358    /// This method searches through all connected peers to find one that has
1359    /// the specified address in its address list.
1360    ///
1361    /// # Arguments
1362    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1363    ///
1364    /// # Returns
1365    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1366    /// * `None` - If no peer with this address is currently connected
1367    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1368        // Parse the address to a SocketAddr for comparison
1369        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1370
1371        let peers = self.peers.read().await;
1372
1373        // Search through all connected peers
1374        for (peer_id, peer_info) in peers.iter() {
1375            // Check if this peer has a matching address
1376            for peer_addr in &peer_info.addresses {
1377                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1378                    && peer_socket == socket_addr
1379                {
1380                    return Some(peer_id.clone());
1381                }
1382            }
1383        }
1384
1385        None
1386    }
1387
1388    /// List all active connections with their peer IDs and addresses
1389    ///
1390    /// # Returns
1391    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1392    /// contains all known addresses for that peer.
1393    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1394        let active = self.active_connections.read().await;
1395        let peers = self.peers.read().await;
1396
1397        active
1398            .iter()
1399            .map(|peer_id| {
1400                let addresses = peers
1401                    .get(peer_id)
1402                    .map(|info| info.addresses.clone())
1403                    .unwrap_or_default();
1404                (peer_id.clone(), addresses)
1405            })
1406            .collect()
1407    }
1408
1409    /// Remove a peer from the peers map
1410    ///
1411    /// This method removes a peer from the internal peers map. It should be used
1412    /// when a connection is no longer valid (e.g., after detecting that the underlying
1413    /// ant-quic connection has closed).
1414    ///
1415    /// # Arguments
1416    /// * `peer_id` - The ID of the peer to remove
1417    ///
1418    /// # Returns
1419    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1420    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1421        // Remove from active connections tracking
1422        self.active_connections.write().await.remove(peer_id);
1423        // Remove from peers map and return whether it existed
1424        self.peers.write().await.remove(peer_id).is_some()
1425    }
1426
1427    /// Check if a peer is connected
1428    ///
1429    /// This method checks if the peer ID exists in the peers map. Note that this
1430    /// only verifies the peer is registered - it does not guarantee the underlying
1431    /// ant-quic connection is still active. For connection validation, use `send_message`
1432    /// which will fail if the connection is closed.
1433    ///
1434    /// # Arguments
1435    /// * `peer_id` - The ID of the peer to check
1436    ///
1437    /// # Returns
1438    /// `true` if the peer exists in the peers map, `false` otherwise
1439    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1440        self.peers.read().await.contains_key(peer_id)
1441    }
1442
1443    /// Connect to a peer
1444    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1445        info!("Connecting to peer at: {}", address);
1446
1447        // Check production limits if resource manager is enabled
1448        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1449            Some(resource_manager.acquire_connection().await?)
1450        } else {
1451            None
1452        };
1453
1454        // Parse the address to SocketAddr format
1455        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1456            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1457                format!("{}: {}", address, e).into(),
1458            ))
1459        })?;
1460
1461        // Normalize wildcard addresses to loopback for local connections
1462        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1463        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1464        if normalized_addr != socket_addr {
1465            info!(
1466                "Normalized wildcard address {} to loopback {}",
1467                socket_addr, normalized_addr
1468            );
1469        }
1470
1471        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1472        let addr_list = vec![normalized_addr];
1473        let peer_id = match tokio::time::timeout(
1474            self.config.connection_timeout,
1475            self.dual_node.connect_happy_eyeballs(&addr_list),
1476        )
1477        .await
1478        {
1479            Ok(Ok(peer)) => {
1480                let connected_peer_id =
1481                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1482                info!("Successfully connected to peer: {}", connected_peer_id);
1483                connected_peer_id
1484            }
1485            Ok(Err(e)) => {
1486                warn!("Failed to connect to peer at {}: {}", address, e);
1487                return Err(P2PError::Transport(
1488                    crate::error::TransportError::ConnectionFailed {
1489                        addr: normalized_addr,
1490                        reason: e.to_string().into(),
1491                    },
1492                ));
1493            }
1494            Err(_) => {
1495                warn!(
1496                    "Timed out connecting to peer at {} after {:?}",
1497                    address, self.config.connection_timeout
1498                );
1499                return Err(P2PError::Timeout(self.config.connection_timeout));
1500            }
1501        };
1502
1503        // Create peer info with connection details
1504        let peer_info = PeerInfo {
1505            peer_id: peer_id.clone(),
1506            addresses: vec![address.to_string()],
1507            connected_at: Instant::now(),
1508            last_seen: Instant::now(),
1509            status: ConnectionStatus::Connected,
1510            protocols: vec!["p2p-foundation/1.0".to_string()],
1511            heartbeat_count: 0,
1512        };
1513
1514        // Store peer information
1515        self.peers.write().await.insert(peer_id.clone(), peer_info);
1516
1517        // Add to active connections tracking
1518        // This is critical for is_connection_active() to work correctly
1519        self.active_connections
1520            .write()
1521            .await
1522            .insert(peer_id.clone());
1523
1524        // Record bandwidth usage if resource manager is enabled
1525        if let Some(ref resource_manager) = self.resource_manager {
1526            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1527        }
1528
1529        // Emit connection event
1530        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1531
1532        info!("Connected to peer: {}", peer_id);
1533        Ok(peer_id)
1534    }
1535
1536    /// Disconnect from a peer
1537    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1538        info!("Disconnecting from peer: {}", peer_id);
1539
1540        // Remove from active connections
1541        self.active_connections.write().await.remove(peer_id);
1542
1543        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1544            peer_info.status = ConnectionStatus::Disconnected;
1545
1546            // Emit event
1547            let _ = self
1548                .event_tx
1549                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1550
1551            info!("Disconnected from peer: {}", peer_id);
1552        }
1553
1554        Ok(())
1555    }
1556
1557    /// Check if a connection to a peer is active
1558    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1559        self.active_connections.read().await.contains(peer_id)
1560    }
1561
1562    /// Send a message to a peer
1563    pub async fn send_message(
1564        &self,
1565        peer_id: &PeerId,
1566        protocol: &str,
1567        data: Vec<u8>,
1568    ) -> Result<()> {
1569        debug!(
1570            "Sending message to peer {} on protocol {}",
1571            peer_id, protocol
1572        );
1573
1574        // Check rate limits if resource manager is enabled
1575        if let Some(ref resource_manager) = self.resource_manager
1576            && !resource_manager
1577                .check_rate_limit(peer_id, "message")
1578                .await?
1579        {
1580            return Err(P2PError::ResourceExhausted(
1581                format!("Rate limit exceeded for peer {}", peer_id).into(),
1582            ));
1583        }
1584
1585        // Check if peer exists in peers map
1586        if !self.peers.read().await.contains_key(peer_id) {
1587            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1588                peer_id.to_string().into(),
1589            )));
1590        }
1591
1592        // **NEW**: Check if the ant-quic connection is actually active
1593        // This is the critical fix for the connection state synchronization issue
1594        if !self.is_connection_active(peer_id).await {
1595            debug!(
1596                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1597                peer_id
1598            );
1599
1600            // Clean up stale peer entry
1601            self.remove_peer(peer_id).await;
1602
1603            return Err(P2PError::Network(
1604                crate::error::NetworkError::ConnectionClosed {
1605                    peer_id: peer_id.to_string().into(),
1606                },
1607            ));
1608        }
1609
1610        // MCP removed: no special-case protocol validation
1611
1612        // Record bandwidth usage if resource manager is enabled
1613        if let Some(ref resource_manager) = self.resource_manager {
1614            resource_manager.record_bandwidth(data.len() as u64, 0);
1615        }
1616
1617        // Create protocol message wrapper
1618        let _message_data = self.create_protocol_message(protocol, data)?;
1619
1620        // Send via ant-quic dual-node
1621        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1622        tokio::time::timeout(self.config.connection_timeout, send_fut)
1623            .await
1624            .map_err(|_| {
1625                P2PError::Transport(crate::error::TransportError::StreamError(
1626                    "Timed out sending message".into(),
1627                ))
1628            })?
1629            .map_err(|e| {
1630                P2PError::Transport(crate::error::TransportError::StreamError(
1631                    e.to_string().into(),
1632                ))
1633            })
1634    }
1635
1636    /// Create a protocol message wrapper
1637    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1638        use serde_json::json;
1639
1640        let timestamp = std::time::SystemTime::now()
1641            .duration_since(std::time::UNIX_EPOCH)
1642            .map_err(|e| {
1643                P2PError::Network(NetworkError::ProtocolError(
1644                    format!("System time error: {}", e).into(),
1645                ))
1646            })?
1647            .as_secs();
1648
1649        // Create a simple message format for P2P communication
1650        let message = json!({
1651            "protocol": protocol,
1652            "data": data,
1653            "from": self.peer_id,
1654            "timestamp": timestamp
1655        });
1656
1657        serde_json::to_vec(&message).map_err(|e| {
1658            P2PError::Transport(crate::error::TransportError::StreamError(
1659                format!("Failed to serialize message: {e}").into(),
1660            ))
1661        })
1662    }
1663
1664    // Note: async listen_addrs() already exists above for fetching listen addresses
1665}
1666
1667/// Create a protocol message wrapper (static version for background tasks)
1668#[allow(dead_code)]
1669fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1670    use serde_json::json;
1671
1672    let timestamp = std::time::SystemTime::now()
1673        .duration_since(std::time::UNIX_EPOCH)
1674        .map_err(|e| {
1675            P2PError::Network(NetworkError::ProtocolError(
1676                format!("System time error: {}", e).into(),
1677            ))
1678        })?
1679        .as_secs();
1680
1681    // Create a simple message format for P2P communication
1682    let message = json!({
1683        "protocol": protocol,
1684        "data": data,
1685        "timestamp": timestamp
1686    });
1687
1688    serde_json::to_vec(&message).map_err(|e| {
1689        P2PError::Transport(crate::error::TransportError::StreamError(
1690            format!("Failed to serialize message: {e}").into(),
1691        ))
1692    })
1693}
1694
1695impl P2PNode {
1696    /// Subscribe to network events
1697    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1698        self.event_tx.subscribe()
1699    }
1700
1701    /// Backwards-compat event stream accessor for tests
1702    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1703        self.subscribe_events()
1704    }
1705
1706    /// Get node uptime
1707    pub fn uptime(&self) -> Duration {
1708        self.start_time.elapsed()
1709    }
1710
1711    // =========================================================================
    // Attestation Methods (Off / Soft / Hard enforcement modes)
1713    // =========================================================================
1714
1715    /// Compute the BLAKE3 hash of the running binary.
1716    ///
1717    /// In production, this reads the actual executable file and hashes it.
1718    /// Returns a placeholder hash if the binary cannot be read.
1719    fn compute_binary_hash() -> [u8; 32] {
1720        // Try to get the path to the current executable and hash it
1721        if let Some(hash) = std::env::current_exe()
1722            .ok()
1723            .and_then(|exe_path| std::fs::read(&exe_path).ok())
1724            .map(|binary_data| blake3::hash(&binary_data))
1725        {
1726            return *hash.as_bytes();
1727        }
1728        // Fallback: return a deterministic placeholder based on compile-time info
1729        // This allows tests and development to work without actual binary hashing
1730        let placeholder = format!(
1731            "saorsa-core-v{}-{}",
1732            env!("CARGO_PKG_VERSION"),
1733            std::env::consts::ARCH
1734        );
1735        let hash = blake3::hash(placeholder.as_bytes());
1736        *hash.as_bytes()
1737    }
1738
1739    /// Get this node's binary hash used for attestation.
1740    #[must_use]
1741    pub fn binary_hash(&self) -> &[u8; 32] {
1742        &self.binary_hash
1743    }
1744
1745    /// Get this node's entangled identity, if set.
1746    #[must_use]
1747    pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1748        self.entangled_id.as_ref()
1749    }
1750
1751    /// Set the entangled identity for this node.
1752    ///
1753    /// This should be called after the node's cryptographic identity is established,
1754    /// typically by deriving from the NodeIdentity's public key.
1755    pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1756        self.entangled_id = Some(entangled_id);
1757    }
1758
1759    /// Verify a peer's attestation and return the enforcement decision.
1760    ///
1761    /// This function implements the Entangled Attestation verification protocol
1762    /// (Phase 6: Hard Enforcement). Based on the configured enforcement mode:
1763    ///
1764    /// - **Off**: Skips verification entirely
1765    /// - **Soft**: Logs warnings but allows connections
1766    /// - **Hard**: Rejects connections with invalid attestations
1767    ///
1768    /// # Arguments
1769    /// * `peer_id` - The peer's identifier for logging
1770    /// * `peer_entangled_id` - The peer's claimed entangled ID
1771    /// * `peer_public_key` - The peer's ML-DSA public key
1772    ///
1773    /// # Returns
1774    /// An [`EnforcementDecision`] indicating whether to allow or reject the connection.
1775    ///
1776    /// # Example
1777    /// ```rust,ignore
1778    /// let decision = node.verify_peer_attestation(peer_id, &entangled_id, &public_key);
1779    /// if decision.should_reject() {
1780    ///     // Send rejection message and close connection
1781    ///     if let Some(rejection) = decision.rejection() {
1782    ///         send_rejection(peer_id, rejection);
1783    ///     }
1784    ///     disconnect(peer_id);
1785    /// }
1786    /// ```
1787    pub fn verify_peer_attestation(
1788        &self,
1789        peer_id: &str,
1790        peer_entangled_id: &crate::attestation::EntangledId,
1791        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1792    ) -> crate::attestation::EnforcementDecision {
1793        use crate::attestation::{
1794            AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1795        };
1796
1797        let config = &self.config.attestation_config;
1798
1799        // Skip verification if attestation is disabled
1800        if !config.enabled {
1801            return EnforcementDecision::Skipped;
1802        }
1803
1804        // Verify the entangled ID derivation
1805        let id_valid = peer_entangled_id.verify(peer_public_key);
1806
1807        // Check binary hash allowlist (if configured)
1808        let binary_hash = *peer_entangled_id.binary_hash();
1809        let binary_allowed = config.is_binary_allowed(&binary_hash);
1810
1811        match config.enforcement_mode {
1812            EnforcementMode::Off => EnforcementDecision::Skipped,
1813
1814            EnforcementMode::Soft => {
1815                // Soft enforcement: log warnings but allow connections
1816                if !id_valid {
1817                    warn!(
1818                        peer = %peer_id,
1819                        binary_hash = %hex::encode(&binary_hash[..8]),
1820                        "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1821                    );
1822                    return EnforcementDecision::AllowWithWarning {
1823                        reason: AttestationRejectionReason::IdentityMismatch,
1824                    };
1825                }
1826                if !binary_allowed {
1827                    warn!(
1828                        peer = %peer_id,
1829                        binary_hash = %hex::encode(binary_hash),
1830                        "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1831                    );
1832                    return EnforcementDecision::AllowWithWarning {
1833                        reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1834                    };
1835                }
1836                EnforcementDecision::Allow
1837            }
1838
1839            EnforcementMode::Hard => {
1840                // Hard enforcement: reject invalid attestations
1841                if !id_valid {
1842                    error!(
1843                        peer = %peer_id,
1844                        binary_hash = %hex::encode(&binary_hash[..8]),
1845                        "REJECTING peer: Invalid entangled ID derivation"
1846                    );
1847                    return EnforcementDecision::Reject {
1848                        rejection: AttestationRejection::identity_mismatch(),
1849                    };
1850                }
1851                if !binary_allowed {
1852                    error!(
1853                        peer = %peer_id,
1854                        binary_hash = %hex::encode(binary_hash),
1855                        "REJECTING peer: Binary not in allowlist"
1856                    );
1857                    return EnforcementDecision::Reject {
1858                        rejection: AttestationRejection::binary_not_allowed(binary_hash),
1859                    };
1860                }
1861
1862                info!(
1863                    peer = %peer_id,
1864                    entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
1865                    "Peer attestation verified successfully (hard mode)"
1866                );
1867                EnforcementDecision::Allow
1868            }
1869        }
1870    }
1871
1872    /// Verify a peer's attestation and return a simple boolean result.
1873    ///
1874    /// This is a convenience method that wraps [`verify_peer_attestation`] for cases
1875    /// where only a pass/fail result is needed without the detailed decision.
1876    ///
1877    /// # Returns
1878    /// `true` if the connection should be allowed, `false` if it should be rejected.
1879    #[must_use]
1880    pub fn verify_peer_attestation_simple(
1881        &self,
1882        peer_id: &str,
1883        peer_entangled_id: &crate::attestation::EntangledId,
1884        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1885    ) -> bool {
1886        self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
1887            .should_allow()
1888    }
1889
1890    // MCP removed: all MCP tool/service methods removed
1891
1892    // /// Handle MCP remote tool call with network integration
1893
1894    // /// List tools available on a specific remote peer
1895
1896    // /// Get MCP server statistics
1897
1898    /// Get production resource metrics
1899    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1900        if let Some(ref resource_manager) = self.resource_manager {
1901            Ok(resource_manager.get_metrics().await)
1902        } else {
1903            Err(P2PError::Network(
1904                crate::error::NetworkError::ProtocolError(
1905                    "Production resource manager not enabled".to_string().into(),
1906                ),
1907            ))
1908        }
1909    }
1910
1911    /// Connection lifecycle monitor task - processes ant-quic connection events
1912    /// and updates active_connections HashSet and peers map
1913    async fn connection_lifecycle_monitor(
1914        dual_node: Arc<DualStackNetworkNode>,
1915        active_connections: Arc<RwLock<HashSet<String>>>,
1916        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1917        event_tx: broadcast::Sender<P2PEvent>,
1918        geo_provider: Arc<BgpGeoProvider>,
1919        local_peer_id: String,
1920    ) {
1921        use crate::transport::ant_quic_adapter::ConnectionEvent;
1922
1923        let mut event_rx = dual_node.subscribe_connection_events();
1924
1925        info!("Connection lifecycle monitor started");
1926
1927        loop {
1928            match event_rx.recv().await {
1929                Ok(event) => {
1930                    match event {
1931                        ConnectionEvent::Established {
1932                            peer_id,
1933                            remote_address,
1934                        } => {
1935                            let peer_id_str =
1936                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1937                            debug!(
1938                                "Connection established: peer={}, addr={}",
1939                                peer_id_str, remote_address
1940                            );
1941
1942                            // **GeoIP Validation**
1943                            // Check if the peer's IP is allowed
1944                            let ip = remote_address.ip();
1945                            let is_rejected = match ip {
1946                                std::net::IpAddr::V4(v4) => {
1947                                    // Check if it's a hosting provider or VPN
1948                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1949                                        geo_provider.is_hosting_asn(asn)
1950                                            || geo_provider.is_vpn_asn(asn)
1951                                    } else {
1952                                        false
1953                                    }
1954                                }
1955                                std::net::IpAddr::V6(v6) => {
1956                                    let info = geo_provider.lookup(v6);
1957                                    info.is_hosting_provider || info.is_vpn_provider
1958                                }
1959                            };
1960
1961                            if is_rejected {
1962                                info!(
1963                                    "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1964                                    peer_id_str, remote_address
1965                                );
1966
1967                                // Create rejection message
1968                                let rejection = RejectionMessage {
1969                                    reason: RejectionReason::GeoIpPolicy,
1970                                    message:
1971                                        "Connection rejected: Hosting/VPN providers not allowed"
1972                                            .to_string(),
1973                                    suggested_target: None, // Could suggest a different region if we knew more
1974                                };
1975
1976                                // Serialize message
1977                                if let Ok(data) = serde_json::to_vec(&rejection) {
1978                                    // Create protocol message
1979                                    let timestamp = std::time::SystemTime::now()
1980                                        .duration_since(std::time::UNIX_EPOCH)
1981                                        .unwrap_or_default()
1982                                        .as_secs();
1983
1984                                    let message = serde_json::json!({
1985                                        "protocol": "control",
1986                                        "data": data,
1987                                        "from": local_peer_id,
1988                                        "timestamp": timestamp
1989                                    });
1990
1991                                    if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1992                                        // Send rejection message
1993                                        // We use send_to_peer directly on dual_node to avoid the checks in P2PNode::send_message
1994                                        // which might fail if we haven't fully registered the peer yet
1995                                        let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1996
1997                                        // Give it a moment to send before disconnecting?
1998                                        // ant-quic might handle this, but a small yield is safe
1999                                        tokio::task::yield_now().await;
2000                                    }
2001                                }
2002
2003                                // Disconnect (TODO: Add disconnect method to dual_node or just drop?)
2004                                // For now, we just don't add it to active connections, effectively ignoring it
2005                                // Ideally we should actively close the connection
2006                                continue;
2007                            }
2008
2009                            // Add to active connections
2010                            active_connections.write().await.insert(peer_id_str.clone());
2011
2012                            // Update peer info or insert new
2013                            let mut peers_lock = peers.write().await;
2014                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2015                                peer_info.status = ConnectionStatus::Connected;
2016                                peer_info.connected_at = Instant::now();
2017                            } else {
2018                                // New incoming peer
2019                                debug!("Registering new incoming peer: {}", peer_id_str);
2020                                peers_lock.insert(
2021                                    peer_id_str.clone(),
2022                                    PeerInfo {
2023                                        peer_id: peer_id_str.clone(),
2024                                        addresses: vec![remote_address.to_string()],
2025                                        status: ConnectionStatus::Connected,
2026                                        last_seen: Instant::now(),
2027                                        connected_at: Instant::now(),
2028                                        protocols: Vec::new(),
2029                                        heartbeat_count: 0,
2030                                    },
2031                                );
2032                            }
2033
2034                            // Broadcast connection event
2035                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2036                        }
2037                        ConnectionEvent::Lost { peer_id, reason } => {
2038                            let peer_id_str =
2039                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2040                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2041
2042                            // Remove from active connections
2043                            active_connections.write().await.remove(&peer_id_str);
2044
2045                            // Update peer info status
2046                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2047                                peer_info.status = ConnectionStatus::Disconnected;
2048                                peer_info.last_seen = Instant::now();
2049                            }
2050
2051                            // Broadcast disconnection event
2052                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2053                        }
2054                        ConnectionEvent::Failed { peer_id, reason } => {
2055                            let peer_id_str =
2056                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2057                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2058
2059                            // Remove from active connections
2060                            active_connections.write().await.remove(&peer_id_str);
2061
2062                            // Update peer info status
2063                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2064                                peer_info.status = ConnectionStatus::Failed(reason.clone());
2065                            }
2066
2067                            // Broadcast disconnection event
2068                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2069                        }
2070                    }
2071                }
2072                Err(broadcast::error::RecvError::Lagged(skipped)) => {
2073                    warn!(
2074                        "Connection event monitor lagged, skipped {} events",
2075                        skipped
2076                    );
2077                    continue;
2078                }
2079                Err(broadcast::error::RecvError::Closed) => {
2080                    info!("Connection event channel closed, stopping monitor");
2081                    break;
2082                }
2083            }
2084        }
2085
2086        info!("Connection lifecycle monitor stopped");
2087    }
2088
2089    /// Start connection monitor (called after node initialization)
2090    async fn start_connection_monitor(&self) {
2091        // The monitor task is already spawned in new() with a temporary peers map
2092        // This method is a placeholder for future enhancements where we might
2093        // need to restart the monitor or provide it with updated references
2094        debug!("Connection monitor already running from initialization");
2095    }
2096
2097    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
2098    ///
2099    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
2100    /// message every 15 seconds (half the timeout) to all active connections to prevent
2101    /// them from timing out during periods of inactivity.
2102    async fn keepalive_task(
2103        active_connections: Arc<RwLock<HashSet<String>>>,
2104        dual_node: Arc<DualStackNetworkNode>,
2105        shutdown: Arc<AtomicBool>,
2106    ) {
2107        use tokio::time::{Duration, interval};
2108
2109        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
2110        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
2111
2112        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2113        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2114
2115        info!(
2116            "Keepalive task started (interval: {}s)",
2117            KEEPALIVE_INTERVAL_SECS
2118        );
2119
2120        loop {
2121            // Check shutdown flag first
2122            if shutdown.load(Ordering::Relaxed) {
2123                info!("Keepalive task shutting down");
2124                break;
2125            }
2126
2127            interval.tick().await;
2128
2129            // Get snapshot of active connections
2130            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2131
2132            if peers.is_empty() {
2133                trace!("Keepalive: no active connections");
2134                continue;
2135            }
2136
2137            debug!("Sending keepalive to {} active connections", peers.len());
2138
2139            // Send keepalive to each peer
2140            for peer_id in peers {
2141                match dual_node
2142                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
2143                    .await
2144                {
2145                    Ok(_) => {
2146                        trace!("Keepalive sent to peer: {}", peer_id);
2147                    }
2148                    Err(e) => {
2149                        debug!(
2150                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
2151                            peer_id, e
2152                        );
2153                        // Don't remove from active_connections here - let the lifecycle monitor handle it
2154                    }
2155                }
2156            }
2157        }
2158
2159        info!("Keepalive task stopped");
2160    }
2161
2162    /// Check system health
2163    pub async fn health_check(&self) -> Result<()> {
2164        if let Some(ref resource_manager) = self.resource_manager {
2165            resource_manager.health_check().await
2166        } else {
2167            // Basic health check without resource manager
2168            let peer_count = self.peer_count().await;
2169            if peer_count > self.config.max_connections {
2170                Err(P2PError::Network(
2171                    crate::error::NetworkError::ProtocolError(
2172                        format!("Too many connections: {peer_count}").into(),
2173                    ),
2174                ))
2175            } else {
2176                Ok(())
2177            }
2178        }
2179    }
2180
2181    /// Get production configuration (if enabled)
2182    pub fn production_config(&self) -> Option<&ProductionConfig> {
2183        self.config.production_config.as_ref()
2184    }
2185
2186    /// Check if production hardening is enabled
2187    pub fn is_production_mode(&self) -> bool {
2188        self.resource_manager.is_some()
2189    }
2190
2191    /// Get DHT reference
2192    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2193        self.dht.as_ref()
2194    }
2195
2196    /// Store a value in the DHT
2197    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2198        if let Some(ref dht) = self.dht {
2199            let mut dht_instance = dht.write().await;
2200            let dht_key = crate::dht::DhtKey::from_bytes(key);
2201            dht_instance
2202                .store(&dht_key, value.clone())
2203                .await
2204                .map_err(|e| {
2205                    P2PError::Dht(crate::error::DhtError::StoreFailed(
2206                        format!("{:?}: {e}", key).into(),
2207                    ))
2208                })?;
2209
2210            Ok(())
2211        } else {
2212            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2213                "DHT not enabled".to_string().into(),
2214            )))
2215        }
2216    }
2217
2218    /// Retrieve a value from the DHT
2219    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2220        if let Some(ref dht) = self.dht {
2221            let dht_instance = dht.read().await;
2222            let dht_key = crate::dht::DhtKey::from_bytes(key);
2223            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2224                P2PError::Dht(crate::error::DhtError::StoreFailed(
2225                    format!("Retrieve failed: {e}").into(),
2226                ))
2227            })?;
2228
2229            Ok(record_result)
2230        } else {
2231            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2232                "DHT not enabled".to_string().into(),
2233            )))
2234        }
2235    }
2236
2237    /// Add a discovered peer to the bootstrap cache
2238    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2239        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2240            let mut manager = bootstrap_manager.write().await;
2241            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2242                .iter()
2243                .filter_map(|addr| addr.parse().ok())
2244                .collect();
2245            let contact = ContactEntry::new(peer_id, socket_addresses);
2246            manager.add_contact(contact).await.map_err(|e| {
2247                P2PError::Network(crate::error::NetworkError::ProtocolError(
2248                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2249                ))
2250            })?;
2251        }
2252        Ok(())
2253    }
2254
2255    /// Update connection metrics for a peer in the bootstrap cache
2256    pub async fn update_peer_metrics(
2257        &self,
2258        peer_id: &PeerId,
2259        success: bool,
2260        latency_ms: Option<u64>,
2261        _error: Option<String>,
2262    ) -> Result<()> {
2263        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2264            let mut manager = bootstrap_manager.write().await;
2265
2266            // Create quality metrics based on the connection result
2267            let metrics = QualityMetrics {
2268                success_rate: if success { 1.0 } else { 0.0 },
2269                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2270                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2271                last_connection_attempt: chrono::Utc::now(),
2272                last_successful_connection: if success {
2273                    chrono::Utc::now()
2274                } else {
2275                    chrono::Utc::now() - chrono::Duration::hours(1)
2276                },
2277                uptime_score: 0.5,
2278            };
2279
2280            manager
2281                .update_contact_metrics(peer_id, metrics)
2282                .await
2283                .map_err(|e| {
2284                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2285                        format!("Failed to update peer metrics: {e}").into(),
2286                    ))
2287                })?;
2288        }
2289        Ok(())
2290    }
2291
2292    /// Get bootstrap cache statistics
2293    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2294        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2295            let manager = bootstrap_manager.read().await;
2296            let stats = manager.get_stats().await.map_err(|e| {
2297                P2PError::Network(crate::error::NetworkError::ProtocolError(
2298                    format!("Failed to get bootstrap stats: {e}").into(),
2299                ))
2300            })?;
2301            Ok(Some(stats))
2302        } else {
2303            Ok(None)
2304        }
2305    }
2306
2307    /// Get the number of cached bootstrap peers
2308    pub async fn cached_peer_count(&self) -> usize {
2309        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2310            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2311        {
2312            return stats.total_contacts;
2313        }
2314        0
2315    }
2316
2317    /// Connect to bootstrap peers
2318    async fn connect_bootstrap_peers(&self) -> Result<()> {
2319        let mut bootstrap_contacts = Vec::new();
2320        let mut used_cache = false;
2321        let mut seen_addresses = std::collections::HashSet::new();
2322
2323        // CLI-provided bootstrap peers take priority - always include them first
2324        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2325            self.config.bootstrap_peers_str.clone()
2326        } else {
2327            // Convert Multiaddr to strings
2328            self.config
2329                .bootstrap_peers
2330                .iter()
2331                .map(|addr| addr.to_string())
2332                .collect::<Vec<_>>()
2333        };
2334
2335        if !cli_bootstrap_peers.is_empty() {
2336            info!(
2337                "Using {} CLI-provided bootstrap peers (priority)",
2338                cli_bootstrap_peers.len()
2339            );
2340            for addr in &cli_bootstrap_peers {
2341                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2342                    seen_addresses.insert(socket_addr);
2343                    let contact = ContactEntry::new(
2344                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2345                        vec![socket_addr],
2346                    );
2347                    bootstrap_contacts.push(contact);
2348                } else {
2349                    warn!("Invalid bootstrap address format: {}", addr);
2350                }
2351            }
2352        }
2353
2354        // Supplement with cached bootstrap peers (after CLI peers)
2355        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2356            let manager = bootstrap_manager.read().await;
2357            match manager.get_bootstrap_peers(20).await {
2358                // Try to get top 20 quality peers
2359                Ok(contacts) => {
2360                    if !contacts.is_empty() {
2361                        let mut added_from_cache = 0;
2362                        for contact in contacts {
2363                            // Only add if we haven't already added this address from CLI
2364                            let new_addresses: Vec<_> = contact
2365                                .addresses
2366                                .iter()
2367                                .filter(|addr| !seen_addresses.contains(addr))
2368                                .copied()
2369                                .collect();
2370
2371                            if !new_addresses.is_empty() {
2372                                for addr in &new_addresses {
2373                                    seen_addresses.insert(*addr);
2374                                }
2375                                let mut contact = contact.clone();
2376                                contact.addresses = new_addresses;
2377                                bootstrap_contacts.push(contact);
2378                                added_from_cache += 1;
2379                            }
2380                        }
2381                        if added_from_cache > 0 {
2382                            info!(
2383                                "Added {} cached bootstrap peers (supplementing CLI peers)",
2384                                added_from_cache
2385                            );
2386                            used_cache = true;
2387                        }
2388                    }
2389                }
2390                Err(e) => {
2391                    warn!("Failed to get cached bootstrap peers: {}", e);
2392                }
2393            }
2394        }
2395
2396        if bootstrap_contacts.is_empty() {
2397            info!("No bootstrap peers configured and no cached peers available");
2398            return Ok(());
2399        }
2400
2401        // Connect to bootstrap peers
2402        let mut successful_connections = 0;
2403        for contact in bootstrap_contacts {
2404            for addr in &contact.addresses {
2405                match self.connect_peer(&addr.to_string()).await {
2406                    Ok(peer_id) => {
2407                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2408                        successful_connections += 1;
2409
2410                        // Update bootstrap cache with successful connection
2411                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2412                            let mut manager = bootstrap_manager.write().await;
2413                            let mut updated_contact = contact.clone();
2414                            updated_contact.peer_id = peer_id.clone();
2415                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
2416
2417                            if let Err(e) = manager.add_contact(updated_contact).await {
2418                                warn!("Failed to update bootstrap cache: {}", e);
2419                            }
2420                        }
2421                        break; // Successfully connected, move to next contact
2422                    }
2423                    Err(e) => {
2424                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2425
2426                        // Update bootstrap cache with failed connection
2427                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2428                            let mut manager = bootstrap_manager.write().await;
2429                            let mut updated_contact = contact.clone();
2430                            updated_contact.update_connection_result(
2431                                false,
2432                                None,
2433                                Some(e.to_string()),
2434                            );
2435
2436                            if let Err(e) = manager.add_contact(updated_contact).await {
2437                                warn!("Failed to update bootstrap cache: {}", e);
2438                            }
2439                        }
2440                    }
2441                }
2442            }
2443        }
2444
2445        if successful_connections == 0 {
2446            if !used_cache {
2447                warn!("Failed to connect to any bootstrap peers");
2448            }
2449            // Starting a node should not be gated on immediate bootstrap connectivity.
2450            // Keep running and allow background discovery / retries to populate peers later.
2451            return Ok(());
2452        }
2453        info!(
2454            "Successfully connected to {} bootstrap peers",
2455            successful_connections
2456        );
2457
2458        Ok(())
2459    }
2460
2461    /// Disconnect from all peers
2462    async fn disconnect_all_peers(&self) -> Result<()> {
2463        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2464
2465        for peer_id in peer_ids {
2466            self.disconnect_peer(&peer_id).await?;
2467        }
2468
2469        Ok(())
2470    }
2471
    /// Perform periodic maintenance tasks.
    ///
    /// Currently a placeholder: it does no work and always returns `Ok(())`.
    async fn periodic_tasks(&self) -> Result<()> {
        // Planned work, none of which is implemented yet:
        // - Update peer last seen timestamps
        // - Remove stale connections
        // - Perform DHT maintenance

        Ok(())
    }
2481}
2482
/// Network sender trait for sending messages
///
/// Abstracts "send bytes to a peer under a protocol name" so that callers do
/// not depend on a concrete node type. Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Send a message to a specific peer
    ///
    /// * `peer_id` - recipient of the message
    /// * `protocol` - protocol name the payload belongs to
    /// * `data` - raw payload bytes
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Get our local peer ID
    fn local_peer_id(&self) -> &PeerId;
}
2492
2493/// Lightweight wrapper for P2PNode to implement NetworkSender
2494#[derive(Clone)]
2495pub struct P2PNetworkSender {
2496    peer_id: PeerId,
2497    // Use channels for async communication with the P2P node
2498    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2499}
2500
2501impl P2PNetworkSender {
2502    pub fn new(
2503        peer_id: PeerId,
2504        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2505    ) -> Self {
2506        Self { peer_id, send_tx }
2507    }
2508}
2509
2510/// Implementation of NetworkSender trait for P2PNetworkSender
2511#[async_trait::async_trait]
2512impl NetworkSender for P2PNetworkSender {
2513    /// Send a message to a specific peer via the P2P network
2514    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2515        self.send_tx
2516            .send((peer_id.clone(), protocol.to_string(), data))
2517            .map_err(|_| {
2518                P2PError::Network(crate::error::NetworkError::ProtocolError(
2519                    "Failed to send message via channel".to_string().into(),
2520                ))
2521            })?;
2522        Ok(())
2523    }
2524
2525    /// Get our local peer ID
2526    fn local_peer_id(&self) -> &PeerId {
2527        &self.peer_id
2528    }
2529}
2530
2531/// Builder pattern for creating P2P nodes
2532pub struct NodeBuilder {
2533    config: NodeConfig,
2534}
2535
2536impl Default for NodeBuilder {
2537    fn default() -> Self {
2538        Self::new()
2539    }
2540}
2541
2542impl NodeBuilder {
2543    /// Create a new node builder
2544    pub fn new() -> Self {
2545        Self {
2546            config: NodeConfig::default(),
2547        }
2548    }
2549
2550    /// Set the peer ID
2551    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2552        self.config.peer_id = Some(peer_id);
2553        self
2554    }
2555
2556    /// Add a listen address
2557    pub fn listen_on(mut self, addr: &str) -> Self {
2558        if let Ok(multiaddr) = addr.parse() {
2559            self.config.listen_addrs.push(multiaddr);
2560        }
2561        self
2562    }
2563
2564    /// Add a bootstrap peer
2565    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2566        if let Ok(multiaddr) = addr.parse() {
2567            self.config.bootstrap_peers.push(multiaddr);
2568        }
2569        self.config.bootstrap_peers_str.push(addr.to_string());
2570        self
2571    }
2572
2573    /// Enable IPv6 support
2574    pub fn with_ipv6(mut self, enable: bool) -> Self {
2575        self.config.enable_ipv6 = enable;
2576        self
2577    }
2578
2579    // MCP removed: builder methods deleted
2580
2581    /// Set connection timeout
2582    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2583        self.config.connection_timeout = timeout;
2584        self
2585    }
2586
2587    /// Set maximum connections
2588    pub fn with_max_connections(mut self, max: usize) -> Self {
2589        self.config.max_connections = max;
2590        self
2591    }
2592
2593    /// Enable production mode with default configuration
2594    pub fn with_production_mode(mut self) -> Self {
2595        self.config.production_config = Some(ProductionConfig::default());
2596        self
2597    }
2598
2599    /// Configure production settings
2600    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2601        self.config.production_config = Some(production_config);
2602        self
2603    }
2604
2605    /// Configure IP diversity limits for Sybil protection.
2606    pub fn with_diversity_config(
2607        mut self,
2608        diversity_config: crate::security::IPDiversityConfig,
2609    ) -> Self {
2610        self.config.diversity_config = Some(diversity_config);
2611        self
2612    }
2613
2614    /// Configure DHT settings
2615    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2616        self.config.dht_config = dht_config;
2617        self
2618    }
2619
2620    /// Enable DHT with default configuration
2621    pub fn with_default_dht(mut self) -> Self {
2622        self.config.dht_config = DHTConfig::default();
2623        self
2624    }
2625
2626    /// Build the P2P node
2627    pub async fn build(self) -> Result<P2PNode> {
2628        P2PNode::new(self.config).await
2629    }
2630}
2631
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Build a `BootstrapManager` from a `NodeConfig`: the node's cache and
    /// diversity settings are used when present, defaults otherwise.
    ///
    /// NOTE(review): the name says this mirrors the production construction
    /// path — confirm it stays in sync with `P2PNode::new`.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        let cache_config = match config.bootstrap_cache_config.as_ref() {
            Some(explicit) => explicit.clone(),
            None => crate::bootstrap::CacheConfig::default(),
        };

        BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager")
    }

    /// A diversity config set on `NodeConfig` must reach the bootstrap manager.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let manager = build_bootstrap_manager_like_prod(&config).await;
        let effective = manager.diversity_config();

        assert!(effective.is_relaxed());
        assert_eq!(effective.max_nodes_per_asn, 5000);
    }
}
2671
2672/// Standalone function to handle received messages without borrowing self
2673#[allow(dead_code)] // Deprecated during ant-quic migration
2674async fn handle_received_message_standalone(
2675    message_data: Vec<u8>,
2676    peer_id: &PeerId,
2677    _protocol: &str,
2678    event_tx: &broadcast::Sender<P2PEvent>,
2679) -> Result<()> {
2680    // Parse the message format
2681    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2682        Ok(message) => {
2683            if let (Some(protocol), Some(data), Some(from)) = (
2684                message.get("protocol").and_then(|v| v.as_str()),
2685                message.get("data").and_then(|v| v.as_array()),
2686                message.get("from").and_then(|v| v.as_str()),
2687            ) {
2688                // Convert data array back to bytes
2689                let data_bytes: Vec<u8> = data
2690                    .iter()
2691                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2692                    .collect();
2693
2694                // Generate message event
2695                let event = P2PEvent::Message {
2696                    topic: protocol.to_string(),
2697                    source: from.to_string(),
2698                    data: data_bytes,
2699                };
2700
2701                let _ = event_tx.send(event);
2702                debug!("Generated message event from peer: {}", peer_id);
2703            }
2704        }
2705        Err(e) => {
2706            warn!("Failed to parse received message from {}: {}", peer_id, e);
2707        }
2708    }
2709
2710    Ok(())
2711}
2712
2713// MCP removed: standalone MCP handler deleted
2714
2715/// Helper function to handle protocol message creation
2716#[allow(dead_code)]
2717fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2718    match create_protocol_message_static(protocol, data) {
2719        Ok(msg) => Some(msg),
2720        Err(e) => {
2721            warn!("Failed to create protocol message: {}", e);
2722            None
2723        }
2724    }
2725}
2726
2727/// Helper function to handle message send result
2728#[allow(dead_code)]
2729async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2730    match result {
2731        Ok(_) => {
2732            debug!("Message sent to peer {} via transport layer", peer_id);
2733        }
2734        Err(e) => {
2735            warn!("Failed to send message to peer {}: {}", peer_id, e);
2736        }
2737    }
2738}
2739
2740/// Helper function to check rate limit
2741#[allow(dead_code)] // Deprecated during ant-quic migration
2742fn check_rate_limit(
2743    rate_limiter: &RateLimiter,
2744    socket_addr: &std::net::SocketAddr,
2745    remote_addr: &NetworkAddress,
2746) -> Result<()> {
2747    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2748        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2749        e
2750    })
2751}
2752
2753/// Helper function to register a new peer
2754#[allow(dead_code)] // Deprecated during ant-quic migration
2755async fn register_new_peer(
2756    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2757    peer_id: &PeerId,
2758    remote_addr: &NetworkAddress,
2759) {
2760    let mut peers_guard = peers.write().await;
2761    let peer_info = PeerInfo {
2762        peer_id: peer_id.clone(),
2763        addresses: vec![remote_addr.to_string()],
2764        connected_at: tokio::time::Instant::now(),
2765        last_seen: tokio::time::Instant::now(),
2766        status: ConnectionStatus::Connected,
2767        protocols: vec!["p2p-chat/1.0.0".to_string()],
2768        heartbeat_count: 0,
2769    };
2770    peers_guard.insert(peer_id.clone(), peer_info);
2771}
2772
2773/// Helper function to spawn connection handler
2774#[allow(dead_code)] // Deprecated during ant-quic migration
2775fn spawn_connection_handler(
2776    connection: Box<dyn crate::transport::Connection>,
2777    peer_id: PeerId,
2778    event_tx: broadcast::Sender<P2PEvent>,
2779    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2780) {
2781    tokio::spawn(async move {
2782        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2783    });
2784}
2785
2786/// Helper function to handle peer connection
2787#[allow(dead_code)] // Deprecated during ant-quic migration
2788async fn handle_peer_connection(
2789    mut connection: Box<dyn crate::transport::Connection>,
2790    peer_id: PeerId,
2791    event_tx: broadcast::Sender<P2PEvent>,
2792    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2793) {
2794    loop {
2795        match connection.receive().await {
2796            Ok(message_data) => {
2797                debug!(
2798                    "Received {} bytes from peer: {}",
2799                    message_data.len(),
2800                    peer_id
2801                );
2802
2803                // Handle the received message
2804                if let Err(e) = handle_received_message_standalone(
2805                    message_data,
2806                    &peer_id,
2807                    "unknown", // TODO: Extract protocol from message
2808                    &event_tx,
2809                )
2810                .await
2811                {
2812                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2813                }
2814            }
2815            Err(e) => {
2816                warn!("Failed to receive message from {}: {}", peer_id, e);
2817
2818                // Check if connection is still alive
2819                if !connection.is_alive().await {
2820                    info!("Connection to {} is dead, removing peer", peer_id);
2821
2822                    // Remove dead peer
2823                    remove_peer(&peers, &peer_id).await;
2824
2825                    // Generate peer disconnected event
2826                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2827
2828                    break; // Exit the message receiving loop
2829                }
2830
2831                // Brief pause before retrying
2832                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2833            }
2834        }
2835    }
2836}
2837
2838/// Helper function to remove a peer
2839#[allow(dead_code)] // Deprecated during ant-quic migration
2840async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2841    let mut peers_guard = peers.write().await;
2842    peers_guard.remove(peer_id);
2843}
2844
2845/// Helper function to update peer heartbeat
2846#[allow(dead_code)]
2847async fn update_peer_heartbeat(
2848    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2849    peer_id: &PeerId,
2850) -> Result<()> {
2851    let mut peers_guard = peers.write().await;
2852    match peers_guard.get_mut(peer_id) {
2853        Some(peer_info) => {
2854            peer_info.last_seen = Instant::now();
2855            peer_info.heartbeat_count += 1;
2856            Ok(())
2857        }
2858        None => {
2859            warn!("Received heartbeat from unknown peer: {}", peer_id);
2860            Err(P2PError::Network(NetworkError::PeerNotFound(
2861                format!("Peer {} not found", peer_id).into(),
2862            )))
2863        }
2864    }
2865}
2866
2867/// Helper function to get resource metrics
2868#[allow(dead_code)]
2869async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2870    if let Some(manager) = resource_manager {
2871        let metrics = manager.get_metrics().await;
2872        (metrics.memory_used, metrics.cpu_usage)
2873    } else {
2874        (0, 0.0)
2875    }
2876}
2877
2878#[cfg(test)]
2879mod tests {
2880    use super::*;
2881    // MCP removed from tests
2882    use std::time::Duration;
2883    use tokio::time::timeout;
2884
2885    // Test tool handler for network tests
2886
2887    // MCP removed
2888
2889    /// Helper function to create a test node configuration
2890    fn create_test_node_config() -> NodeConfig {
2891        NodeConfig {
2892            peer_id: Some("test_peer_123".to_string()),
2893            listen_addrs: vec![
2894                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2895                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2896            ],
2897            listen_addr: std::net::SocketAddr::new(
2898                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2899                0,
2900            ),
2901            bootstrap_peers: vec![],
2902            bootstrap_peers_str: vec![],
2903            enable_ipv6: true,
2904
2905            connection_timeout: Duration::from_secs(2),
2906            keep_alive_interval: Duration::from_secs(30),
2907            max_connections: 100,
2908            max_incoming_connections: 50,
2909            dht_config: DHTConfig::default(),
2910            security_config: SecurityConfig::default(),
2911            production_config: None,
2912            bootstrap_cache_config: None,
2913            diversity_config: None,
2914            attestation_config: crate::attestation::AttestationConfig::default(),
2915        }
2916    }
2917
    // NOTE: a helper that created a test tool used to live here.
    // MCP removed: test tool helper deleted
2920
2921    #[tokio::test]
2922    async fn test_node_config_default() {
2923        let config = NodeConfig::default();
2924
2925        assert!(config.peer_id.is_none());
2926        assert_eq!(config.listen_addrs.len(), 2);
2927        assert!(config.enable_ipv6);
2928        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2929        assert_eq!(config.max_incoming_connections, 100);
2930        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2931    }
2932
2933    #[tokio::test]
2934    async fn test_dht_config_default() {
2935        let config = DHTConfig::default();
2936
2937        assert_eq!(config.k_value, 20);
2938        assert_eq!(config.alpha_value, 5);
2939        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2940        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2941    }
2942
2943    #[tokio::test]
2944    async fn test_security_config_default() {
2945        let config = SecurityConfig::default();
2946
2947        assert!(config.enable_noise);
2948        assert!(config.enable_tls);
2949        assert_eq!(config.trust_level, TrustLevel::Basic);
2950    }
2951
2952    #[test]
2953    fn test_trust_level_variants() {
2954        // Test that all trust level variants can be created
2955        let _none = TrustLevel::None;
2956        let _basic = TrustLevel::Basic;
2957        let _full = TrustLevel::Full;
2958
2959        // Test equality
2960        assert_eq!(TrustLevel::None, TrustLevel::None);
2961        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2962        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2963        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2964    }
2965
2966    #[test]
2967    fn test_connection_status_variants() {
2968        let connecting = ConnectionStatus::Connecting;
2969        let connected = ConnectionStatus::Connected;
2970        let disconnecting = ConnectionStatus::Disconnecting;
2971        let disconnected = ConnectionStatus::Disconnected;
2972        let failed = ConnectionStatus::Failed("test error".to_string());
2973
2974        assert_eq!(connecting, ConnectionStatus::Connecting);
2975        assert_eq!(connected, ConnectionStatus::Connected);
2976        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2977        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2978        assert_ne!(connecting, connected);
2979
2980        if let ConnectionStatus::Failed(msg) = failed {
2981            assert_eq!(msg, "test error");
2982        } else {
2983            panic!("Expected Failed status");
2984        }
2985    }
2986
2987    #[tokio::test]
2988    async fn test_node_creation() -> Result<()> {
2989        let config = create_test_node_config();
2990        let node = P2PNode::new(config).await?;
2991
2992        assert_eq!(node.peer_id(), "test_peer_123");
2993        assert!(!node.is_running().await);
2994        assert_eq!(node.peer_count().await, 0);
2995        assert!(node.connected_peers().await.is_empty());
2996
2997        Ok(())
2998    }
2999
3000    #[tokio::test]
3001    async fn test_node_creation_without_peer_id() -> Result<()> {
3002        let mut config = create_test_node_config();
3003        config.peer_id = None;
3004
3005        let node = P2PNode::new(config).await?;
3006
3007        // Should have generated a peer ID
3008        assert!(node.peer_id().starts_with("peer_"));
3009        assert!(!node.is_running().await);
3010
3011        Ok(())
3012    }
3013
3014    #[tokio::test]
3015    async fn test_node_lifecycle() -> Result<()> {
3016        let config = create_test_node_config();
3017        let node = P2PNode::new(config).await?;
3018
3019        // Initially not running
3020        assert!(!node.is_running().await);
3021
3022        // Start the node
3023        node.start().await?;
3024        assert!(node.is_running().await);
3025
3026        // Check listen addresses were set (at least one)
3027        let listen_addrs = node.listen_addrs().await;
3028        assert!(
3029            !listen_addrs.is_empty(),
3030            "Expected at least one listening address"
3031        );
3032
3033        // Stop the node
3034        node.stop().await?;
3035        assert!(!node.is_running().await);
3036
3037        Ok(())
3038    }
3039
3040    #[tokio::test]
3041    async fn test_peer_connection() -> Result<()> {
3042        let config1 = create_test_node_config();
3043        let mut config2 = create_test_node_config();
3044        config2.peer_id = Some("test_peer_456".to_string());
3045
3046        let node1 = P2PNode::new(config1).await?;
3047        let node2 = P2PNode::new(config2).await?;
3048
3049        node1.start().await?;
3050        node2.start().await?;
3051
3052        let node2_addr = node2
3053            .listen_addrs()
3054            .await
3055            .into_iter()
3056            .find(|a| a.ip().is_ipv4())
3057            .ok_or_else(|| {
3058                P2PError::Network(crate::error::NetworkError::InvalidAddress(
3059                    "Node 2 did not expose an IPv4 listen address".into(),
3060                ))
3061            })?;
3062
3063        // Connect to a real peer
3064        let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3065
3066        // Check peer count
3067        assert_eq!(node1.peer_count().await, 1);
3068
3069        // Check connected peers
3070        let connected_peers = node1.connected_peers().await;
3071        assert_eq!(connected_peers.len(), 1);
3072        assert_eq!(connected_peers[0], peer_id);
3073
3074        // Get peer info
3075        let peer_info = node1.peer_info(&peer_id).await;
3076        assert!(peer_info.is_some());
3077        let info = peer_info.expect("Peer info should exist after adding peer");
3078        assert_eq!(info.peer_id, peer_id);
3079        assert_eq!(info.status, ConnectionStatus::Connected);
3080        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3081
3082        // Disconnect from peer
3083        node1.disconnect_peer(&peer_id).await?;
3084        assert_eq!(node1.peer_count().await, 0);
3085
3086        node1.stop().await?;
3087        node2.stop().await?;
3088
3089        Ok(())
3090    }
3091
3092    // TODO(windows): Investigate QUIC connection issues on Windows CI
3093    // This test consistently fails on Windows GitHub Actions runners with
3094    // "All connect attempts failed" even with IPv4-only config, long delays,
3095    // and multiple retry attempts. The underlying ant-quic library may have
3096    // issues on Windows that need investigation.
3097    // See: https://github.com/dirvine/saorsa-core/issues/TBD
3098    #[cfg_attr(target_os = "windows", ignore)]
3099    #[tokio::test]
3100    async fn test_event_subscription() -> Result<()> {
3101        // Configure both nodes to use only IPv4 for reliable cross-platform testing
3102        // This is important because:
3103        // 1. local_addr() returns the first address from listen_addrs
3104        // 2. The default config puts IPv6 first, which may not work on all Windows setups
3105        let ipv4_localhost =
3106            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3107
3108        let mut config1 = create_test_node_config();
3109        config1.listen_addr = ipv4_localhost;
3110        config1.listen_addrs = vec![ipv4_localhost];
3111        config1.enable_ipv6 = false;
3112
3113        let mut config2 = create_test_node_config();
3114        config2.peer_id = Some("test_peer_456".to_string());
3115        config2.listen_addr = ipv4_localhost;
3116        config2.listen_addrs = vec![ipv4_localhost];
3117        config2.enable_ipv6 = false;
3118
3119        let node1 = P2PNode::new(config1).await?;
3120        let node2 = P2PNode::new(config2).await?;
3121
3122        node1.start().await?;
3123        node2.start().await?;
3124
3125        // Wait for nodes to fully bind their listening sockets
3126        // Windows network stack initialization can be significantly slower
3127        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3128
3129        let mut events = node1.subscribe_events();
3130
3131        // Get the actual listening address using local_addr() for reliability
3132        let node2_addr = node2.local_addr().ok_or_else(|| {
3133            P2PError::Network(crate::error::NetworkError::ProtocolError(
3134                "No listening address".to_string().into(),
3135            ))
3136        })?;
3137
3138        // Connect to a peer with retry logic for Windows reliability
3139        // The QUIC library may need additional time to fully initialize
3140        let mut peer_id = None;
3141        for attempt in 0..3 {
3142            if attempt > 0 {
3143                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3144            }
3145            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3146                Ok(Ok(id)) => {
3147                    peer_id = Some(id);
3148                    break;
3149                }
3150                Ok(Err(_)) | Err(_) => continue,
3151            }
3152        }
3153        let peer_id = peer_id.ok_or_else(|| {
3154            P2PError::Network(crate::error::NetworkError::ProtocolError(
3155                "Failed to connect after 3 attempts".to_string().into(),
3156            ))
3157        })?;
3158
3159        // Check for PeerConnected event
3160        let event = timeout(Duration::from_secs(2), events.recv()).await;
3161        assert!(event.is_ok());
3162
3163        let event_result = event
3164            .expect("Should receive event")
3165            .expect("Event should not be error");
3166        match event_result {
3167            P2PEvent::PeerConnected(event_peer_id) => {
3168                assert_eq!(event_peer_id, peer_id);
3169            }
3170            _ => panic!("Expected PeerConnected event"),
3171        }
3172
3173        // Disconnect from peer (this should emit another event)
3174        node1.disconnect_peer(&peer_id).await?;
3175
3176        // Check for PeerDisconnected event
3177        let event = timeout(Duration::from_secs(2), events.recv()).await;
3178        assert!(event.is_ok());
3179
3180        let event_result = event
3181            .expect("Should receive event")
3182            .expect("Event should not be error");
3183        match event_result {
3184            P2PEvent::PeerDisconnected(event_peer_id) => {
3185                assert_eq!(event_peer_id, peer_id);
3186            }
3187            _ => panic!("Expected PeerDisconnected event"),
3188        }
3189
3190        node1.stop().await?;
3191        node2.stop().await?;
3192
3193        Ok(())
3194    }
3195
3196    // TODO(windows): Same QUIC connection issues as test_event_subscription
3197    #[cfg_attr(target_os = "windows", ignore)]
3198    #[tokio::test]
3199    async fn test_message_sending() -> Result<()> {
3200        // Create two nodes
3201        let mut config1 = create_test_node_config();
3202        config1.listen_addr =
3203            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3204        let node1 = P2PNode::new(config1).await?;
3205        node1.start().await?;
3206
3207        let mut config2 = create_test_node_config();
3208        config2.listen_addr =
3209            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3210        let node2 = P2PNode::new(config2).await?;
3211        node2.start().await?;
3212
3213        // Wait a bit for nodes to start listening
3214        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3215
3216        // Get actual listening address of node2
3217        let node2_addr = node2.local_addr().ok_or_else(|| {
3218            P2PError::Network(crate::error::NetworkError::ProtocolError(
3219                "No listening address".to_string().into(),
3220            ))
3221        })?;
3222
3223        // Connect node1 to node2
3224        let peer_id =
3225            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3226                Ok(res) => res?,
3227                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3228            };
3229
3230        // Wait a bit for connection to establish
3231        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3232
3233        // Send a message
3234        let message_data = b"Hello, peer!".to_vec();
3235        let result = match timeout(
3236            Duration::from_millis(500),
3237            node1.send_message(&peer_id, "test-protocol", message_data),
3238        )
3239        .await
3240        {
3241            Ok(res) => res,
3242            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3243        };
3244        // For now, we'll just check that we don't get a "not connected" error
3245        // The actual send might fail due to no handler on the other side
3246        if let Err(e) = &result {
3247            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3248        }
3249
3250        // Try to send to non-existent peer
3251        let non_existent_peer = "non_existent_peer".to_string();
3252        let result = node1
3253            .send_message(&non_existent_peer, "test-protocol", vec![])
3254            .await;
3255        assert!(result.is_err(), "Sending to non-existent peer should fail");
3256
3257        Ok(())
3258    }
3259
3260    #[tokio::test]
3261    async fn test_remote_mcp_operations() -> Result<()> {
3262        let config = create_test_node_config();
3263        let node = P2PNode::new(config).await?;
3264
3265        // MCP removed; test reduced to simple start/stop
3266        node.start().await?;
3267        node.stop().await?;
3268        Ok(())
3269    }
3270
3271    #[tokio::test]
3272    async fn test_health_check() -> Result<()> {
3273        let config = create_test_node_config();
3274        let node = P2PNode::new(config).await?;
3275
3276        // Health check should pass with no connections
3277        let result = node.health_check().await;
3278        assert!(result.is_ok());
3279
3280        // Note: We're not actually connecting to real peers here
3281        // since that would require running bootstrap nodes.
3282        // The health check should still pass with no connections.
3283
3284        Ok(())
3285    }
3286
3287    #[tokio::test]
3288    async fn test_node_uptime() -> Result<()> {
3289        let config = create_test_node_config();
3290        let node = P2PNode::new(config).await?;
3291
3292        let uptime1 = node.uptime();
3293        assert!(uptime1 >= Duration::from_secs(0));
3294
3295        // Wait a bit
3296        tokio::time::sleep(Duration::from_millis(10)).await;
3297
3298        let uptime2 = node.uptime();
3299        assert!(uptime2 > uptime1);
3300
3301        Ok(())
3302    }
3303
3304    #[tokio::test]
3305    async fn test_node_config_access() -> Result<()> {
3306        let config = create_test_node_config();
3307        let expected_peer_id = config.peer_id.clone();
3308        let node = P2PNode::new(config).await?;
3309
3310        let node_config = node.config();
3311        assert_eq!(node_config.peer_id, expected_peer_id);
3312        assert_eq!(node_config.max_connections, 100);
3313        // MCP removed
3314
3315        Ok(())
3316    }
3317
3318    #[tokio::test]
3319    async fn test_mcp_server_access() -> Result<()> {
3320        let config = create_test_node_config();
3321        let _node = P2PNode::new(config).await?;
3322
3323        // MCP removed
3324        Ok(())
3325    }
3326
3327    #[tokio::test]
3328    async fn test_dht_access() -> Result<()> {
3329        let config = create_test_node_config();
3330        let node = P2PNode::new(config).await?;
3331
3332        // Should have DHT
3333        assert!(node.dht().is_some());
3334
3335        Ok(())
3336    }
3337
3338    #[tokio::test]
3339    async fn test_node_builder() -> Result<()> {
3340        // Create a config using the builder but don't actually build a real node
3341        let builder = P2PNode::builder()
3342            .with_peer_id("builder_test_peer".to_string())
3343            .listen_on("/ip4/127.0.0.1/tcp/0")
3344            .listen_on("/ip6/::1/tcp/0")
3345            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
3346            .with_ipv6(true)
3347            .with_connection_timeout(Duration::from_secs(15))
3348            .with_max_connections(200);
3349
3350        // Test the configuration that was built
3351        let config = builder.config;
3352        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3353        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
3354        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
3355        assert!(config.enable_ipv6);
3356        assert_eq!(config.connection_timeout, Duration::from_secs(15));
3357        assert_eq!(config.max_connections, 200);
3358
3359        Ok(())
3360    }
3361
3362    #[tokio::test]
3363    async fn test_bootstrap_peers() -> Result<()> {
3364        let mut config = create_test_node_config();
3365        config.bootstrap_peers = vec![
3366            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3367            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3368        ];
3369
3370        let node = P2PNode::new(config).await?;
3371
3372        // Start node (which attempts to connect to bootstrap peers)
3373        node.start().await?;
3374
3375        // In a test environment, bootstrap peers may not be available
3376        // The test verifies the node starts correctly with bootstrap configuration
3377        // Peer count may include local/internal tracking, so we just verify it's reasonable
3378        let _peer_count = node.peer_count().await;
3379
3380        node.stop().await?;
3381        Ok(())
3382    }
3383
3384    #[tokio::test]
3385    async fn test_production_mode_disabled() -> Result<()> {
3386        let config = create_test_node_config();
3387        let node = P2PNode::new(config).await?;
3388
3389        assert!(!node.is_production_mode());
3390        assert!(node.production_config().is_none());
3391
3392        // Resource metrics should fail when production mode is disabled
3393        let result = node.resource_metrics().await;
3394        assert!(result.is_err());
3395        assert!(result.unwrap_err().to_string().contains("not enabled"));
3396
3397        Ok(())
3398    }
3399
3400    #[tokio::test]
3401    async fn test_network_event_variants() {
3402        // Test that all network event variants can be created
3403        let peer_id = "test_peer".to_string();
3404        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3405
3406        let _peer_connected = NetworkEvent::PeerConnected {
3407            peer_id: peer_id.clone(),
3408            addresses: vec![address.clone()],
3409        };
3410
3411        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3412            peer_id: peer_id.clone(),
3413            reason: "test disconnect".to_string(),
3414        };
3415
3416        let _message_received = NetworkEvent::MessageReceived {
3417            peer_id: peer_id.clone(),
3418            protocol: "test-protocol".to_string(),
3419            data: vec![1, 2, 3],
3420        };
3421
3422        let _connection_failed = NetworkEvent::ConnectionFailed {
3423            peer_id: Some(peer_id.clone()),
3424            address: address.clone(),
3425            error: "connection refused".to_string(),
3426        };
3427
3428        let _dht_stored = NetworkEvent::DHTRecordStored {
3429            key: vec![1, 2, 3],
3430            value: vec![4, 5, 6],
3431        };
3432
3433        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3434            key: vec![1, 2, 3],
3435            value: Some(vec![4, 5, 6]),
3436        };
3437    }
3438
3439    #[tokio::test]
3440    async fn test_peer_info_structure() {
3441        let peer_info = PeerInfo {
3442            peer_id: "test_peer".to_string(),
3443            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3444            connected_at: Instant::now(),
3445            last_seen: Instant::now(),
3446            status: ConnectionStatus::Connected,
3447            protocols: vec!["test-protocol".to_string()],
3448            heartbeat_count: 0,
3449        };
3450
3451        assert_eq!(peer_info.peer_id, "test_peer");
3452        assert_eq!(peer_info.addresses.len(), 1);
3453        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3454        assert_eq!(peer_info.protocols.len(), 1);
3455    }
3456
3457    #[tokio::test]
3458    async fn test_serialization() -> Result<()> {
3459        // Test that configs can be serialized/deserialized
3460        let config = create_test_node_config();
3461        let serialized = serde_json::to_string(&config)?;
3462        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3463
3464        assert_eq!(config.peer_id, deserialized.peer_id);
3465        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3466        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3467
3468        Ok(())
3469    }
3470
3471    #[tokio::test]
3472    async fn test_get_peer_id_by_address_found() -> Result<()> {
3473        let config = create_test_node_config();
3474        let node = P2PNode::new(config).await?;
3475
3476        // Manually insert a peer for testing
3477        let test_peer_id = "peer_test_123".to_string();
3478        let test_address = "192.168.1.100:9000".to_string();
3479
3480        let peer_info = PeerInfo {
3481            peer_id: test_peer_id.clone(),
3482            addresses: vec![test_address.clone()],
3483            connected_at: Instant::now(),
3484            last_seen: Instant::now(),
3485            status: ConnectionStatus::Connected,
3486            protocols: vec!["test-protocol".to_string()],
3487            heartbeat_count: 0,
3488        };
3489
3490        node.peers
3491            .write()
3492            .await
3493            .insert(test_peer_id.clone(), peer_info);
3494
3495        // Test: Find peer by address
3496        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3497        assert_eq!(found_peer_id, Some(test_peer_id));
3498
3499        Ok(())
3500    }
3501
3502    #[tokio::test]
3503    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3504        let config = create_test_node_config();
3505        let node = P2PNode::new(config).await?;
3506
3507        // Test: Try to find a peer that doesn't exist
3508        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3509        assert_eq!(result, None);
3510
3511        Ok(())
3512    }
3513
3514    #[tokio::test]
3515    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3516        let config = create_test_node_config();
3517        let node = P2PNode::new(config).await?;
3518
3519        // Test: Invalid address format should return None
3520        let result = node.get_peer_id_by_address("invalid-address").await;
3521        assert_eq!(result, None);
3522
3523        Ok(())
3524    }
3525
3526    #[tokio::test]
3527    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3528        let config = create_test_node_config();
3529        let node = P2PNode::new(config).await?;
3530
3531        // Add multiple peers with different addresses
3532        let peer1_id = "peer_1".to_string();
3533        let peer1_addr = "192.168.1.101:9001".to_string();
3534
3535        let peer2_id = "peer_2".to_string();
3536        let peer2_addr = "192.168.1.102:9002".to_string();
3537
3538        let peer1_info = PeerInfo {
3539            peer_id: peer1_id.clone(),
3540            addresses: vec![peer1_addr.clone()],
3541            connected_at: Instant::now(),
3542            last_seen: Instant::now(),
3543            status: ConnectionStatus::Connected,
3544            protocols: vec!["test-protocol".to_string()],
3545            heartbeat_count: 0,
3546        };
3547
3548        let peer2_info = PeerInfo {
3549            peer_id: peer2_id.clone(),
3550            addresses: vec![peer2_addr.clone()],
3551            connected_at: Instant::now(),
3552            last_seen: Instant::now(),
3553            status: ConnectionStatus::Connected,
3554            protocols: vec!["test-protocol".to_string()],
3555            heartbeat_count: 0,
3556        };
3557
3558        node.peers
3559            .write()
3560            .await
3561            .insert(peer1_id.clone(), peer1_info);
3562        node.peers
3563            .write()
3564            .await
3565            .insert(peer2_id.clone(), peer2_info);
3566
3567        // Test: Find each peer by their unique address
3568        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3569        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3570
3571        assert_eq!(found_peer1, Some(peer1_id));
3572        assert_eq!(found_peer2, Some(peer2_id));
3573
3574        Ok(())
3575    }
3576
3577    #[tokio::test]
3578    async fn test_list_active_connections_empty() -> Result<()> {
3579        let config = create_test_node_config();
3580        let node = P2PNode::new(config).await?;
3581
3582        // Test: No connections initially
3583        let connections = node.list_active_connections().await;
3584        assert!(connections.is_empty());
3585
3586        Ok(())
3587    }
3588
3589    #[tokio::test]
3590    async fn test_list_active_connections_with_peers() -> Result<()> {
3591        let config = create_test_node_config();
3592        let node = P2PNode::new(config).await?;
3593
3594        // Add multiple peers
3595        let peer1_id = "peer_1".to_string();
3596        let peer1_addrs = vec![
3597            "192.168.1.101:9001".to_string(),
3598            "192.168.1.101:9002".to_string(),
3599        ];
3600
3601        let peer2_id = "peer_2".to_string();
3602        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3603
3604        let peer1_info = PeerInfo {
3605            peer_id: peer1_id.clone(),
3606            addresses: peer1_addrs.clone(),
3607            connected_at: Instant::now(),
3608            last_seen: Instant::now(),
3609            status: ConnectionStatus::Connected,
3610            protocols: vec!["test-protocol".to_string()],
3611            heartbeat_count: 0,
3612        };
3613
3614        let peer2_info = PeerInfo {
3615            peer_id: peer2_id.clone(),
3616            addresses: peer2_addrs.clone(),
3617            connected_at: Instant::now(),
3618            last_seen: Instant::now(),
3619            status: ConnectionStatus::Connected,
3620            protocols: vec!["test-protocol".to_string()],
3621            heartbeat_count: 0,
3622        };
3623
3624        node.peers
3625            .write()
3626            .await
3627            .insert(peer1_id.clone(), peer1_info);
3628        node.peers
3629            .write()
3630            .await
3631            .insert(peer2_id.clone(), peer2_info);
3632
3633        // Also add to active_connections (list_active_connections iterates over this)
3634        node.active_connections
3635            .write()
3636            .await
3637            .insert(peer1_id.clone());
3638        node.active_connections
3639            .write()
3640            .await
3641            .insert(peer2_id.clone());
3642
3643        // Test: List all active connections
3644        let connections = node.list_active_connections().await;
3645        assert_eq!(connections.len(), 2);
3646
3647        // Verify peer1 and peer2 are in the list
3648        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3649        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3650
3651        assert!(peer1_conn.is_some());
3652        assert!(peer2_conn.is_some());
3653
3654        // Verify addresses match
3655        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3656        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3657
3658        Ok(())
3659    }
3660
3661    #[tokio::test]
3662    async fn test_remove_peer_success() -> Result<()> {
3663        let config = create_test_node_config();
3664        let node = P2PNode::new(config).await?;
3665
3666        // Add a peer
3667        let peer_id = "peer_to_remove".to_string();
3668        let peer_info = PeerInfo {
3669            peer_id: peer_id.clone(),
3670            addresses: vec!["192.168.1.100:9000".to_string()],
3671            connected_at: Instant::now(),
3672            last_seen: Instant::now(),
3673            status: ConnectionStatus::Connected,
3674            protocols: vec!["test-protocol".to_string()],
3675            heartbeat_count: 0,
3676        };
3677
3678        node.peers.write().await.insert(peer_id.clone(), peer_info);
3679
3680        // Verify peer exists
3681        assert!(node.is_peer_connected(&peer_id).await);
3682
3683        // Remove the peer
3684        let removed = node.remove_peer(&peer_id).await;
3685        assert!(removed);
3686
3687        // Verify peer no longer exists
3688        assert!(!node.is_peer_connected(&peer_id).await);
3689
3690        Ok(())
3691    }
3692
3693    #[tokio::test]
3694    async fn test_remove_peer_nonexistent() -> Result<()> {
3695        let config = create_test_node_config();
3696        let node = P2PNode::new(config).await?;
3697
3698        // Try to remove a peer that doesn't exist
3699        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3700        assert!(!removed);
3701
3702        Ok(())
3703    }
3704
3705    #[tokio::test]
3706    async fn test_is_peer_connected() -> Result<()> {
3707        let config = create_test_node_config();
3708        let node = P2PNode::new(config).await?;
3709
3710        let peer_id = "test_peer".to_string();
3711
3712        // Initially not connected
3713        assert!(!node.is_peer_connected(&peer_id).await);
3714
3715        // Add peer
3716        let peer_info = PeerInfo {
3717            peer_id: peer_id.clone(),
3718            addresses: vec!["192.168.1.100:9000".to_string()],
3719            connected_at: Instant::now(),
3720            last_seen: Instant::now(),
3721            status: ConnectionStatus::Connected,
3722            protocols: vec!["test-protocol".to_string()],
3723            heartbeat_count: 0,
3724        };
3725
3726        node.peers.write().await.insert(peer_id.clone(), peer_info);
3727
3728        // Now connected
3729        assert!(node.is_peer_connected(&peer_id).await);
3730
3731        // Remove peer
3732        node.remove_peer(&peer_id).await;
3733
3734        // No longer connected
3735        assert!(!node.is_peer_connected(&peer_id).await);
3736
3737        Ok(())
3738    }
3739
3740    #[test]
3741    fn test_normalize_ipv6_wildcard() {
3742        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3743
3744        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3745        let normalized = normalize_wildcard_to_loopback(wildcard);
3746
3747        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3748        assert_eq!(normalized.port(), 8080);
3749    }
3750
3751    #[test]
3752    fn test_normalize_ipv4_wildcard() {
3753        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3754
3755        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3756        let normalized = normalize_wildcard_to_loopback(wildcard);
3757
3758        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3759        assert_eq!(normalized.port(), 9000);
3760    }
3761
3762    #[test]
3763    fn test_normalize_specific_address_unchanged() {
3764        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3765        let normalized = normalize_wildcard_to_loopback(specific);
3766
3767        assert_eq!(normalized, specific);
3768    }
3769
3770    #[test]
3771    fn test_normalize_loopback_unchanged() {
3772        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3773
3774        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3775        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3776        assert_eq!(normalized_v6, loopback_v6);
3777
3778        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3779        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3780        assert_eq!(normalized_v4, loopback_v4);
3781    }
3782}