saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] // Temporarily unused during migration
27use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::{RwLock, broadcast};
37use tokio::time::Instant;
38use tracing::{debug, info, warn};
39
40/// Configuration for a P2P node
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct NodeConfig {
43    /// Local peer ID for this node
44    pub peer_id: Option<PeerId>,
45
46    /// Addresses to listen on for incoming connections
47    pub listen_addrs: Vec<std::net::SocketAddr>,
48
49    /// Primary listen address (for compatibility)
50    pub listen_addr: std::net::SocketAddr,
51
52    /// Bootstrap peers to connect to on startup (legacy)
53    pub bootstrap_peers: Vec<std::net::SocketAddr>,
54
55    /// Bootstrap peers as strings (for integration tests)
56    pub bootstrap_peers_str: Vec<String>,
57
58    /// Enable IPv6 support
59    pub enable_ipv6: bool,
60
61    // MCP removed; will be redesigned later
62    /// Connection timeout duration
63    pub connection_timeout: Duration,
64
65    /// Keep-alive interval for connections
66    pub keep_alive_interval: Duration,
67
68    /// Maximum number of concurrent connections
69    pub max_connections: usize,
70
71    /// Maximum number of incoming connections
72    pub max_incoming_connections: usize,
73
74    /// DHT configuration
75    pub dht_config: DHTConfig,
76
77    /// Security configuration
78    pub security_config: SecurityConfig,
79
80    /// Production hardening configuration
81    pub production_config: Option<ProductionConfig>,
82
83    /// Bootstrap cache configuration
84    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
85}
86
87/// DHT-specific configuration
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct DHTConfig {
90    /// Kademlia K parameter (bucket size)
91    pub k_value: usize,
92
93    /// Kademlia alpha parameter (parallelism)
94    pub alpha_value: usize,
95
96    /// DHT record TTL
97    pub record_ttl: Duration,
98
99    /// DHT refresh interval
100    pub refresh_interval: Duration,
101}
102
103/// Security configuration
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SecurityConfig {
106    /// Enable noise protocol for encryption
107    pub enable_noise: bool,
108
109    /// Enable TLS for secure transport
110    pub enable_tls: bool,
111
112    /// Trust level for peer verification
113    pub trust_level: TrustLevel,
114}
115
116/// Trust level for peer verification
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub enum TrustLevel {
119    /// No verification required
120    None,
121    /// Basic peer ID verification
122    Basic,
123    /// Full cryptographic verification
124    Full,
125}
126
127impl NodeConfig {
128    /// Create a new NodeConfig with default values
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if default addresses cannot be parsed
133    pub fn new() -> Result<Self> {
134        // Load config and use its defaults
135        let config = Config::default();
136
137        // Parse the default listen address
138        let listen_addr = config.listen_socket_addr()?;
139
140        // Create listen addresses based on config
141        let mut listen_addrs = vec![];
142
143        // Add IPv6 address if enabled
144        if config.network.ipv6_enabled {
145            let ipv6_addr = std::net::SocketAddr::new(
146                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
147                listen_addr.port(),
148            );
149            listen_addrs.push(ipv6_addr);
150        }
151
152        // Always add IPv4
153        let ipv4_addr = std::net::SocketAddr::new(
154            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
155            listen_addr.port(),
156        );
157        listen_addrs.push(ipv4_addr);
158
159        Ok(Self {
160            peer_id: None,
161            listen_addrs,
162            listen_addr,
163            bootstrap_peers: Vec::new(),
164            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
165            enable_ipv6: config.network.ipv6_enabled,
166
167            connection_timeout: Duration::from_secs(config.network.connection_timeout),
168            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
169            max_connections: config.network.max_connections,
170            max_incoming_connections: config.security.connection_limit as usize,
171            dht_config: DHTConfig::default(),
172            security_config: SecurityConfig::default(),
173            production_config: None,
174            bootstrap_cache_config: None,
175            // identity_config: None,
176        })
177    }
178}
179
180impl Default for NodeConfig {
181    fn default() -> Self {
182        // Use config defaults for network settings
183        let config = Config::default();
184
185        // Parse the default listen address - use safe fallback if parsing fails
186        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
187            std::net::SocketAddr::new(
188                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
189                9000,
190            )
191        });
192
193        Self {
194            peer_id: None,
195            listen_addrs: vec![
196                std::net::SocketAddr::new(
197                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
198                    listen_addr.port(),
199                ),
200                std::net::SocketAddr::new(
201                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
202                    listen_addr.port(),
203                ),
204            ],
205            listen_addr,
206            bootstrap_peers: Vec::new(),
207            bootstrap_peers_str: Vec::new(),
208            enable_ipv6: config.network.ipv6_enabled,
209
210            connection_timeout: Duration::from_secs(config.network.connection_timeout),
211            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
212            max_connections: config.network.max_connections,
213            max_incoming_connections: config.security.connection_limit as usize,
214            dht_config: DHTConfig::default(),
215            security_config: SecurityConfig::default(),
216            production_config: None, // Use default production config if enabled
217            bootstrap_cache_config: None,
218            // identity_config: None, // Use default identity config if enabled
219        }
220    }
221}
222
223impl NodeConfig {
224    /// Create NodeConfig from Config
225    pub fn from_config(config: &Config) -> Result<Self> {
226        let listen_addr = config.listen_socket_addr()?;
227        let bootstrap_addrs = config.bootstrap_addrs()?;
228
229        let mut node_config = Self {
230            peer_id: None,
231            listen_addrs: vec![listen_addr],
232            listen_addr,
233            bootstrap_peers: bootstrap_addrs
234                .iter()
235                .map(|addr| addr.socket_addr())
236                .collect(),
237            bootstrap_peers_str: config
238                .network
239                .bootstrap_nodes
240                .iter()
241                .map(|addr| addr.to_string())
242                .collect(),
243            enable_ipv6: config.network.ipv6_enabled,
244
245            connection_timeout: Duration::from_secs(config.network.connection_timeout),
246            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
247            max_connections: config.network.max_connections,
248            max_incoming_connections: config.security.connection_limit as usize,
249            dht_config: DHTConfig {
250                k_value: 20,
251                alpha_value: 3,
252                record_ttl: Duration::from_secs(3600),
253                refresh_interval: Duration::from_secs(900),
254            },
255            security_config: SecurityConfig {
256                enable_noise: true,
257                enable_tls: true,
258                trust_level: TrustLevel::Basic,
259            },
260            production_config: Some(ProductionConfig {
261                max_connections: config.network.max_connections,
262                max_memory_bytes: 0,  // unlimited
263                max_bandwidth_bps: 0, // unlimited
264                connection_timeout: Duration::from_secs(config.network.connection_timeout),
265                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266                health_check_interval: Duration::from_secs(30),
267                metrics_interval: Duration::from_secs(60),
268                enable_performance_tracking: true,
269                enable_auto_cleanup: true,
270                shutdown_timeout: Duration::from_secs(30),
271                rate_limits: crate::production::RateLimitConfig::default(),
272            }),
273            bootstrap_cache_config: None,
274            // identity_config: Some(IdentityManagerConfig {
275            //     cache_ttl: Duration::from_secs(3600),
276            //     challenge_timeout: Duration::from_secs(30),
277            // }),
278        };
279
280        // Add IPv6 listen address if enabled
281        if config.network.ipv6_enabled {
282            node_config.listen_addrs.push(std::net::SocketAddr::new(
283                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
284                listen_addr.port(),
285            ));
286        }
287
288        Ok(node_config)
289    }
290
291    /// Try to build a NodeConfig from a listen address string
292    pub fn with_listen_addr(addr: &str) -> Result<Self> {
293        let listen_addr: std::net::SocketAddr = addr
294            .parse()
295            .map_err(|e: std::net::AddrParseError| {
296                NetworkError::InvalidAddress(e.to_string().into())
297            })
298            .map_err(P2PError::Network)?;
299        let cfg = NodeConfig {
300            listen_addr,
301            listen_addrs: vec![listen_addr],
302            ..Default::default()
303        };
304        Ok(cfg)
305    }
306}
307
308impl Default for DHTConfig {
309    fn default() -> Self {
310        Self {
311            k_value: 20,
312            alpha_value: 5,
313            record_ttl: Duration::from_secs(3600), // 1 hour
314            refresh_interval: Duration::from_secs(600), // 10 minutes
315        }
316    }
317}
318
319impl Default for SecurityConfig {
320    fn default() -> Self {
321        Self {
322            enable_noise: true,
323            enable_tls: true,
324            trust_level: TrustLevel::Basic,
325        }
326    }
327}
328
329/// Information about a connected peer
330#[derive(Debug, Clone)]
331pub struct PeerInfo {
332    /// Peer identifier
333    pub peer_id: PeerId,
334
335    /// Peer's addresses
336    pub addresses: Vec<String>,
337
338    /// Connection timestamp
339    pub connected_at: Instant,
340
341    /// Last seen timestamp
342    pub last_seen: Instant,
343
344    /// Connection status
345    pub status: ConnectionStatus,
346
347    /// Supported protocols
348    pub protocols: Vec<String>,
349
350    /// Number of heartbeats received
351    pub heartbeat_count: u64,
352}
353
/// Connection status for a peer
///
/// Every variant payload (`String`) supports total equality, so `Eq` is derived
/// alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed; the payload carries the failure description
    Failed(String),
}
368
369/// Network events that can occur
370#[derive(Debug, Clone)]
371pub enum NetworkEvent {
372    /// A new peer has connected
373    PeerConnected {
374        /// The identifier of the newly connected peer
375        peer_id: PeerId,
376        /// The network addresses where the peer can be reached
377        addresses: Vec<String>,
378    },
379
380    /// A peer has disconnected
381    PeerDisconnected {
382        /// The identifier of the disconnected peer
383        peer_id: PeerId,
384        /// The reason for the disconnection
385        reason: String,
386    },
387
388    /// A message was received from a peer
389    MessageReceived {
390        /// The identifier of the sending peer
391        peer_id: PeerId,
392        /// The protocol used for the message
393        protocol: String,
394        /// The raw message data
395        data: Vec<u8>,
396    },
397
398    /// A connection attempt failed
399    ConnectionFailed {
400        /// The identifier of the peer (if known)
401        peer_id: Option<PeerId>,
402        /// The address where connection was attempted
403        address: String,
404        /// The error message describing the failure
405        error: String,
406    },
407
408    /// DHT record was stored
409    DHTRecordStored {
410        /// The DHT key where the record was stored
411        key: Vec<u8>,
412        /// The value that was stored
413        value: Vec<u8>,
414    },
415
416    /// DHT record was retrieved
417    DHTRecordRetrieved {
418        /// The DHT key that was queried
419        key: Vec<u8>,
420        /// The retrieved value, if found
421        value: Option<Vec<u8>>,
422    },
423}
424
425/// Network events that can occur in the P2P system
426///
427/// Events are broadcast to all listeners and provide real-time
428/// notifications of network state changes and message arrivals.
429#[derive(Debug, Clone)]
430pub enum P2PEvent {
431    /// Message received from a peer on a specific topic
432    Message {
433        /// Topic or channel the message was sent on
434        topic: String,
435        /// Peer ID of the message sender
436        source: PeerId,
437        /// Raw message data payload
438        data: Vec<u8>,
439    },
440    /// A new peer has connected to the network
441    PeerConnected(PeerId),
442    /// A peer has disconnected from the network
443    PeerDisconnected(PeerId),
444}
445
446/// Main P2P node structure
447/// Main P2P network node that manages connections, routing, and communication
448///
449/// This struct represents a complete P2P network participant that can:
450/// - Connect to other peers via QUIC transport
451/// - Participate in distributed hash table (DHT) operations
452/// - Send and receive messages through various protocols
453/// - Handle network events and peer lifecycle
454/// - Provide MCP (Model Context Protocol) services
455pub struct P2PNode {
456    /// Node configuration
457    config: NodeConfig,
458
459    /// Our peer ID
460    peer_id: PeerId,
461
462    /// Connected peers
463    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
464
465    /// Network event broadcaster
466    event_tx: broadcast::Sender<P2PEvent>,
467
468    /// Listen addresses
469    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
470
471    /// Node start time
472    start_time: Instant,
473
474    /// Running state
475    running: RwLock<bool>,
476
477    /// DHT instance (optional)
478    dht: Option<Arc<RwLock<DHT>>>,
479
480    /// Production resource manager (optional)
481    resource_manager: Option<Arc<ResourceManager>>,
482
483    /// Bootstrap cache manager for peer discovery
484    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
485
486    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
487    dual_node: Arc<DualStackNetworkNode>,
488
489    /// Rate limiter for connection and request throttling
490    #[allow(dead_code)]
491    rate_limiter: Arc<RateLimiter>,
492}
493
494impl P2PNode {
495    /// Minimal constructor for tests that avoids real networking
496    pub fn new_for_tests() -> Result<Self> {
497        let (event_tx, _) = broadcast::channel(16);
498        Ok(Self {
499            config: NodeConfig::default(),
500            peer_id: "test_peer".to_string(),
501            peers: Arc::new(RwLock::new(HashMap::new())),
502            event_tx,
503            listen_addrs: RwLock::new(Vec::new()),
504            start_time: Instant::now(),
505            running: RwLock::new(false),
506            dht: None,
507            resource_manager: None,
508            bootstrap_manager: None,
509            dual_node: {
510                // Bind dual-stack nodes on ephemeral ports for tests
511                let v6: Option<std::net::SocketAddr> = "[::1]:0"
512                    .parse()
513                    .ok()
514                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
515                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
516                let handle = tokio::runtime::Handle::current();
517                let dual_attempt = handle.block_on(
518                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
519                );
520                let dual = match dual_attempt {
521                    Ok(d) => d,
522                    Err(_e1) => {
523                        // Fallback to IPv4-only ephemeral bind
524                        let fallback = handle.block_on(
525                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
526                                None,
527                                "127.0.0.1:0".parse().ok(),
528                            ),
529                        );
530                        match fallback {
531                            Ok(d) => d,
532                            Err(e2) => {
533                                return Err(P2PError::Network(NetworkError::BindError(
534                                    format!("Failed to create dual-stack network node: {}", e2)
535                                        .into(),
536                                )));
537                            }
538                        }
539                    }
540                };
541                Arc::new(dual)
542            },
543            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
544                max_requests: 100,
545                burst_size: 100,
546                window: std::time::Duration::from_secs(1),
547                ..Default::default()
548            })),
549        })
550    }
551    /// Create a new P2P node with the given configuration
552    pub async fn new(config: NodeConfig) -> Result<Self> {
553        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
554            // Generate a random peer ID for now
555            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
556        });
557
558        let (event_tx, _) = broadcast::channel(1000);
559
560        // Initialize and register a TrustWeightedKademlia DHT for the global API
561        // Use a deterministic local NodeId derived from the peer_id
562        {
563            use blake3::Hasher;
564            let mut hasher = Hasher::new();
565            hasher.update(peer_id.as_bytes());
566            let digest = hasher.finalize();
567            let mut nid = [0u8; 32];
568            nid.copy_from_slice(digest.as_bytes());
569            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
570                crate::identity::node_identity::NodeId::from_bytes(nid),
571            ));
572            // TODO: Update to use new clean API
573            // let _ = crate::api::set_dht_instance(twdht);
574        }
575
576        // Initialize DHT if needed
577        let dht = if true {
578            // Always enable DHT for now
579            let _dht_config = crate::dht::DHTConfig {
580                replication_factor: config.dht_config.k_value,
581                bucket_size: config.dht_config.k_value,
582                alpha: config.dht_config.alpha_value,
583                record_ttl: config.dht_config.record_ttl,
584                bucket_refresh_interval: config.dht_config.refresh_interval,
585                republish_interval: config.dht_config.refresh_interval,
586                max_distance: 160, // 160 bits for SHA-256
587            };
588            // Convert peer_id String to NodeId
589            let peer_bytes = peer_id.as_bytes();
590            let mut node_id_bytes = [0u8; 32];
591            let len = peer_bytes.len().min(32);
592            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
593            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
594            let dht_instance = DHT::new(node_id).map_err(|e| {
595                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
596                    e.to_string().into(),
597                ))
598            })?;
599            Some(Arc::new(RwLock::new(dht_instance)))
600        } else {
601            None
602        };
603
604        // MCP removed
605
606        // Initialize production resource manager if configured
607        let resource_manager = config
608            .production_config
609            .clone()
610            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
611
612        // Initialize bootstrap cache manager
613        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
614            match BootstrapManager::with_config(cache_config.clone()).await {
615                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
616                Err(e) => {
617                    warn!(
618                        "Failed to initialize bootstrap manager: {}, continuing without cache",
619                        e
620                    );
621                    None
622                }
623            }
624        } else {
625            match BootstrapManager::new().await {
626                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
627                Err(e) => {
628                    warn!(
629                        "Failed to initialize bootstrap manager: {}, continuing without cache",
630                        e
631                    );
632                    None
633                }
634            }
635        };
636
637        // Initialize dual-stack ant-quic nodes
638        let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
639            let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
640            let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
641            (v6_addr, v4_addr)
642        } else {
643            // Defaults: always listen on IPv4; IPv6 if enabled
644            let v4_addr = Some(std::net::SocketAddr::new(
645                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
646                config.listen_addr.port(),
647            ));
648            let v6_addr = if config.enable_ipv6 {
649                Some(std::net::SocketAddr::new(
650                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
651                    config.listen_addr.port(),
652                ))
653            } else {
654                None
655            };
656            (v6_addr, v4_addr)
657        };
658
659        let dual_node = Arc::new(
660            DualStackNetworkNode::new(v6_opt, v4_opt)
661                .await
662                .map_err(|e| {
663                    P2PError::Transport(crate::error::TransportError::SetupFailed(
664                        format!("Failed to create dual-stack network nodes: {}", e).into(),
665                    ))
666                })?,
667        );
668
669        // Initialize rate limiter with default config
670        let rate_limiter = Arc::new(RateLimiter::new(
671            crate::validation::RateLimitConfig::default(),
672        ));
673
674        let node = Self {
675            config,
676            peer_id,
677            peers: Arc::new(RwLock::new(HashMap::new())),
678            event_tx,
679            listen_addrs: RwLock::new(Vec::new()),
680            start_time: Instant::now(),
681            running: RwLock::new(false),
682            dht,
683            resource_manager,
684            bootstrap_manager,
685            dual_node,
686            rate_limiter,
687        };
688        info!("Created P2P node with peer ID: {}", node.peer_id);
689
690        Ok(node)
691    }
692
    /// Create a new node builder
    ///
    /// Shorthand for `NodeBuilder::new()`.
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
697
    /// Get the peer ID of this node
    ///
    /// Returns a reference to the identifier stored on the node; nothing is cloned.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
702
703    pub fn local_addr(&self) -> Option<String> {
704        self.listen_addrs
705            .try_read()
706            .ok()
707            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
708    }
709
    /// Subscribe to a topic
    ///
    /// Currently a placeholder: it only logs the topic name and always returns
    /// `Ok(())`.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        // In a real implementation, this would register the topic with the pubsub mechanism.
        // For now, we just log it.
        info!("Subscribed to topic: {}", topic);
        Ok(())
    }
716
717    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
718        info!(
719            "Publishing message to topic: {} ({} bytes)",
720            topic,
721            data.len()
722        );
723
724        // Get list of connected peers
725        let peer_list: Vec<PeerId> = {
726            let peers_guard = self.peers.read().await;
727            peers_guard.keys().cloned().collect()
728        };
729
730        if peer_list.is_empty() {
731            debug!("No peers connected, message will only be sent to local subscribers");
732        } else {
733            // Send message to all connected peers
734            let mut send_count = 0;
735            for peer_id in &peer_list {
736                match self.send_message(peer_id, topic, data.to_vec()).await {
737                    Ok(_) => {
738                        send_count += 1;
739                        debug!("Sent message to peer: {}", peer_id);
740                    }
741                    Err(e) => {
742                        warn!("Failed to send message to peer {}: {}", peer_id, e);
743                    }
744                }
745            }
746            info!(
747                "Published message to {}/{} connected peers",
748                send_count,
749                peer_list.len()
750            );
751        }
752
753        // Also send to local subscribers (for local echo and testing)
754        let event = P2PEvent::Message {
755            topic: topic.to_string(),
756            source: self.peer_id.clone(),
757            data: data.to_vec(),
758        };
759        let _ = self.event_tx.send(event);
760
761        Ok(())
762    }
763
    /// Get the node configuration
    ///
    /// Returns a reference to the `NodeConfig` stored on the node.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
768
769    /// Start the P2P node
770    pub async fn start(&self) -> Result<()> {
771        info!("Starting P2P node...");
772
773        // Start production resource manager if configured
774        if let Some(ref resource_manager) = self.resource_manager {
775            resource_manager.start().await.map_err(|e| {
776                P2PError::Network(crate::error::NetworkError::ProtocolError(
777                    format!("Failed to start resource manager: {e}").into(),
778                ))
779            })?;
780            info!("Production resource manager started");
781        }
782
783        // Start bootstrap manager background tasks
784        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
785            let mut manager = bootstrap_manager.write().await;
786            manager.start_background_tasks().await.map_err(|e| {
787                P2PError::Network(crate::error::NetworkError::ProtocolError(
788                    format!("Failed to start bootstrap manager: {e}").into(),
789                ))
790            })?;
791            info!("Bootstrap cache manager started");
792        }
793
794        // Set running state
795        *self.running.write().await = true;
796
797        // Start listening on configured addresses using transport layer
798        self.start_network_listeners().await?;
799
800        // Log current listen addresses
801        let listen_addrs = self.listen_addrs.read().await;
802        info!("P2P node started on addresses: {:?}", *listen_addrs);
803
804        // MCP removed
805
806        // Start message receiving system
807        self.start_message_receiving_system().await?;
808
809        // Connect to bootstrap peers
810        self.connect_bootstrap_peers().await?;
811
812        Ok(())
813    }
814
815    /// Start network listeners on configured addresses
816    async fn start_network_listeners(&self) -> Result<()> {
817        info!("Starting dual-stack listeners (ant-quic)...");
818        // Update our listen_addrs from the dual node bindings
819        let addrs = self.dual_node.local_addrs();
820        {
821            let mut la = self.listen_addrs.write().await;
822            *la = addrs.clone();
823        }
824
825        // Spawn a background accept loop that handles incoming connections from either stack
826        let event_tx = self.event_tx.clone();
827        let peers = self.peers.clone();
828        let rate_limiter = self.rate_limiter.clone();
829        let dual = self.dual_node.clone();
830        tokio::spawn(async move {
831            loop {
832                match dual.accept_any().await {
833                    Ok((ant_peer_id, remote_sock)) => {
834                        let peer_id =
835                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
836                        let remote_addr = NetworkAddress::from(remote_sock);
837                        // Optional: basic IP rate limiting
838                        let _ = rate_limiter.check_ip(&remote_sock.ip());
839                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
840                        register_new_peer(&peers, &peer_id, &remote_addr).await;
841                    }
842                    Err(e) => {
843                        warn!("Accept failed: {}", e);
844                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
845                    }
846                }
847            }
848        });
849
850        info!("Dual-stack listeners active on: {:?}", addrs);
851        Ok(())
852    }
853
854    /// Start a listener on a specific socket address
855    #[allow(dead_code)]
856    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
857        // use crate::transport::{Transport}; // Unused during migration
858
859        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
860        /*
861        // Try QUIC first (preferred transport)
862        match crate::transport::QuicTransport::new(Default::default()) {
863            Ok(quic_transport) => {
864                match quic_transport.listen(NetworkAddress::new(addr)).await {
865                    Ok(listen_addrs) => {
866                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
867
868                        // Store the actual listening addresses in the node
869                        {
870                            let mut node_listen_addrs = self.listen_addrs.write().await;
871                            // Don't clear - accumulate addresses from multiple listeners
872                            node_listen_addrs.push(listen_addrs.socket_addr());
873                        }
874
875                        // Start accepting connections in background
876                        self.start_connection_acceptor(
877                            Arc::new(quic_transport),
878                            addr,
879                            crate::transport::TransportType::QUIC
880                        ).await?;
881
882                        return Ok(());
883                    }
884                    Err(e) => {
885                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
886                    }
887                }
888            }
889            Err(e) => {
890                warn!("Failed to create QUIC transport for listening: {}", e);
891            }
892        }
893        */
894
895        warn!("QUIC transport temporarily disabled during ant-quic migration");
896        // No TCP fallback - QUIC only
897        Err(crate::P2PError::Transport(
898            crate::error::TransportError::SetupFailed(
899                format!(
900                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
901                )
902                .into(),
903            ),
904        ))
905    }
906
907    /// Start connection acceptor background task
908    #[allow(dead_code)] // Deprecated during ant-quic migration
909    async fn start_connection_acceptor(
910        &self,
911        transport: Arc<dyn crate::transport::Transport>,
912        addr: std::net::SocketAddr,
913        transport_type: crate::transport::TransportType,
914    ) -> Result<()> {
915        info!(
916            "Starting connection acceptor for {:?} on {}",
917            transport_type, addr
918        );
919
920        // Clone necessary data for the background task
921        let event_tx = self.event_tx.clone();
922        let _peer_id = self.peer_id.clone();
923        let peers = Arc::clone(&self.peers);
924        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
925
926        let rate_limiter = Arc::clone(&self.rate_limiter);
927
928        // Spawn background task to accept incoming connections
929        tokio::spawn(async move {
930            loop {
931                match transport.accept().await {
932                    Ok(connection) => {
933                        let remote_addr = connection.remote_addr();
934                        let connection_peer_id =
935                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
936
937                        // Apply rate limiting for incoming connections
938                        let socket_addr = remote_addr.socket_addr();
939                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
940                            // Connection dropped automatically when it goes out of scope
941                            continue;
942                        }
943
944                        info!(
945                            "Accepted {:?} connection from {} (peer: {})",
946                            transport_type, remote_addr, connection_peer_id
947                        );
948
949                        // Generate peer connected event
950                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
951
952                        // Store the peer connection
953                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
954
955                        // Spawn task to handle this specific connection's messages
956                        spawn_connection_handler(
957                            connection,
958                            connection_peer_id,
959                            event_tx.clone(),
960                            Arc::clone(&peers),
961                        );
962                    }
963                    Err(e) => {
964                        warn!(
965                            "Failed to accept {:?} connection on {}: {}",
966                            transport_type, addr, e
967                        );
968
969                        // Brief pause before retrying to avoid busy loop
970                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
971                    }
972                }
973            }
974        });
975
976        info!(
977            "Connection acceptor background task started for {:?} on {}",
978            transport_type, addr
979        );
980        Ok(())
981    }
982
983    /// Start the message receiving system with background tasks
984    async fn start_message_receiving_system(&self) -> Result<()> {
985        info!("Starting message receiving system");
986        let dual = self.dual_node.clone();
987        let event_tx = self.event_tx.clone();
988
989        tokio::spawn(async move {
990            loop {
991                match dual.receive_any().await {
992                    Ok((_peer_id, bytes)) => {
993                        // Expect the JSON message wrapper from create_protocol_message
994                        #[allow(clippy::collapsible_if)]
995                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
996                            if let (Some(protocol), Some(data), Some(from)) = (
997                                value.get("protocol").and_then(|v| v.as_str()),
998                                value.get("data").and_then(|v| v.as_array()),
999                                value.get("from").and_then(|v| v.as_str()),
1000                            ) {
1001                                let payload: Vec<u8> = data
1002                                    .iter()
1003                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1004                                    .collect();
1005                                let _ = event_tx.send(P2PEvent::Message {
1006                                    topic: protocol.to_string(),
1007                                    source: from.to_string(),
1008                                    data: payload,
1009                                });
1010                            }
1011                        }
1012                    }
1013                    Err(e) => {
1014                        warn!("Receive error: {}", e);
1015                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1016                    }
1017                }
1018            }
1019        });
1020
1021        Ok(())
1022    }
1023
1024    /// Handle a received message and generate appropriate events
1025    #[allow(dead_code)]
1026    async fn handle_received_message(
1027        &self,
1028        message_data: Vec<u8>,
1029        peer_id: &PeerId,
1030        _protocol: &str,
1031        event_tx: &broadcast::Sender<P2PEvent>,
1032    ) -> Result<()> {
1033        // MCP removed: no special protocol handling
1034
1035        // Parse the message format we created in create_protocol_message
1036        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1037            Ok(message) => {
1038                if let (Some(protocol), Some(data), Some(from)) = (
1039                    message.get("protocol").and_then(|v| v.as_str()),
1040                    message.get("data").and_then(|v| v.as_array()),
1041                    message.get("from").and_then(|v| v.as_str()),
1042                ) {
1043                    // Convert data array back to bytes
1044                    let data_bytes: Vec<u8> = data
1045                        .iter()
1046                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1047                        .collect();
1048
1049                    // Generate message event
1050                    let event = P2PEvent::Message {
1051                        topic: protocol.to_string(),
1052                        source: from.to_string(),
1053                        data: data_bytes,
1054                    };
1055
1056                    let _ = event_tx.send(event);
1057                    debug!("Generated message event from peer: {}", peer_id);
1058                }
1059            }
1060            Err(e) => {
1061                warn!("Failed to parse received message from {}: {}", peer_id, e);
1062            }
1063        }
1064
1065        Ok(())
1066    }
1067
1068    // MCP removed
1069
1070    // MCP removed
1071
1072    /// Run the P2P node (blocks until shutdown)
1073    pub async fn run(&self) -> Result<()> {
1074        if !*self.running.read().await {
1075            self.start().await?;
1076        }
1077
1078        info!("P2P node running...");
1079
1080        // Main event loop
1081        loop {
1082            if !*self.running.read().await {
1083                break;
1084            }
1085
1086            // Perform periodic tasks
1087            self.periodic_tasks().await?;
1088
1089            // Sleep for a short interval
1090            tokio::time::sleep(Duration::from_millis(100)).await;
1091        }
1092
1093        info!("P2P node stopped");
1094        Ok(())
1095    }
1096
1097    /// Stop the P2P node
1098    pub async fn stop(&self) -> Result<()> {
1099        info!("Stopping P2P node...");
1100
1101        // Set running state to false
1102        *self.running.write().await = false;
1103
1104        // Disconnect all peers
1105        self.disconnect_all_peers().await?;
1106
1107        // Shutdown production resource manager if configured
1108        if let Some(ref resource_manager) = self.resource_manager {
1109            resource_manager.shutdown().await.map_err(|e| {
1110                P2PError::Network(crate::error::NetworkError::ProtocolError(
1111                    format!("Failed to shutdown resource manager: {e}").into(),
1112                ))
1113            })?;
1114            info!("Production resource manager stopped");
1115        }
1116
1117        info!("P2P node stopped");
1118        Ok(())
1119    }
1120
1121    /// Graceful shutdown alias for tests
1122    pub async fn shutdown(&self) -> Result<()> {
1123        self.stop().await
1124    }
1125
1126    /// Check if the node is running
1127    pub async fn is_running(&self) -> bool {
1128        *self.running.read().await
1129    }
1130
1131    /// Get the current listen addresses
1132    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1133        self.listen_addrs.read().await.clone()
1134    }
1135
1136    /// Get connected peers
1137    pub async fn connected_peers(&self) -> Vec<PeerId> {
1138        self.peers.read().await.keys().cloned().collect()
1139    }
1140
1141    /// Get peer count
1142    pub async fn peer_count(&self) -> usize {
1143        self.peers.read().await.len()
1144    }
1145
1146    /// Get peer info
1147    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1148        self.peers.read().await.get(peer_id).cloned()
1149    }
1150
1151    /// Connect to a peer
1152    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1153        info!("Connecting to peer at: {}", address);
1154
1155        // Check production limits if resource manager is enabled
1156        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1157            Some(resource_manager.acquire_connection().await?)
1158        } else {
1159            None
1160        };
1161
1162        // Parse the address to SocketAddr format
1163        let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1164            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1165                format!("{}: {}", address, e).into(),
1166            ))
1167        })?;
1168
1169        // Establish a real connection via dual-stack Happy Eyeballs
1170        let peer_id = {
1171            match self
1172                .dual_node
1173                .connect_happy_eyeballs(&[_socket_addr])
1174                .await
1175                .map(|p| crate::transport::ant_quic_adapter::ant_peer_id_to_string(&p))
1176            {
1177                Ok(connected_peer_id) => {
1178                    info!("Successfully connected to peer: {}", connected_peer_id);
1179                    connected_peer_id
1180                }
1181                Err(e) => {
1182                    warn!("Failed to connect to peer at {}: {}", address, e);
1183
1184                    // For demo purposes, try a simplified connection approach
1185                    // Create a mock peer ID based on address for now
1186                    let demo_peer_id =
1187                        format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1188                    warn!(
1189                        "Using demo peer ID: {} (transport connection failed)",
1190                        demo_peer_id
1191                    );
1192                    demo_peer_id
1193                }
1194            }
1195        };
1196
1197        // Create peer info with connection details
1198        let peer_info = PeerInfo {
1199            peer_id: peer_id.clone(),
1200            addresses: vec![address.to_string()],
1201            connected_at: Instant::now(),
1202            last_seen: Instant::now(),
1203            status: ConnectionStatus::Connected,
1204            protocols: vec!["p2p-foundation/1.0".to_string()],
1205            heartbeat_count: 0,
1206        };
1207
1208        // Store peer information
1209        self.peers.write().await.insert(peer_id.clone(), peer_info);
1210
1211        // Record bandwidth usage if resource manager is enabled
1212        if let Some(ref resource_manager) = self.resource_manager {
1213            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1214        }
1215
1216        // Emit connection event
1217        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1218
1219        info!("Connected to peer: {}", peer_id);
1220        Ok(peer_id)
1221    }
1222
1223    /// Disconnect from a peer
1224    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1225        info!("Disconnecting from peer: {}", peer_id);
1226
1227        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1228            peer_info.status = ConnectionStatus::Disconnected;
1229
1230            // Emit event
1231            let _ = self
1232                .event_tx
1233                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1234
1235            info!("Disconnected from peer: {}", peer_id);
1236        }
1237
1238        Ok(())
1239    }
1240
1241    /// Send a message to a peer
1242    pub async fn send_message(
1243        &self,
1244        peer_id: &PeerId,
1245        protocol: &str,
1246        data: Vec<u8>,
1247    ) -> Result<()> {
1248        debug!(
1249            "Sending message to peer {} on protocol {}",
1250            peer_id, protocol
1251        );
1252
1253        // Check rate limits if resource manager is enabled
1254        if let Some(ref resource_manager) = self.resource_manager
1255            && !resource_manager
1256                .check_rate_limit(peer_id, "message")
1257                .await?
1258        {
1259            return Err(P2PError::ResourceExhausted(
1260                format!("Rate limit exceeded for peer {}", peer_id).into(),
1261            ));
1262        }
1263
1264        // Check if peer is connected
1265        if !self.peers.read().await.contains_key(peer_id) {
1266            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1267                peer_id.to_string().into(),
1268            )));
1269        }
1270
1271        // MCP removed: no special-case protocol validation
1272
1273        // Record bandwidth usage if resource manager is enabled
1274        if let Some(ref resource_manager) = self.resource_manager {
1275            resource_manager.record_bandwidth(data.len() as u64, 0);
1276        }
1277
1278        // Create protocol message wrapper
1279        let _message_data = self.create_protocol_message(protocol, data)?;
1280
1281        // Send via ant-quic dual-node
1282        self.dual_node
1283            .send_to_peer_string(peer_id, &_message_data)
1284            .await
1285            .map_err(|e| {
1286                P2PError::Transport(crate::error::TransportError::StreamError(
1287                    e.to_string().into(),
1288                ))
1289            })
1290    }
1291
1292    /// Create a protocol message wrapper
1293    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1294        use serde_json::json;
1295
1296        let timestamp = std::time::SystemTime::now()
1297            .duration_since(std::time::UNIX_EPOCH)
1298            .map_err(|e| {
1299                P2PError::Network(NetworkError::ProtocolError(
1300                    format!("System time error: {}", e).into(),
1301                ))
1302            })?
1303            .as_secs();
1304
1305        // Create a simple message format for P2P communication
1306        let message = json!({
1307            "protocol": protocol,
1308            "data": data,
1309            "from": self.peer_id,
1310            "timestamp": timestamp
1311        });
1312
1313        serde_json::to_vec(&message).map_err(|e| {
1314            P2PError::Transport(crate::error::TransportError::StreamError(
1315                format!("Failed to serialize message: {e}").into(),
1316            ))
1317        })
1318    }
1319
1320    // Note: async listen_addrs() already exists above for fetching listen addresses
1321}
1322
1323/// Create a protocol message wrapper (static version for background tasks)
1324#[allow(dead_code)]
1325fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1326    use serde_json::json;
1327
1328    let timestamp = std::time::SystemTime::now()
1329        .duration_since(std::time::UNIX_EPOCH)
1330        .map_err(|e| {
1331            P2PError::Network(NetworkError::ProtocolError(
1332                format!("System time error: {}", e).into(),
1333            ))
1334        })?
1335        .as_secs();
1336
1337    // Create a simple message format for P2P communication
1338    let message = json!({
1339        "protocol": protocol,
1340        "data": data,
1341        "timestamp": timestamp
1342    });
1343
1344    serde_json::to_vec(&message).map_err(|e| {
1345        P2PError::Transport(crate::error::TransportError::StreamError(
1346            format!("Failed to serialize message: {e}").into(),
1347        ))
1348    })
1349}
1350
1351impl P2PNode {
1352    /// Subscribe to network events
1353    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1354        self.event_tx.subscribe()
1355    }
1356
1357    /// Backwards-compat event stream accessor for tests
1358    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1359        self.subscribe_events()
1360    }
1361
1362    /// Get node uptime
1363    pub fn uptime(&self) -> Duration {
1364        self.start_time.elapsed()
1365    }
1366
1367    // MCP removed: all MCP tool/service methods removed
1368
1369    // /// Handle MCP remote tool call with network integration
1370
1371    // /// List tools available on a specific remote peer
1372
1373    // /// Get MCP server statistics
1374
1375    /// Get production resource metrics
1376    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1377        if let Some(ref resource_manager) = self.resource_manager {
1378            Ok(resource_manager.get_metrics().await)
1379        } else {
1380            Err(P2PError::Network(
1381                crate::error::NetworkError::ProtocolError(
1382                    "Production resource manager not enabled".to_string().into(),
1383                ),
1384            ))
1385        }
1386    }
1387
1388    /// Check system health
1389    pub async fn health_check(&self) -> Result<()> {
1390        if let Some(ref resource_manager) = self.resource_manager {
1391            resource_manager.health_check().await
1392        } else {
1393            // Basic health check without resource manager
1394            let peer_count = self.peer_count().await;
1395            if peer_count > self.config.max_connections {
1396                Err(P2PError::Network(
1397                    crate::error::NetworkError::ProtocolError(
1398                        format!("Too many connections: {peer_count}").into(),
1399                    ),
1400                ))
1401            } else {
1402                Ok(())
1403            }
1404        }
1405    }
1406
1407    /// Get production configuration (if enabled)
1408    pub fn production_config(&self) -> Option<&ProductionConfig> {
1409        self.config.production_config.as_ref()
1410    }
1411
1412    /// Check if production hardening is enabled
1413    pub fn is_production_mode(&self) -> bool {
1414        self.resource_manager.is_some()
1415    }
1416
1417    /// Get DHT reference
1418    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1419        self.dht.as_ref()
1420    }
1421
1422    /// Store a value in the DHT
1423    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1424        if let Some(ref dht) = self.dht {
1425            let mut dht_instance = dht.write().await;
1426            let dht_key = crate::dht::DhtKey::from_bytes(key);
1427            dht_instance
1428                .store(&dht_key, value.clone())
1429                .await
1430                .map_err(|e| {
1431                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1432                        format!("{:?}: {e}", key).into(),
1433                    ))
1434                })?;
1435
1436            Ok(())
1437        } else {
1438            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1439                "DHT not enabled".to_string().into(),
1440            )))
1441        }
1442    }
1443
1444    /// Retrieve a value from the DHT
1445    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1446        if let Some(ref dht) = self.dht {
1447            let dht_instance = dht.read().await;
1448            let dht_key = crate::dht::DhtKey::from_bytes(key);
1449            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1450                P2PError::Dht(crate::error::DhtError::StoreFailed(
1451                    format!("Retrieve failed: {e}").into(),
1452                ))
1453            })?;
1454
1455            Ok(record_result)
1456        } else {
1457            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1458                "DHT not enabled".to_string().into(),
1459            )))
1460        }
1461    }
1462
1463    /// Add a discovered peer to the bootstrap cache
1464    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1465        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1466            let mut manager = bootstrap_manager.write().await;
1467            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1468                .iter()
1469                .filter_map(|addr| addr.parse().ok())
1470                .collect();
1471            let contact = ContactEntry::new(peer_id, socket_addresses);
1472            manager.add_contact(contact).await.map_err(|e| {
1473                P2PError::Network(crate::error::NetworkError::ProtocolError(
1474                    format!("Failed to add peer to bootstrap cache: {e}").into(),
1475                ))
1476            })?;
1477        }
1478        Ok(())
1479    }
1480
1481    /// Update connection metrics for a peer in the bootstrap cache
1482    pub async fn update_peer_metrics(
1483        &self,
1484        peer_id: &PeerId,
1485        success: bool,
1486        latency_ms: Option<u64>,
1487        _error: Option<String>,
1488    ) -> Result<()> {
1489        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1490            let mut manager = bootstrap_manager.write().await;
1491
1492            // Create quality metrics based on the connection result
1493            let metrics = QualityMetrics {
1494                success_rate: if success { 1.0 } else { 0.0 },
1495                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1496                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
1497                last_connection_attempt: chrono::Utc::now(),
1498                last_successful_connection: if success {
1499                    chrono::Utc::now()
1500                } else {
1501                    chrono::Utc::now() - chrono::Duration::hours(1)
1502                },
1503                uptime_score: 0.5,
1504            };
1505
1506            manager
1507                .update_contact_metrics(peer_id, metrics)
1508                .await
1509                .map_err(|e| {
1510                    P2PError::Network(crate::error::NetworkError::ProtocolError(
1511                        format!("Failed to update peer metrics: {e}").into(),
1512                    ))
1513                })?;
1514        }
1515        Ok(())
1516    }
1517
1518    /// Get bootstrap cache statistics
1519    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1520        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1521            let manager = bootstrap_manager.read().await;
1522            let stats = manager.get_stats().await.map_err(|e| {
1523                P2PError::Network(crate::error::NetworkError::ProtocolError(
1524                    format!("Failed to get bootstrap stats: {e}").into(),
1525                ))
1526            })?;
1527            Ok(Some(stats))
1528        } else {
1529            Ok(None)
1530        }
1531    }
1532
1533    /// Get the number of cached bootstrap peers
1534    pub async fn cached_peer_count(&self) -> usize {
1535        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1536            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1537        {
1538            return stats.total_contacts;
1539        }
1540        0
1541    }
1542
    /// Connect to bootstrap peers.
    ///
    /// Candidate contacts come from the bootstrap cache when a bootstrap
    /// manager is configured and returns a non-empty list (up to 20 peers).
    /// Otherwise they are built from the node configuration:
    /// `bootstrap_peers_str` if it is non-empty, else the stringified
    /// `bootstrap_peers`.
    ///
    /// Each contact's addresses are tried in order via `connect_peer`; the
    /// first success ends the attempts for that contact. Connection results are
    /// written back to the bootstrap cache: successes whenever a bootstrap
    /// manager exists, failures only when the contacts came from the cache.
    ///
    /// # Errors
    ///
    /// Returns `NetworkError::ConnectionFailed` (with the placeholder address
    /// `0.0.0.0:0`) when not a single connection succeeded. Returns `Ok(())`
    /// without connecting when neither the cache nor the configuration yields
    /// any bootstrap peers.
    async fn connect_bootstrap_peers(&self) -> Result<()> {
        let mut bootstrap_contacts = Vec::new();
        // Tracks whether the contacts came from the cache; controls failure
        // bookkeeping and the final warning below.
        let mut used_cache = false;

        // Try to get peers from bootstrap cache first
        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            // The read guard lives only for this block, so it is released before
            // the write locks taken in the connection loop below.
            let manager = bootstrap_manager.read().await;
            match manager.get_bootstrap_peers(20).await {
                // Try to get top 20 quality peers
                Ok(contacts) => {
                    if !contacts.is_empty() {
                        info!("Using {} cached bootstrap peers", contacts.len());
                        bootstrap_contacts = contacts;
                        used_cache = true;
                    }
                }
                Err(e) => {
                    // A cache failure is not fatal; fall through to configured peers
                    warn!("Failed to get cached bootstrap peers: {}", e);
                }
            }
        }

        // Fallback to configured bootstrap peers if no cache or cache is empty
        if bootstrap_contacts.is_empty() {
            let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
                &self.config.bootstrap_peers_str
            } else {
                // Convert Multiaddr to strings for fallback
                &self
                    .config
                    .bootstrap_peers
                    .iter()
                    .map(|addr| addr.to_string())
                    .collect::<Vec<_>>()
            };

            if bootstrap_peers.is_empty() {
                info!("No bootstrap peers configured and no cached peers available");
                return Ok(());
            }

            info!("Using {} configured bootstrap peers", bootstrap_peers.len());

            for addr in bootstrap_peers {
                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
                    // The real peer ID is not known yet; use a placeholder built
                    // from the first 8 characters of the address.
                    let contact = ContactEntry::new(
                        format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
                        vec![socket_addr],
                    );
                    bootstrap_contacts.push(contact);
                } else {
                    warn!("Invalid bootstrap address format: {}", addr);
                }
            }
        }

        // Connect to bootstrap peers
        let mut successful_connections = 0;
        for contact in bootstrap_contacts {
            for addr in &contact.addresses {
                match self.connect_peer(&addr.to_string()).await {
                    Ok(peer_id) => {
                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
                        successful_connections += 1;

                        // Update bootstrap cache with successful connection
                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            // Replace the stored ID with the one returned by connect_peer
                            updated_contact.peer_id = peer_id.clone();
                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                        break; // Successfully connected, move to next contact
                    }
                    Err(e) => {
                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);

                        // Update bootstrap cache with failed connection
                        // (only for contacts that originated from the cache)
                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.update_connection_result(
                                false,
                                None,
                                Some(e.to_string()),
                            );

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                    }
                }
            }
        }

        if successful_connections == 0 {
            if !used_cache {
                warn!("Failed to connect to any bootstrap peers");
            }
            return Err(P2PError::Network(NetworkError::ConnectionFailed {
                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
                reason: "Failed to connect to any bootstrap peers".into(),
            }));
        }
        info!(
            "Successfully connected to {} bootstrap peers",
            successful_connections
        );

        Ok(())
    }
1660
1661    /// Disconnect from all peers
1662    async fn disconnect_all_peers(&self) -> Result<()> {
1663        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1664
1665        for peer_id in peer_ids {
1666            self.disconnect_peer(&peer_id).await?;
1667        }
1668
1669        Ok(())
1670    }
1671
1672    /// Perform periodic maintenance tasks
1673    async fn periodic_tasks(&self) -> Result<()> {
1674        // Update peer last seen timestamps
1675        // Remove stale connections
1676        // Perform DHT maintenance
1677        // This is a placeholder for now
1678
1679        Ok(())
1680    }
1681}
1682
/// Network sender trait for sending messages
///
/// Implementors must be `Send + Sync`. The async method is provided through
/// the `async_trait` attribute macro.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Send a message to a specific peer
    ///
    /// * `peer_id` - the peer the message is addressed to
    /// * `protocol` - protocol name accompanying the payload
    /// * `data` - raw message bytes; ownership passes to the sender
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Get our local peer ID
    fn local_peer_id(&self) -> &PeerId;
}
1692
/// Lightweight wrapper for P2PNode to implement NetworkSender
///
/// Cloneable handle holding a peer id and the sending half of an unbounded
/// channel.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Peer id returned by `local_peer_id`.
    peer_id: PeerId,
    // Use channels for async communication with the P2P node.
    // Each queued item is a `(peer id, protocol, payload)` tuple.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
1700
1701impl P2PNetworkSender {
1702    pub fn new(
1703        peer_id: PeerId,
1704        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1705    ) -> Self {
1706        Self { peer_id, send_tx }
1707    }
1708}
1709
1710/// Implementation of NetworkSender trait for P2PNetworkSender
1711#[async_trait::async_trait]
1712impl NetworkSender for P2PNetworkSender {
1713    /// Send a message to a specific peer via the P2P network
1714    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1715        self.send_tx
1716            .send((peer_id.clone(), protocol.to_string(), data))
1717            .map_err(|_| {
1718                P2PError::Network(crate::error::NetworkError::ProtocolError(
1719                    "Failed to send message via channel".to_string().into(),
1720                ))
1721            })?;
1722        Ok(())
1723    }
1724
1725    /// Get our local peer ID
1726    fn local_peer_id(&self) -> &PeerId {
1727        &self.peer_id
1728    }
1729}
1730
/// Builder pattern for creating P2P nodes
///
/// Wraps a `NodeConfig` that the builder methods modify before it is handed
/// to `P2PNode::new` by `build`.
pub struct NodeBuilder {
    /// Configuration being assembled.
    config: NodeConfig,
}
1735
1736impl Default for NodeBuilder {
1737    fn default() -> Self {
1738        Self::new()
1739    }
1740}
1741
1742impl NodeBuilder {
1743    /// Create a new node builder
1744    pub fn new() -> Self {
1745        Self {
1746            config: NodeConfig::default(),
1747        }
1748    }
1749
1750    /// Set the peer ID
1751    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1752        self.config.peer_id = Some(peer_id);
1753        self
1754    }
1755
1756    /// Add a listen address
1757    pub fn listen_on(mut self, addr: &str) -> Self {
1758        if let Ok(multiaddr) = addr.parse() {
1759            self.config.listen_addrs.push(multiaddr);
1760        }
1761        self
1762    }
1763
1764    /// Add a bootstrap peer
1765    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1766        if let Ok(multiaddr) = addr.parse() {
1767            self.config.bootstrap_peers.push(multiaddr);
1768        }
1769        self.config.bootstrap_peers_str.push(addr.to_string());
1770        self
1771    }
1772
1773    /// Enable IPv6 support
1774    pub fn with_ipv6(mut self, enable: bool) -> Self {
1775        self.config.enable_ipv6 = enable;
1776        self
1777    }
1778
1779    // MCP removed: builder methods deleted
1780
1781    /// Set connection timeout
1782    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1783        self.config.connection_timeout = timeout;
1784        self
1785    }
1786
1787    /// Set maximum connections
1788    pub fn with_max_connections(mut self, max: usize) -> Self {
1789        self.config.max_connections = max;
1790        self
1791    }
1792
1793    /// Enable production mode with default configuration
1794    pub fn with_production_mode(mut self) -> Self {
1795        self.config.production_config = Some(ProductionConfig::default());
1796        self
1797    }
1798
1799    /// Configure production settings
1800    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1801        self.config.production_config = Some(production_config);
1802        self
1803    }
1804
1805    /// Configure DHT settings
1806    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1807        self.config.dht_config = dht_config;
1808        self
1809    }
1810
1811    /// Enable DHT with default configuration
1812    pub fn with_default_dht(mut self) -> Self {
1813        self.config.dht_config = DHTConfig::default();
1814        self
1815    }
1816
1817    /// Build the P2P node
1818    pub async fn build(self) -> Result<P2PNode> {
1819        P2PNode::new(self.config).await
1820    }
1821}
1822
1823/// Standalone function to handle received messages without borrowing self
1824#[allow(dead_code)] // Deprecated during ant-quic migration
1825async fn handle_received_message_standalone(
1826    message_data: Vec<u8>,
1827    peer_id: &PeerId,
1828    _protocol: &str,
1829    event_tx: &broadcast::Sender<P2PEvent>,
1830) -> Result<()> {
1831    // Parse the message format
1832    match serde_json::from_slice::<serde_json::Value>(&message_data) {
1833        Ok(message) => {
1834            if let (Some(protocol), Some(data), Some(from)) = (
1835                message.get("protocol").and_then(|v| v.as_str()),
1836                message.get("data").and_then(|v| v.as_array()),
1837                message.get("from").and_then(|v| v.as_str()),
1838            ) {
1839                // Convert data array back to bytes
1840                let data_bytes: Vec<u8> = data
1841                    .iter()
1842                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1843                    .collect();
1844
1845                // Generate message event
1846                let event = P2PEvent::Message {
1847                    topic: protocol.to_string(),
1848                    source: from.to_string(),
1849                    data: data_bytes,
1850                };
1851
1852                let _ = event_tx.send(event);
1853                debug!("Generated message event from peer: {}", peer_id);
1854            }
1855        }
1856        Err(e) => {
1857            warn!("Failed to parse received message from {}: {}", peer_id, e);
1858        }
1859    }
1860
1861    Ok(())
1862}
1863
1864// MCP removed: standalone MCP handler deleted
1865
1866/// Helper function to handle protocol message creation
1867#[allow(dead_code)]
1868fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
1869    match create_protocol_message_static(protocol, data) {
1870        Ok(msg) => Some(msg),
1871        Err(e) => {
1872            warn!("Failed to create protocol message: {}", e);
1873            None
1874        }
1875    }
1876}
1877
1878/// Helper function to handle message send result
1879#[allow(dead_code)]
1880async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
1881    match result {
1882        Ok(_) => {
1883            debug!("Message sent to peer {} via transport layer", peer_id);
1884        }
1885        Err(e) => {
1886            warn!("Failed to send message to peer {}: {}", peer_id, e);
1887        }
1888    }
1889}
1890
1891/// Helper function to check rate limit
1892#[allow(dead_code)] // Deprecated during ant-quic migration
1893fn check_rate_limit(
1894    rate_limiter: &RateLimiter,
1895    socket_addr: &std::net::SocketAddr,
1896    remote_addr: &NetworkAddress,
1897) -> Result<()> {
1898    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
1899        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
1900        e
1901    })
1902}
1903
1904/// Helper function to register a new peer
1905#[allow(dead_code)] // Deprecated during ant-quic migration
1906async fn register_new_peer(
1907    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1908    peer_id: &PeerId,
1909    remote_addr: &NetworkAddress,
1910) {
1911    let mut peers_guard = peers.write().await;
1912    let peer_info = PeerInfo {
1913        peer_id: peer_id.clone(),
1914        addresses: vec![remote_addr.to_string()],
1915        connected_at: tokio::time::Instant::now(),
1916        last_seen: tokio::time::Instant::now(),
1917        status: ConnectionStatus::Connected,
1918        protocols: vec!["p2p-chat/1.0.0".to_string()],
1919        heartbeat_count: 0,
1920    };
1921    peers_guard.insert(peer_id.clone(), peer_info);
1922}
1923
1924/// Helper function to spawn connection handler
1925#[allow(dead_code)] // Deprecated during ant-quic migration
1926fn spawn_connection_handler(
1927    connection: Box<dyn crate::transport::Connection>,
1928    peer_id: PeerId,
1929    event_tx: broadcast::Sender<P2PEvent>,
1930    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1931) {
1932    tokio::spawn(async move {
1933        handle_peer_connection(connection, peer_id, event_tx, peers).await;
1934    });
1935}
1936
1937/// Helper function to handle peer connection
1938#[allow(dead_code)] // Deprecated during ant-quic migration
1939async fn handle_peer_connection(
1940    mut connection: Box<dyn crate::transport::Connection>,
1941    peer_id: PeerId,
1942    event_tx: broadcast::Sender<P2PEvent>,
1943    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1944) {
1945    loop {
1946        match connection.receive().await {
1947            Ok(message_data) => {
1948                debug!(
1949                    "Received {} bytes from peer: {}",
1950                    message_data.len(),
1951                    peer_id
1952                );
1953
1954                // Handle the received message
1955                if let Err(e) = handle_received_message_standalone(
1956                    message_data,
1957                    &peer_id,
1958                    "unknown", // TODO: Extract protocol from message
1959                    &event_tx,
1960                )
1961                .await
1962                {
1963                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
1964                }
1965            }
1966            Err(e) => {
1967                warn!("Failed to receive message from {}: {}", peer_id, e);
1968
1969                // Check if connection is still alive
1970                if !connection.is_alive().await {
1971                    info!("Connection to {} is dead, removing peer", peer_id);
1972
1973                    // Remove dead peer
1974                    remove_peer(&peers, &peer_id).await;
1975
1976                    // Generate peer disconnected event
1977                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
1978
1979                    break; // Exit the message receiving loop
1980                }
1981
1982                // Brief pause before retrying
1983                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1984            }
1985        }
1986    }
1987}
1988
1989/// Helper function to remove a peer
1990#[allow(dead_code)] // Deprecated during ant-quic migration
1991async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
1992    let mut peers_guard = peers.write().await;
1993    peers_guard.remove(peer_id);
1994}
1995
1996/// Helper function to update peer heartbeat
1997#[allow(dead_code)]
1998async fn update_peer_heartbeat(
1999    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2000    peer_id: &PeerId,
2001) -> Result<()> {
2002    let mut peers_guard = peers.write().await;
2003    match peers_guard.get_mut(peer_id) {
2004        Some(peer_info) => {
2005            peer_info.last_seen = Instant::now();
2006            peer_info.heartbeat_count += 1;
2007            Ok(())
2008        }
2009        None => {
2010            warn!("Received heartbeat from unknown peer: {}", peer_id);
2011            Err(P2PError::Network(NetworkError::PeerNotFound(
2012                format!("Peer {} not found", peer_id).into(),
2013            )))
2014        }
2015    }
2016}
2017
2018/// Helper function to get resource metrics
2019#[allow(dead_code)]
2020async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2021    if let Some(manager) = resource_manager {
2022        let metrics = manager.get_metrics().await;
2023        (metrics.memory_used, metrics.cpu_usage)
2024    } else {
2025        (0, 0.0)
2026    }
2027}
2028
2029#[cfg(test)]
2030mod tests {
2031    use super::*;
2032    // MCP removed from tests
2033    use std::time::Duration;
2034    use tokio::time::timeout;
2035
    // MCP removed: the test tool handler for network tests no longer exists.
2039
2040    /// Helper function to create a test node configuration
2041    fn create_test_node_config() -> NodeConfig {
2042        NodeConfig {
2043            peer_id: Some("test_peer_123".to_string()),
2044            listen_addrs: vec![
2045                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2046                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2047            ],
2048            listen_addr: std::net::SocketAddr::new(
2049                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2050                0,
2051            ),
2052            bootstrap_peers: vec![],
2053            bootstrap_peers_str: vec![],
2054            enable_ipv6: true,
2055
2056            connection_timeout: Duration::from_secs(10),
2057            keep_alive_interval: Duration::from_secs(30),
2058            max_connections: 100,
2059            max_incoming_connections: 50,
2060            dht_config: DHTConfig::default(),
2061            security_config: SecurityConfig::default(),
2062            production_config: None,
2063            bootstrap_cache_config: None,
2064            // identity_config: None,
2065        }
2066    }
2067
    // MCP removed: the helper function that created a test tool was deleted.
2070
2071    #[tokio::test]
2072    async fn test_node_config_default() {
2073        let config = NodeConfig::default();
2074
2075        assert!(config.peer_id.is_none());
2076        assert_eq!(config.listen_addrs.len(), 2);
2077        assert!(config.enable_ipv6);
2078        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2079        assert_eq!(config.max_incoming_connections, 100);
2080        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2081    }
2082
2083    #[tokio::test]
2084    async fn test_dht_config_default() {
2085        let config = DHTConfig::default();
2086
2087        assert_eq!(config.k_value, 20);
2088        assert_eq!(config.alpha_value, 5);
2089        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2090        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2091    }
2092
2093    #[tokio::test]
2094    async fn test_security_config_default() {
2095        let config = SecurityConfig::default();
2096
2097        assert!(config.enable_noise);
2098        assert!(config.enable_tls);
2099        assert_eq!(config.trust_level, TrustLevel::Basic);
2100    }
2101
2102    #[test]
2103    fn test_trust_level_variants() {
2104        // Test that all trust level variants can be created
2105        let _none = TrustLevel::None;
2106        let _basic = TrustLevel::Basic;
2107        let _full = TrustLevel::Full;
2108
2109        // Test equality
2110        assert_eq!(TrustLevel::None, TrustLevel::None);
2111        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2112        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2113        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2114    }
2115
2116    #[test]
2117    fn test_connection_status_variants() {
2118        let connecting = ConnectionStatus::Connecting;
2119        let connected = ConnectionStatus::Connected;
2120        let disconnecting = ConnectionStatus::Disconnecting;
2121        let disconnected = ConnectionStatus::Disconnected;
2122        let failed = ConnectionStatus::Failed("test error".to_string());
2123
2124        assert_eq!(connecting, ConnectionStatus::Connecting);
2125        assert_eq!(connected, ConnectionStatus::Connected);
2126        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2127        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2128        assert_ne!(connecting, connected);
2129
2130        if let ConnectionStatus::Failed(msg) = failed {
2131            assert_eq!(msg, "test error");
2132        } else {
2133            panic!("Expected Failed status");
2134        }
2135    }
2136
2137    #[tokio::test]
2138    async fn test_node_creation() -> Result<()> {
2139        let config = create_test_node_config();
2140        let node = P2PNode::new(config).await?;
2141
2142        assert_eq!(node.peer_id(), "test_peer_123");
2143        assert!(!node.is_running().await);
2144        assert_eq!(node.peer_count().await, 0);
2145        assert!(node.connected_peers().await.is_empty());
2146
2147        Ok(())
2148    }
2149
2150    #[tokio::test]
2151    async fn test_node_creation_without_peer_id() -> Result<()> {
2152        let mut config = create_test_node_config();
2153        config.peer_id = None;
2154
2155        let node = P2PNode::new(config).await?;
2156
2157        // Should have generated a peer ID
2158        assert!(node.peer_id().starts_with("peer_"));
2159        assert!(!node.is_running().await);
2160
2161        Ok(())
2162    }
2163
2164    #[tokio::test]
2165    async fn test_node_lifecycle() -> Result<()> {
2166        let config = create_test_node_config();
2167        let node = P2PNode::new(config).await?;
2168
2169        // Initially not running
2170        assert!(!node.is_running().await);
2171
2172        // Start the node
2173        node.start().await?;
2174        assert!(node.is_running().await);
2175
2176        // Check listen addresses were set (at least one)
2177        let listen_addrs = node.listen_addrs().await;
2178        assert!(
2179            !listen_addrs.is_empty(),
2180            "Expected at least one listening address"
2181        );
2182
2183        // Stop the node
2184        node.stop().await?;
2185        assert!(!node.is_running().await);
2186
2187        Ok(())
2188    }
2189
2190    #[tokio::test]
2191    async fn test_peer_connection() -> Result<()> {
2192        let config = create_test_node_config();
2193        let node = P2PNode::new(config).await?;
2194
2195        let peer_addr = "127.0.0.1:0";
2196
2197        // Connect to a peer
2198        let peer_id = node.connect_peer(peer_addr).await?;
2199        assert!(peer_id.starts_with("peer_from_"));
2200
2201        // Check peer count
2202        assert_eq!(node.peer_count().await, 1);
2203
2204        // Check connected peers
2205        let connected_peers = node.connected_peers().await;
2206        assert_eq!(connected_peers.len(), 1);
2207        assert_eq!(connected_peers[0], peer_id);
2208
2209        // Get peer info
2210        let peer_info = node.peer_info(&peer_id).await;
2211        assert!(peer_info.is_some());
2212        let info = peer_info.expect("Peer info should exist after adding peer");
2213        assert_eq!(info.peer_id, peer_id);
2214        assert_eq!(info.status, ConnectionStatus::Connected);
2215        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2216
2217        // Disconnect from peer
2218        node.disconnect_peer(&peer_id).await?;
2219        assert_eq!(node.peer_count().await, 0);
2220
2221        Ok(())
2222    }
2223
2224    #[tokio::test]
2225    async fn test_event_subscription() -> Result<()> {
2226        let config = create_test_node_config();
2227        let node = P2PNode::new(config).await?;
2228
2229        let mut events = node.subscribe_events();
2230        let peer_addr = "127.0.0.1:0";
2231
2232        // Connect to a peer (this should emit an event)
2233        let peer_id = node.connect_peer(peer_addr).await?;
2234
2235        // Check for PeerConnected event
2236        let event = timeout(Duration::from_millis(100), events.recv()).await;
2237        assert!(event.is_ok());
2238
2239        let event_result = event
2240            .expect("Should receive event")
2241            .expect("Event should not be error");
2242        match event_result {
2243            P2PEvent::PeerConnected(event_peer_id) => {
2244                assert_eq!(event_peer_id, peer_id);
2245            }
2246            _ => panic!("Expected PeerConnected event"),
2247        }
2248
2249        // Disconnect from peer (this should emit another event)
2250        node.disconnect_peer(&peer_id).await?;
2251
2252        // Check for PeerDisconnected event
2253        let event = timeout(Duration::from_millis(100), events.recv()).await;
2254        assert!(event.is_ok());
2255
2256        let event_result = event
2257            .expect("Should receive event")
2258            .expect("Event should not be error");
2259        match event_result {
2260            P2PEvent::PeerDisconnected(event_peer_id) => {
2261                assert_eq!(event_peer_id, peer_id);
2262            }
2263            _ => panic!("Expected PeerDisconnected event"),
2264        }
2265
2266        Ok(())
2267    }
2268
2269    #[tokio::test]
2270    async fn test_message_sending() -> Result<()> {
2271        // Create two nodes
2272        let mut config1 = create_test_node_config();
2273        config1.listen_addr =
2274            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2275        let node1 = P2PNode::new(config1).await?;
2276        node1.start().await?;
2277
2278        let mut config2 = create_test_node_config();
2279        config2.listen_addr =
2280            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2281        let node2 = P2PNode::new(config2).await?;
2282        node2.start().await?;
2283
2284        // Wait a bit for nodes to start listening
2285        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2286
2287        // Get actual listening address of node2
2288        let node2_addr = node2.local_addr().ok_or_else(|| {
2289            P2PError::Network(crate::error::NetworkError::ProtocolError(
2290                "No listening address".to_string().into(),
2291            ))
2292        })?;
2293
2294        // Connect node1 to node2
2295        let peer_id = node1.connect_peer(&node2_addr).await?;
2296
2297        // Wait a bit for connection to establish
2298        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2299
2300        // Send a message
2301        let message_data = b"Hello, peer!".to_vec();
2302        let result = node1
2303            .send_message(&peer_id, "test-protocol", message_data)
2304            .await;
2305        // For now, we'll just check that we don't get a "not connected" error
2306        // The actual send might fail due to no handler on the other side
2307        if let Err(e) = &result {
2308            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2309        }
2310
2311        // Try to send to non-existent peer
2312        let non_existent_peer = "non_existent_peer".to_string();
2313        let result = node1
2314            .send_message(&non_existent_peer, "test-protocol", vec![])
2315            .await;
2316        assert!(result.is_err(), "Sending to non-existent peer should fail");
2317
2318        Ok(())
2319    }
2320
2321    #[tokio::test]
2322    async fn test_remote_mcp_operations() -> Result<()> {
2323        let config = create_test_node_config();
2324        let node = P2PNode::new(config).await?;
2325
2326        // MCP removed; test reduced to simple start/stop
2327        node.start().await?;
2328        node.stop().await?;
2329        Ok(())
2330    }
2331
2332    #[tokio::test]
2333    async fn test_health_check() -> Result<()> {
2334        let config = create_test_node_config();
2335        let node = P2PNode::new(config).await?;
2336
2337        // Health check should pass with no connections
2338        let result = node.health_check().await;
2339        assert!(result.is_ok());
2340
2341        // Note: We're not actually connecting to real peers here
2342        // since that would require running bootstrap nodes.
2343        // The health check should still pass with no connections.
2344
2345        Ok(())
2346    }
2347
2348    #[tokio::test]
2349    async fn test_node_uptime() -> Result<()> {
2350        let config = create_test_node_config();
2351        let node = P2PNode::new(config).await?;
2352
2353        let uptime1 = node.uptime();
2354        assert!(uptime1 >= Duration::from_secs(0));
2355
2356        // Wait a bit
2357        tokio::time::sleep(Duration::from_millis(10)).await;
2358
2359        let uptime2 = node.uptime();
2360        assert!(uptime2 > uptime1);
2361
2362        Ok(())
2363    }
2364
2365    #[tokio::test]
2366    async fn test_node_config_access() -> Result<()> {
2367        let config = create_test_node_config();
2368        let expected_peer_id = config.peer_id.clone();
2369        let node = P2PNode::new(config).await?;
2370
2371        let node_config = node.config();
2372        assert_eq!(node_config.peer_id, expected_peer_id);
2373        assert_eq!(node_config.max_connections, 100);
2374        // MCP removed
2375
2376        Ok(())
2377    }
2378
2379    #[tokio::test]
2380    async fn test_mcp_server_access() -> Result<()> {
2381        let config = create_test_node_config();
2382        let _node = P2PNode::new(config).await?;
2383
2384        // MCP removed
2385        Ok(())
2386    }
2387
2388    #[tokio::test]
2389    async fn test_dht_access() -> Result<()> {
2390        let config = create_test_node_config();
2391        let node = P2PNode::new(config).await?;
2392
2393        // Should have DHT
2394        assert!(node.dht().is_some());
2395
2396        Ok(())
2397    }
2398
2399    #[tokio::test]
2400    async fn test_node_builder() -> Result<()> {
2401        // Create a config using the builder but don't actually build a real node
2402        let builder = P2PNode::builder()
2403            .with_peer_id("builder_test_peer".to_string())
2404            .listen_on("/ip4/127.0.0.1/tcp/0")
2405            .listen_on("/ip6/::1/tcp/0")
2406            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2407            .with_ipv6(true)
2408            .with_connection_timeout(Duration::from_secs(15))
2409            .with_max_connections(200);
2410
2411        // Test the configuration that was built
2412        let config = builder.config;
2413        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2414        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2415        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2416        assert!(config.enable_ipv6);
2417        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2418        assert_eq!(config.max_connections, 200);
2419
2420        Ok(())
2421    }
2422
2423    #[tokio::test]
2424    async fn test_bootstrap_peers() -> Result<()> {
2425        let mut config = create_test_node_config();
2426        config.bootstrap_peers = vec![
2427            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2428            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2429        ];
2430
2431        let node = P2PNode::new(config).await?;
2432
2433        // Start node (which attempts to connect to bootstrap peers)
2434        node.start().await?;
2435
2436        // In a test environment, bootstrap peers may not be available
2437        // The test verifies the node starts correctly with bootstrap configuration
2438        let peer_count = node.peer_count().await;
2439        assert!(
2440            peer_count <= 2,
2441            "Peer count should not exceed bootstrap peer count"
2442        );
2443
2444        node.stop().await?;
2445        Ok(())
2446    }
2447
2448    #[tokio::test]
2449    async fn test_production_mode_disabled() -> Result<()> {
2450        let config = create_test_node_config();
2451        let node = P2PNode::new(config).await?;
2452
2453        assert!(!node.is_production_mode());
2454        assert!(node.production_config().is_none());
2455
2456        // Resource metrics should fail when production mode is disabled
2457        let result = node.resource_metrics().await;
2458        assert!(result.is_err());
2459        assert!(result.unwrap_err().to_string().contains("not enabled"));
2460
2461        Ok(())
2462    }
2463
2464    #[tokio::test]
2465    async fn test_network_event_variants() {
2466        // Test that all network event variants can be created
2467        let peer_id = "test_peer".to_string();
2468        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2469
2470        let _peer_connected = NetworkEvent::PeerConnected {
2471            peer_id: peer_id.clone(),
2472            addresses: vec![address.clone()],
2473        };
2474
2475        let _peer_disconnected = NetworkEvent::PeerDisconnected {
2476            peer_id: peer_id.clone(),
2477            reason: "test disconnect".to_string(),
2478        };
2479
2480        let _message_received = NetworkEvent::MessageReceived {
2481            peer_id: peer_id.clone(),
2482            protocol: "test-protocol".to_string(),
2483            data: vec![1, 2, 3],
2484        };
2485
2486        let _connection_failed = NetworkEvent::ConnectionFailed {
2487            peer_id: Some(peer_id.clone()),
2488            address: address.clone(),
2489            error: "connection refused".to_string(),
2490        };
2491
2492        let _dht_stored = NetworkEvent::DHTRecordStored {
2493            key: vec![1, 2, 3],
2494            value: vec![4, 5, 6],
2495        };
2496
2497        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2498            key: vec![1, 2, 3],
2499            value: Some(vec![4, 5, 6]),
2500        };
2501    }
2502
2503    #[tokio::test]
2504    async fn test_peer_info_structure() {
2505        let peer_info = PeerInfo {
2506            peer_id: "test_peer".to_string(),
2507            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2508            connected_at: Instant::now(),
2509            last_seen: Instant::now(),
2510            status: ConnectionStatus::Connected,
2511            protocols: vec!["test-protocol".to_string()],
2512            heartbeat_count: 0,
2513        };
2514
2515        assert_eq!(peer_info.peer_id, "test_peer");
2516        assert_eq!(peer_info.addresses.len(), 1);
2517        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2518        assert_eq!(peer_info.protocols.len(), 1);
2519    }
2520
2521    #[tokio::test]
2522    async fn test_serialization() -> Result<()> {
2523        // Test that configs can be serialized/deserialized
2524        let config = create_test_node_config();
2525        let serialized = serde_json::to_string(&config)?;
2526        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2527
2528        assert_eq!(config.peer_id, deserialized.peer_id);
2529        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2530        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2531
2532        Ok(())
2533    }
2534}