saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] // Temporarily unused during migration
27use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::{RwLock, broadcast};
37use tokio::time::Instant;
38use tracing::{debug, info, warn};
39
40/// Configuration for a P2P node
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct NodeConfig {
43    /// Local peer ID for this node
44    pub peer_id: Option<PeerId>,
45
46    /// Addresses to listen on for incoming connections
47    pub listen_addrs: Vec<std::net::SocketAddr>,
48
49    /// Primary listen address (for compatibility)
50    pub listen_addr: std::net::SocketAddr,
51
52    /// Bootstrap peers to connect to on startup (legacy)
53    pub bootstrap_peers: Vec<std::net::SocketAddr>,
54
55    /// Bootstrap peers as strings (for integration tests)
56    pub bootstrap_peers_str: Vec<String>,
57
58    /// Enable IPv6 support
59    pub enable_ipv6: bool,
60
61    // MCP removed; will be redesigned later
62    /// Connection timeout duration
63    pub connection_timeout: Duration,
64
65    /// Keep-alive interval for connections
66    pub keep_alive_interval: Duration,
67
68    /// Maximum number of concurrent connections
69    pub max_connections: usize,
70
71    /// Maximum number of incoming connections
72    pub max_incoming_connections: usize,
73
74    /// DHT configuration
75    pub dht_config: DHTConfig,
76
77    /// Security configuration
78    pub security_config: SecurityConfig,
79
80    /// Production hardening configuration
81    pub production_config: Option<ProductionConfig>,
82
83    /// Bootstrap cache configuration
84    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
85}
86
87/// DHT-specific configuration
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct DHTConfig {
90    /// Kademlia K parameter (bucket size)
91    pub k_value: usize,
92
93    /// Kademlia alpha parameter (parallelism)
94    pub alpha_value: usize,
95
96    /// DHT record TTL
97    pub record_ttl: Duration,
98
99    /// DHT refresh interval
100    pub refresh_interval: Duration,
101}
102
103/// Security configuration
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SecurityConfig {
106    /// Enable noise protocol for encryption
107    pub enable_noise: bool,
108
109    /// Enable TLS for secure transport
110    pub enable_tls: bool,
111
112    /// Trust level for peer verification
113    pub trust_level: TrustLevel,
114}
115
116/// Trust level for peer verification
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub enum TrustLevel {
119    /// No verification required
120    None,
121    /// Basic peer ID verification
122    Basic,
123    /// Full cryptographic verification
124    Full,
125}
126
127impl NodeConfig {
128    /// Create a new NodeConfig with default values
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if default addresses cannot be parsed
133    pub fn new() -> Result<Self> {
134        // Load config and use its defaults
135        let config = Config::default();
136
137        // Parse the default listen address
138        let listen_addr = config.listen_socket_addr()?;
139
140        // Create listen addresses based on config
141        let mut listen_addrs = vec![];
142
143        // Add IPv6 address if enabled
144        if config.network.ipv6_enabled {
145            let ipv6_addr = std::net::SocketAddr::new(
146                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
147                listen_addr.port(),
148            );
149            listen_addrs.push(ipv6_addr);
150        }
151
152        // Always add IPv4
153        let ipv4_addr = std::net::SocketAddr::new(
154            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
155            listen_addr.port(),
156        );
157        listen_addrs.push(ipv4_addr);
158
159        Ok(Self {
160            peer_id: None,
161            listen_addrs,
162            listen_addr,
163            bootstrap_peers: Vec::new(),
164            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
165            enable_ipv6: config.network.ipv6_enabled,
166
167            connection_timeout: Duration::from_secs(config.network.connection_timeout),
168            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
169            max_connections: config.network.max_connections,
170            max_incoming_connections: config.security.connection_limit as usize,
171            dht_config: DHTConfig::default(),
172            security_config: SecurityConfig::default(),
173            production_config: None,
174            bootstrap_cache_config: None,
175            // identity_config: None,
176        })
177    }
178}
179
180impl Default for NodeConfig {
181    fn default() -> Self {
182        // Use config defaults for network settings
183        let config = Config::default();
184
185        // Parse the default listen address - use safe fallback if parsing fails
186        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
187            std::net::SocketAddr::new(
188                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
189                9000,
190            )
191        });
192
193        Self {
194            peer_id: None,
195            listen_addrs: vec![
196                std::net::SocketAddr::new(
197                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
198                    listen_addr.port(),
199                ),
200                std::net::SocketAddr::new(
201                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
202                    listen_addr.port(),
203                ),
204            ],
205            listen_addr,
206            bootstrap_peers: Vec::new(),
207            bootstrap_peers_str: Vec::new(),
208            enable_ipv6: config.network.ipv6_enabled,
209
210            connection_timeout: Duration::from_secs(config.network.connection_timeout),
211            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
212            max_connections: config.network.max_connections,
213            max_incoming_connections: config.security.connection_limit as usize,
214            dht_config: DHTConfig::default(),
215            security_config: SecurityConfig::default(),
216            production_config: None, // Use default production config if enabled
217            bootstrap_cache_config: None,
218            // identity_config: None, // Use default identity config if enabled
219        }
220    }
221}
222
223impl NodeConfig {
224    /// Create NodeConfig from Config
225    pub fn from_config(config: &Config) -> Result<Self> {
226        let listen_addr = config.listen_socket_addr()?;
227        let bootstrap_addrs = config.bootstrap_addrs()?;
228
229        let mut node_config = Self {
230            peer_id: None,
231            listen_addrs: vec![listen_addr],
232            listen_addr,
233            bootstrap_peers: bootstrap_addrs
234                .iter()
235                .map(|addr| addr.socket_addr())
236                .collect(),
237            bootstrap_peers_str: config
238                .network
239                .bootstrap_nodes
240                .iter()
241                .map(|addr| addr.to_string())
242                .collect(),
243            enable_ipv6: config.network.ipv6_enabled,
244
245            connection_timeout: Duration::from_secs(config.network.connection_timeout),
246            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
247            max_connections: config.network.max_connections,
248            max_incoming_connections: config.security.connection_limit as usize,
249            dht_config: DHTConfig {
250                k_value: 20,
251                alpha_value: 3,
252                record_ttl: Duration::from_secs(3600),
253                refresh_interval: Duration::from_secs(900),
254            },
255            security_config: SecurityConfig {
256                enable_noise: true,
257                enable_tls: true,
258                trust_level: TrustLevel::Basic,
259            },
260            production_config: Some(ProductionConfig {
261                max_connections: config.network.max_connections,
262                max_memory_bytes: 0,  // unlimited
263                max_bandwidth_bps: 0, // unlimited
264                connection_timeout: Duration::from_secs(config.network.connection_timeout),
265                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266                health_check_interval: Duration::from_secs(30),
267                metrics_interval: Duration::from_secs(60),
268                enable_performance_tracking: true,
269                enable_auto_cleanup: true,
270                shutdown_timeout: Duration::from_secs(30),
271                rate_limits: crate::production::RateLimitConfig::default(),
272            }),
273            bootstrap_cache_config: None,
274            // identity_config: Some(IdentityManagerConfig {
275            //     cache_ttl: Duration::from_secs(3600),
276            //     challenge_timeout: Duration::from_secs(30),
277            // }),
278        };
279
280        // Add IPv6 listen address if enabled
281        if config.network.ipv6_enabled {
282            node_config.listen_addrs.push(std::net::SocketAddr::new(
283                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
284                listen_addr.port(),
285            ));
286        }
287
288        Ok(node_config)
289    }
290
291    /// Try to build a NodeConfig from a listen address string
292    pub fn with_listen_addr(addr: &str) -> Result<Self> {
293        let listen_addr: std::net::SocketAddr = addr
294            .parse()
295            .map_err(|e: std::net::AddrParseError| {
296                NetworkError::InvalidAddress(e.to_string().into())
297            })
298            .map_err(P2PError::Network)?;
299        let cfg = NodeConfig {
300            listen_addr,
301            listen_addrs: vec![listen_addr],
302            ..Default::default()
303        };
304        Ok(cfg)
305    }
306}
307
308impl Default for DHTConfig {
309    fn default() -> Self {
310        Self {
311            k_value: 20,
312            alpha_value: 5,
313            record_ttl: Duration::from_secs(3600), // 1 hour
314            refresh_interval: Duration::from_secs(600), // 10 minutes
315        }
316    }
317}
318
319impl Default for SecurityConfig {
320    fn default() -> Self {
321        Self {
322            enable_noise: true,
323            enable_tls: true,
324            trust_level: TrustLevel::Basic,
325        }
326    }
327}
328
329/// Information about a connected peer
330#[derive(Debug, Clone)]
331pub struct PeerInfo {
332    /// Peer identifier
333    pub peer_id: PeerId,
334
335    /// Peer's addresses
336    pub addresses: Vec<String>,
337
338    /// Connection timestamp
339    pub connected_at: Instant,
340
341    /// Last seen timestamp
342    pub last_seen: Instant,
343
344    /// Connection status
345    pub status: ConnectionStatus,
346
347    /// Supported protocols
348    pub protocols: Vec<String>,
349
350    /// Number of heartbeats received
351    pub heartbeat_count: u64,
352}
353
354/// Connection status for a peer
355#[derive(Debug, Clone, PartialEq)]
356pub enum ConnectionStatus {
357    /// Connection is being established
358    Connecting,
359    /// Connection is established and active
360    Connected,
361    /// Connection is being closed
362    Disconnecting,
363    /// Connection is closed
364    Disconnected,
365    /// Connection failed
366    Failed(String),
367}
368
369/// Network events that can occur
370#[derive(Debug, Clone)]
371pub enum NetworkEvent {
372    /// A new peer has connected
373    PeerConnected {
374        /// The identifier of the newly connected peer
375        peer_id: PeerId,
376        /// The network addresses where the peer can be reached
377        addresses: Vec<String>,
378    },
379
380    /// A peer has disconnected
381    PeerDisconnected {
382        /// The identifier of the disconnected peer
383        peer_id: PeerId,
384        /// The reason for the disconnection
385        reason: String,
386    },
387
388    /// A message was received from a peer
389    MessageReceived {
390        /// The identifier of the sending peer
391        peer_id: PeerId,
392        /// The protocol used for the message
393        protocol: String,
394        /// The raw message data
395        data: Vec<u8>,
396    },
397
398    /// A connection attempt failed
399    ConnectionFailed {
400        /// The identifier of the peer (if known)
401        peer_id: Option<PeerId>,
402        /// The address where connection was attempted
403        address: String,
404        /// The error message describing the failure
405        error: String,
406    },
407
408    /// DHT record was stored
409    DHTRecordStored {
410        /// The DHT key where the record was stored
411        key: Vec<u8>,
412        /// The value that was stored
413        value: Vec<u8>,
414    },
415
416    /// DHT record was retrieved
417    DHTRecordRetrieved {
418        /// The DHT key that was queried
419        key: Vec<u8>,
420        /// The retrieved value, if found
421        value: Option<Vec<u8>>,
422    },
423}
424
425/// Network events that can occur in the P2P system
426///
427/// Events are broadcast to all listeners and provide real-time
428/// notifications of network state changes and message arrivals.
429#[derive(Debug, Clone)]
430pub enum P2PEvent {
431    /// Message received from a peer on a specific topic
432    Message {
433        /// Topic or channel the message was sent on
434        topic: String,
435        /// Peer ID of the message sender
436        source: PeerId,
437        /// Raw message data payload
438        data: Vec<u8>,
439    },
440    /// A new peer has connected to the network
441    PeerConnected(PeerId),
442    /// A peer has disconnected from the network
443    PeerDisconnected(PeerId),
444}
445
446/// Main P2P node structure
447/// Main P2P network node that manages connections, routing, and communication
448///
449/// This struct represents a complete P2P network participant that can:
450/// - Connect to other peers via QUIC transport
451/// - Participate in distributed hash table (DHT) operations
452/// - Send and receive messages through various protocols
453/// - Handle network events and peer lifecycle
454/// - Provide MCP (Model Context Protocol) services
455pub struct P2PNode {
456    /// Node configuration
457    config: NodeConfig,
458
459    /// Our peer ID
460    peer_id: PeerId,
461
462    /// Connected peers
463    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
464
465    /// Network event broadcaster
466    event_tx: broadcast::Sender<P2PEvent>,
467
468    /// Listen addresses
469    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
470
471    /// Node start time
472    start_time: Instant,
473
474    /// Running state
475    running: RwLock<bool>,
476
477    /// DHT instance (optional)
478    dht: Option<Arc<RwLock<DHT>>>,
479
480    /// Production resource manager (optional)
481    resource_manager: Option<Arc<ResourceManager>>,
482
483    /// Bootstrap cache manager for peer discovery
484    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
485
486    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
487    dual_node: Arc<DualStackNetworkNode>,
488
489    /// Rate limiter for connection and request throttling
490    #[allow(dead_code)]
491    rate_limiter: Arc<RateLimiter>,
492}
493
494impl P2PNode {
495    /// Minimal constructor for tests that avoids real networking
496    pub fn new_for_tests() -> Result<Self> {
497        let (event_tx, _) = broadcast::channel(16);
498        Ok(Self {
499            config: NodeConfig::default(),
500            peer_id: "test_peer".to_string(),
501            peers: Arc::new(RwLock::new(HashMap::new())),
502            event_tx,
503            listen_addrs: RwLock::new(Vec::new()),
504            start_time: Instant::now(),
505            running: RwLock::new(false),
506            dht: None,
507            resource_manager: None,
508            bootstrap_manager: None,
509            dual_node: {
510                // Bind dual-stack nodes on ephemeral ports for tests
511                let v6: Option<std::net::SocketAddr> = "[::1]:0"
512                    .parse()
513                    .ok()
514                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
515                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
516                let handle = tokio::runtime::Handle::current();
517                let dual_attempt = handle.block_on(
518                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
519                );
520                let dual = match dual_attempt {
521                    Ok(d) => d,
522                    Err(_e1) => {
523                        // Fallback to IPv4-only ephemeral bind
524                        let fallback = handle.block_on(
525                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
526                                None,
527                                "127.0.0.1:0".parse().ok(),
528                            ),
529                        );
530                        match fallback {
531                            Ok(d) => d,
532                            Err(e2) => {
533                                return Err(P2PError::Network(NetworkError::BindError(
534                                    format!("Failed to create dual-stack network node: {}", e2)
535                                        .into(),
536                                )));
537                            }
538                        }
539                    }
540                };
541                Arc::new(dual)
542            },
543            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
544                max_requests: 100,
545                burst_size: 100,
546                window: std::time::Duration::from_secs(1),
547                ..Default::default()
548            })),
549        })
550    }
551    /// Create a new P2P node with the given configuration
552    pub async fn new(config: NodeConfig) -> Result<Self> {
553        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
554            // Generate a random peer ID for now
555            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
556        });
557
558        let (event_tx, _) = broadcast::channel(1000);
559
560        // Initialize and register a TrustWeightedKademlia DHT for the global API
561        // Use a deterministic local NodeId derived from the peer_id
562        {
563            use blake3::Hasher;
564            let mut hasher = Hasher::new();
565            hasher.update(peer_id.as_bytes());
566            let digest = hasher.finalize();
567            let mut nid = [0u8; 32];
568            nid.copy_from_slice(digest.as_bytes());
569            let twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
570                crate::identity::node_identity::NodeId::from_bytes(nid),
571            ));
572            let _ = crate::api::set_dht_instance(twdht);
573        }
574
575        // Initialize DHT if needed
576        let dht = if true {
577            // Always enable DHT for now
578            let _dht_config = crate::dht::DHTConfig {
579                replication_factor: config.dht_config.k_value,
580                bucket_size: config.dht_config.k_value,
581                alpha: config.dht_config.alpha_value,
582                record_ttl: config.dht_config.record_ttl,
583                bucket_refresh_interval: config.dht_config.refresh_interval,
584                republish_interval: config.dht_config.refresh_interval,
585                max_distance: 160, // 160 bits for SHA-256
586            };
587            // Convert peer_id String to NodeId
588            let peer_bytes = peer_id.as_bytes();
589            let mut node_id_bytes = [0u8; 32];
590            let len = peer_bytes.len().min(32);
591            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
592            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
593            let dht_instance = DHT::new(node_id).map_err(|e| {
594                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
595                    e.to_string().into(),
596                ))
597            })?;
598            Some(Arc::new(RwLock::new(dht_instance)))
599        } else {
600            None
601        };
602
603        // MCP removed
604
605        // Initialize production resource manager if configured
606        let resource_manager = config
607            .production_config
608            .clone()
609            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
610
611        // Initialize bootstrap cache manager
612        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
613            match BootstrapManager::with_config(cache_config.clone()).await {
614                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
615                Err(e) => {
616                    warn!(
617                        "Failed to initialize bootstrap manager: {}, continuing without cache",
618                        e
619                    );
620                    None
621                }
622            }
623        } else {
624            match BootstrapManager::new().await {
625                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
626                Err(e) => {
627                    warn!(
628                        "Failed to initialize bootstrap manager: {}, continuing without cache",
629                        e
630                    );
631                    None
632                }
633            }
634        };
635
636        // Initialize dual-stack ant-quic nodes
637        let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
638            let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
639            let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
640            (v6_addr, v4_addr)
641        } else {
642            // Defaults: always listen on IPv4; IPv6 if enabled
643            let v4_addr = Some(std::net::SocketAddr::new(
644                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
645                config.listen_addr.port(),
646            ));
647            let v6_addr = if config.enable_ipv6 {
648                Some(std::net::SocketAddr::new(
649                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
650                    config.listen_addr.port(),
651                ))
652            } else {
653                None
654            };
655            (v6_addr, v4_addr)
656        };
657
658        let dual_node = Arc::new(
659            DualStackNetworkNode::new(v6_opt, v4_opt)
660                .await
661                .map_err(|e| {
662                    P2PError::Transport(crate::error::TransportError::SetupFailed(
663                        format!("Failed to create dual-stack network nodes: {}", e).into(),
664                    ))
665                })?,
666        );
667
668        // Initialize rate limiter with default config
669        let rate_limiter = Arc::new(RateLimiter::new(
670            crate::validation::RateLimitConfig::default(),
671        ));
672
673        let node = Self {
674            config,
675            peer_id,
676            peers: Arc::new(RwLock::new(HashMap::new())),
677            event_tx,
678            listen_addrs: RwLock::new(Vec::new()),
679            start_time: Instant::now(),
680            running: RwLock::new(false),
681            dht,
682            resource_manager,
683            bootstrap_manager,
684            dual_node,
685            rate_limiter,
686        };
687        info!("Created P2P node with peer ID: {}", node.peer_id);
688
689        Ok(node)
690    }
691
692    /// Create a new node builder
693    pub fn builder() -> NodeBuilder {
694        NodeBuilder::new()
695    }
696
697    /// Get the peer ID of this node
698    pub fn peer_id(&self) -> &PeerId {
699        &self.peer_id
700    }
701
702    pub fn local_addr(&self) -> Option<String> {
703        self.listen_addrs
704            .try_read()
705            .ok()
706            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
707    }
708
709    pub async fn subscribe(&self, topic: &str) -> Result<()> {
710        // In a real implementation, this would register the topic with the pubsub mechanism.
711        // For now, we just log it.
712        info!("Subscribed to topic: {}", topic);
713        Ok(())
714    }
715
716    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
717        info!(
718            "Publishing message to topic: {} ({} bytes)",
719            topic,
720            data.len()
721        );
722
723        // Get list of connected peers
724        let peer_list: Vec<PeerId> = {
725            let peers_guard = self.peers.read().await;
726            peers_guard.keys().cloned().collect()
727        };
728
729        if peer_list.is_empty() {
730            debug!("No peers connected, message will only be sent to local subscribers");
731        } else {
732            // Send message to all connected peers
733            let mut send_count = 0;
734            for peer_id in &peer_list {
735                match self.send_message(peer_id, topic, data.to_vec()).await {
736                    Ok(_) => {
737                        send_count += 1;
738                        debug!("Sent message to peer: {}", peer_id);
739                    }
740                    Err(e) => {
741                        warn!("Failed to send message to peer {}: {}", peer_id, e);
742                    }
743                }
744            }
745            info!(
746                "Published message to {}/{} connected peers",
747                send_count,
748                peer_list.len()
749            );
750        }
751
752        // Also send to local subscribers (for local echo and testing)
753        let event = P2PEvent::Message {
754            topic: topic.to_string(),
755            source: self.peer_id.clone(),
756            data: data.to_vec(),
757        };
758        let _ = self.event_tx.send(event);
759
760        Ok(())
761    }
762
763    /// Get the node configuration
764    pub fn config(&self) -> &NodeConfig {
765        &self.config
766    }
767
768    /// Start the P2P node
769    pub async fn start(&self) -> Result<()> {
770        info!("Starting P2P node...");
771
772        // Start production resource manager if configured
773        if let Some(ref resource_manager) = self.resource_manager {
774            resource_manager.start().await.map_err(|e| {
775                P2PError::Network(crate::error::NetworkError::ProtocolError(
776                    format!("Failed to start resource manager: {e}").into(),
777                ))
778            })?;
779            info!("Production resource manager started");
780        }
781
782        // Start bootstrap manager background tasks
783        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
784            let mut manager = bootstrap_manager.write().await;
785            manager.start_background_tasks().await.map_err(|e| {
786                P2PError::Network(crate::error::NetworkError::ProtocolError(
787                    format!("Failed to start bootstrap manager: {e}").into(),
788                ))
789            })?;
790            info!("Bootstrap cache manager started");
791        }
792
793        // Set running state
794        *self.running.write().await = true;
795
796        // Start listening on configured addresses using transport layer
797        self.start_network_listeners().await?;
798
799        // Log current listen addresses
800        let listen_addrs = self.listen_addrs.read().await;
801        info!("P2P node started on addresses: {:?}", *listen_addrs);
802
803        // MCP removed
804
805        // Start message receiving system
806        self.start_message_receiving_system().await?;
807
808        // Connect to bootstrap peers
809        self.connect_bootstrap_peers().await?;
810
811        Ok(())
812    }
813
814    /// Start network listeners on configured addresses
815    async fn start_network_listeners(&self) -> Result<()> {
816        info!("Starting dual-stack listeners (ant-quic)...");
817        // Update our listen_addrs from the dual node bindings
818        let addrs = self.dual_node.local_addrs();
819        {
820            let mut la = self.listen_addrs.write().await;
821            *la = addrs.clone();
822        }
823
824        // Spawn a background accept loop that handles incoming connections from either stack
825        let event_tx = self.event_tx.clone();
826        let peers = self.peers.clone();
827        let rate_limiter = self.rate_limiter.clone();
828        let dual = self.dual_node.clone();
829        tokio::spawn(async move {
830            loop {
831                match dual.accept_any().await {
832                    Ok((ant_peer_id, remote_sock)) => {
833                        let peer_id =
834                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
835                        let remote_addr = NetworkAddress::from(remote_sock);
836                        // Optional: basic IP rate limiting
837                        let _ = rate_limiter.check_ip(&remote_sock.ip());
838                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
839                        register_new_peer(&peers, &peer_id, &remote_addr).await;
840                    }
841                    Err(e) => {
842                        warn!("Accept failed: {}", e);
843                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
844                    }
845                }
846            }
847        });
848
849        info!("Dual-stack listeners active on: {:?}", addrs);
850        Ok(())
851    }
852
853    /// Start a listener on a specific socket address
854    #[allow(dead_code)]
855    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
856        // use crate::transport::{Transport}; // Unused during migration
857
858        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
859        /*
860        // Try QUIC first (preferred transport)
861        match crate::transport::QuicTransport::new(Default::default()) {
862            Ok(quic_transport) => {
863                match quic_transport.listen(NetworkAddress::new(addr)).await {
864                    Ok(listen_addrs) => {
865                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
866
867                        // Store the actual listening addresses in the node
868                        {
869                            let mut node_listen_addrs = self.listen_addrs.write().await;
870                            // Don't clear - accumulate addresses from multiple listeners
871                            node_listen_addrs.push(listen_addrs.socket_addr());
872                        }
873
874                        // Start accepting connections in background
875                        self.start_connection_acceptor(
876                            Arc::new(quic_transport),
877                            addr,
878                            crate::transport::TransportType::QUIC
879                        ).await?;
880
881                        return Ok(());
882                    }
883                    Err(e) => {
884                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
885                    }
886                }
887            }
888            Err(e) => {
889                warn!("Failed to create QUIC transport for listening: {}", e);
890            }
891        }
892        */
893
894        warn!("QUIC transport temporarily disabled during ant-quic migration");
895        // No TCP fallback - QUIC only
896        Err(crate::P2PError::Transport(
897            crate::error::TransportError::SetupFailed(
898                format!(
899                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
900                )
901                .into(),
902            ),
903        ))
904    }
905
906    /// Start connection acceptor background task
907    #[allow(dead_code)] // Deprecated during ant-quic migration
908    async fn start_connection_acceptor(
909        &self,
910        transport: Arc<dyn crate::transport::Transport>,
911        addr: std::net::SocketAddr,
912        transport_type: crate::transport::TransportType,
913    ) -> Result<()> {
914        info!(
915            "Starting connection acceptor for {:?} on {}",
916            transport_type, addr
917        );
918
919        // Clone necessary data for the background task
920        let event_tx = self.event_tx.clone();
921        let _peer_id = self.peer_id.clone();
922        let peers = Arc::clone(&self.peers);
923        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
924
925        let rate_limiter = Arc::clone(&self.rate_limiter);
926
927        // Spawn background task to accept incoming connections
928        tokio::spawn(async move {
929            loop {
930                match transport.accept().await {
931                    Ok(connection) => {
932                        let remote_addr = connection.remote_addr();
933                        let connection_peer_id =
934                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
935
936                        // Apply rate limiting for incoming connections
937                        let socket_addr = remote_addr.socket_addr();
938                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
939                            // Connection dropped automatically when it goes out of scope
940                            continue;
941                        }
942
943                        info!(
944                            "Accepted {:?} connection from {} (peer: {})",
945                            transport_type, remote_addr, connection_peer_id
946                        );
947
948                        // Generate peer connected event
949                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
950
951                        // Store the peer connection
952                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
953
954                        // Spawn task to handle this specific connection's messages
955                        spawn_connection_handler(
956                            connection,
957                            connection_peer_id,
958                            event_tx.clone(),
959                            Arc::clone(&peers),
960                        );
961                    }
962                    Err(e) => {
963                        warn!(
964                            "Failed to accept {:?} connection on {}: {}",
965                            transport_type, addr, e
966                        );
967
968                        // Brief pause before retrying to avoid busy loop
969                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
970                    }
971                }
972            }
973        });
974
975        info!(
976            "Connection acceptor background task started for {:?} on {}",
977            transport_type, addr
978        );
979        Ok(())
980    }
981
982    /// Start the message receiving system with background tasks
983    async fn start_message_receiving_system(&self) -> Result<()> {
984        info!("Starting message receiving system");
985        let dual = self.dual_node.clone();
986        let event_tx = self.event_tx.clone();
987
988        tokio::spawn(async move {
989            loop {
990                match dual.receive_any().await {
991                    Ok((_peer_id, bytes)) => {
992                        // Expect the JSON message wrapper from create_protocol_message
993                        #[allow(clippy::collapsible_if)]
994                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
995                            if let (Some(protocol), Some(data), Some(from)) = (
996                                value.get("protocol").and_then(|v| v.as_str()),
997                                value.get("data").and_then(|v| v.as_array()),
998                                value.get("from").and_then(|v| v.as_str()),
999                            ) {
1000                                let payload: Vec<u8> = data
1001                                    .iter()
1002                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1003                                    .collect();
1004                                let _ = event_tx.send(P2PEvent::Message {
1005                                    topic: protocol.to_string(),
1006                                    source: from.to_string(),
1007                                    data: payload,
1008                                });
1009                            }
1010                        }
1011                    }
1012                    Err(e) => {
1013                        warn!("Receive error: {}", e);
1014                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1015                    }
1016                }
1017            }
1018        });
1019
1020        Ok(())
1021    }
1022
1023    /// Handle a received message and generate appropriate events
1024    #[allow(dead_code)]
1025    async fn handle_received_message(
1026        &self,
1027        message_data: Vec<u8>,
1028        peer_id: &PeerId,
1029        _protocol: &str,
1030        event_tx: &broadcast::Sender<P2PEvent>,
1031    ) -> Result<()> {
1032        // MCP removed: no special protocol handling
1033
1034        // Parse the message format we created in create_protocol_message
1035        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1036            Ok(message) => {
1037                if let (Some(protocol), Some(data), Some(from)) = (
1038                    message.get("protocol").and_then(|v| v.as_str()),
1039                    message.get("data").and_then(|v| v.as_array()),
1040                    message.get("from").and_then(|v| v.as_str()),
1041                ) {
1042                    // Convert data array back to bytes
1043                    let data_bytes: Vec<u8> = data
1044                        .iter()
1045                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1046                        .collect();
1047
1048                    // Generate message event
1049                    let event = P2PEvent::Message {
1050                        topic: protocol.to_string(),
1051                        source: from.to_string(),
1052                        data: data_bytes,
1053                    };
1054
1055                    let _ = event_tx.send(event);
1056                    debug!("Generated message event from peer: {}", peer_id);
1057                }
1058            }
1059            Err(e) => {
1060                warn!("Failed to parse received message from {}: {}", peer_id, e);
1061            }
1062        }
1063
1064        Ok(())
1065    }
1066
1067    // MCP removed
1068
1069    // MCP removed
1070
1071    /// Run the P2P node (blocks until shutdown)
1072    pub async fn run(&self) -> Result<()> {
1073        if !*self.running.read().await {
1074            self.start().await?;
1075        }
1076
1077        info!("P2P node running...");
1078
1079        // Main event loop
1080        loop {
1081            if !*self.running.read().await {
1082                break;
1083            }
1084
1085            // Perform periodic tasks
1086            self.periodic_tasks().await?;
1087
1088            // Sleep for a short interval
1089            tokio::time::sleep(Duration::from_millis(100)).await;
1090        }
1091
1092        info!("P2P node stopped");
1093        Ok(())
1094    }
1095
1096    /// Stop the P2P node
1097    pub async fn stop(&self) -> Result<()> {
1098        info!("Stopping P2P node...");
1099
1100        // Set running state to false
1101        *self.running.write().await = false;
1102
1103        // Disconnect all peers
1104        self.disconnect_all_peers().await?;
1105
1106        // Shutdown production resource manager if configured
1107        if let Some(ref resource_manager) = self.resource_manager {
1108            resource_manager.shutdown().await.map_err(|e| {
1109                P2PError::Network(crate::error::NetworkError::ProtocolError(
1110                    format!("Failed to shutdown resource manager: {e}").into(),
1111                ))
1112            })?;
1113            info!("Production resource manager stopped");
1114        }
1115
1116        info!("P2P node stopped");
1117        Ok(())
1118    }
1119
1120    /// Graceful shutdown alias for tests
1121    pub async fn shutdown(&self) -> Result<()> {
1122        self.stop().await
1123    }
1124
1125    /// Check if the node is running
1126    pub async fn is_running(&self) -> bool {
1127        *self.running.read().await
1128    }
1129
1130    /// Get the current listen addresses
1131    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1132        self.listen_addrs.read().await.clone()
1133    }
1134
1135    /// Get connected peers
1136    pub async fn connected_peers(&self) -> Vec<PeerId> {
1137        self.peers.read().await.keys().cloned().collect()
1138    }
1139
1140    /// Get peer count
1141    pub async fn peer_count(&self) -> usize {
1142        self.peers.read().await.len()
1143    }
1144
1145    /// Get peer info
1146    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1147        self.peers.read().await.get(peer_id).cloned()
1148    }
1149
1150    /// Connect to a peer
1151    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1152        info!("Connecting to peer at: {}", address);
1153
1154        // Check production limits if resource manager is enabled
1155        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1156            Some(resource_manager.acquire_connection().await?)
1157        } else {
1158            None
1159        };
1160
1161        // Parse the address to SocketAddr format
1162        let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1163            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1164                format!("{}: {}", address, e).into(),
1165            ))
1166        })?;
1167
1168        // Establish a real connection via dual-stack Happy Eyeballs
1169        let peer_id = {
1170            match self
1171                .dual_node
1172                .connect_happy_eyeballs(&[_socket_addr])
1173                .await
1174                .map(|p| crate::transport::ant_quic_adapter::ant_peer_id_to_string(&p))
1175            {
1176                Ok(connected_peer_id) => {
1177                    info!("Successfully connected to peer: {}", connected_peer_id);
1178                    connected_peer_id
1179                }
1180                Err(e) => {
1181                    warn!("Failed to connect to peer at {}: {}", address, e);
1182
1183                    // For demo purposes, try a simplified connection approach
1184                    // Create a mock peer ID based on address for now
1185                    let demo_peer_id =
1186                        format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1187                    warn!(
1188                        "Using demo peer ID: {} (transport connection failed)",
1189                        demo_peer_id
1190                    );
1191                    demo_peer_id
1192                }
1193            }
1194        };
1195
1196        // Create peer info with connection details
1197        let peer_info = PeerInfo {
1198            peer_id: peer_id.clone(),
1199            addresses: vec![address.to_string()],
1200            connected_at: Instant::now(),
1201            last_seen: Instant::now(),
1202            status: ConnectionStatus::Connected,
1203            protocols: vec!["p2p-foundation/1.0".to_string()],
1204            heartbeat_count: 0,
1205        };
1206
1207        // Store peer information
1208        self.peers.write().await.insert(peer_id.clone(), peer_info);
1209
1210        // Record bandwidth usage if resource manager is enabled
1211        if let Some(ref resource_manager) = self.resource_manager {
1212            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1213        }
1214
1215        // Emit connection event
1216        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1217
1218        info!("Connected to peer: {}", peer_id);
1219        Ok(peer_id)
1220    }
1221
1222    /// Disconnect from a peer
1223    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1224        info!("Disconnecting from peer: {}", peer_id);
1225
1226        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1227            peer_info.status = ConnectionStatus::Disconnected;
1228
1229            // Emit event
1230            let _ = self
1231                .event_tx
1232                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1233
1234            info!("Disconnected from peer: {}", peer_id);
1235        }
1236
1237        Ok(())
1238    }
1239
1240    /// Send a message to a peer
1241    pub async fn send_message(
1242        &self,
1243        peer_id: &PeerId,
1244        protocol: &str,
1245        data: Vec<u8>,
1246    ) -> Result<()> {
1247        debug!(
1248            "Sending message to peer {} on protocol {}",
1249            peer_id, protocol
1250        );
1251
1252        // Check rate limits if resource manager is enabled
1253        if let Some(ref resource_manager) = self.resource_manager
1254            && !resource_manager
1255                .check_rate_limit(peer_id, "message")
1256                .await?
1257        {
1258            return Err(P2PError::ResourceExhausted(
1259                format!("Rate limit exceeded for peer {}", peer_id).into(),
1260            ));
1261        }
1262
1263        // Check if peer is connected
1264        if !self.peers.read().await.contains_key(peer_id) {
1265            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1266                peer_id.to_string().into(),
1267            )));
1268        }
1269
1270        // MCP removed: no special-case protocol validation
1271
1272        // Record bandwidth usage if resource manager is enabled
1273        if let Some(ref resource_manager) = self.resource_manager {
1274            resource_manager.record_bandwidth(data.len() as u64, 0);
1275        }
1276
1277        // Create protocol message wrapper
1278        let _message_data = self.create_protocol_message(protocol, data)?;
1279
1280        // Send via ant-quic dual-node
1281        self.dual_node
1282            .send_to_peer_string(peer_id, &_message_data)
1283            .await
1284            .map_err(|e| {
1285                P2PError::Transport(crate::error::TransportError::StreamError(
1286                    e.to_string().into(),
1287                ))
1288            })
1289    }
1290
1291    /// Create a protocol message wrapper
1292    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1293        use serde_json::json;
1294
1295        let timestamp = std::time::SystemTime::now()
1296            .duration_since(std::time::UNIX_EPOCH)
1297            .map_err(|e| {
1298                P2PError::Network(NetworkError::ProtocolError(
1299                    format!("System time error: {}", e).into(),
1300                ))
1301            })?
1302            .as_secs();
1303
1304        // Create a simple message format for P2P communication
1305        let message = json!({
1306            "protocol": protocol,
1307            "data": data,
1308            "from": self.peer_id,
1309            "timestamp": timestamp
1310        });
1311
1312        serde_json::to_vec(&message).map_err(|e| {
1313            P2PError::Transport(crate::error::TransportError::StreamError(
1314                format!("Failed to serialize message: {e}").into(),
1315            ))
1316        })
1317    }
1318
1319    // Note: async listen_addrs() already exists above for fetching listen addresses
1320}
1321
1322/// Create a protocol message wrapper (static version for background tasks)
1323#[allow(dead_code)]
1324fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1325    use serde_json::json;
1326
1327    let timestamp = std::time::SystemTime::now()
1328        .duration_since(std::time::UNIX_EPOCH)
1329        .map_err(|e| {
1330            P2PError::Network(NetworkError::ProtocolError(
1331                format!("System time error: {}", e).into(),
1332            ))
1333        })?
1334        .as_secs();
1335
1336    // Create a simple message format for P2P communication
1337    let message = json!({
1338        "protocol": protocol,
1339        "data": data,
1340        "timestamp": timestamp
1341    });
1342
1343    serde_json::to_vec(&message).map_err(|e| {
1344        P2PError::Transport(crate::error::TransportError::StreamError(
1345            format!("Failed to serialize message: {e}").into(),
1346        ))
1347    })
1348}
1349
1350impl P2PNode {
1351    /// Subscribe to network events
1352    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1353        self.event_tx.subscribe()
1354    }
1355
1356    /// Backwards-compat event stream accessor for tests
1357    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1358        self.subscribe_events()
1359    }
1360
1361    /// Get node uptime
1362    pub fn uptime(&self) -> Duration {
1363        self.start_time.elapsed()
1364    }
1365
1366    // MCP removed: all MCP tool/service methods removed
1367
1368    // /// Handle MCP remote tool call with network integration
1369
1370    // /// List tools available on a specific remote peer
1371
1372    // /// Get MCP server statistics
1373
1374    /// Get production resource metrics
1375    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1376        if let Some(ref resource_manager) = self.resource_manager {
1377            Ok(resource_manager.get_metrics().await)
1378        } else {
1379            Err(P2PError::Network(
1380                crate::error::NetworkError::ProtocolError(
1381                    "Production resource manager not enabled".to_string().into(),
1382                ),
1383            ))
1384        }
1385    }
1386
1387    /// Check system health
1388    pub async fn health_check(&self) -> Result<()> {
1389        if let Some(ref resource_manager) = self.resource_manager {
1390            resource_manager.health_check().await
1391        } else {
1392            // Basic health check without resource manager
1393            let peer_count = self.peer_count().await;
1394            if peer_count > self.config.max_connections {
1395                Err(P2PError::Network(
1396                    crate::error::NetworkError::ProtocolError(
1397                        format!("Too many connections: {peer_count}").into(),
1398                    ),
1399                ))
1400            } else {
1401                Ok(())
1402            }
1403        }
1404    }
1405
1406    /// Get production configuration (if enabled)
1407    pub fn production_config(&self) -> Option<&ProductionConfig> {
1408        self.config.production_config.as_ref()
1409    }
1410
1411    /// Check if production hardening is enabled
1412    pub fn is_production_mode(&self) -> bool {
1413        self.resource_manager.is_some()
1414    }
1415
1416    /// Get DHT reference
1417    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1418        self.dht.as_ref()
1419    }
1420
1421    /// Store a value in the DHT
1422    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1423        if let Some(ref dht) = self.dht {
1424            let mut dht_instance = dht.write().await;
1425            let dht_key = crate::dht::DhtKey::from_bytes(key);
1426            dht_instance
1427                .store(&dht_key, value.clone())
1428                .await
1429                .map_err(|e| {
1430                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1431                        format!("{:?}: {e}", key).into(),
1432                    ))
1433                })?;
1434
1435            Ok(())
1436        } else {
1437            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1438                "DHT not enabled".to_string().into(),
1439            )))
1440        }
1441    }
1442
1443    /// Retrieve a value from the DHT
1444    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1445        if let Some(ref dht) = self.dht {
1446            let dht_instance = dht.read().await;
1447            let dht_key = crate::dht::DhtKey::from_bytes(key);
1448            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1449                P2PError::Dht(crate::error::DhtError::StoreFailed(
1450                    format!("Retrieve failed: {e}").into(),
1451                ))
1452            })?;
1453
1454            Ok(record_result)
1455        } else {
1456            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1457                "DHT not enabled".to_string().into(),
1458            )))
1459        }
1460    }
1461
1462    /// Add a discovered peer to the bootstrap cache
1463    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1464        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1465            let mut manager = bootstrap_manager.write().await;
1466            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1467                .iter()
1468                .filter_map(|addr| addr.parse().ok())
1469                .collect();
1470            let contact = ContactEntry::new(peer_id, socket_addresses);
1471            manager.add_contact(contact).await.map_err(|e| {
1472                P2PError::Network(crate::error::NetworkError::ProtocolError(
1473                    format!("Failed to add peer to bootstrap cache: {e}").into(),
1474                ))
1475            })?;
1476        }
1477        Ok(())
1478    }
1479
1480    /// Update connection metrics for a peer in the bootstrap cache
1481    pub async fn update_peer_metrics(
1482        &self,
1483        peer_id: &PeerId,
1484        success: bool,
1485        latency_ms: Option<u64>,
1486        _error: Option<String>,
1487    ) -> Result<()> {
1488        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1489            let mut manager = bootstrap_manager.write().await;
1490
1491            // Create quality metrics based on the connection result
1492            let metrics = QualityMetrics {
1493                success_rate: if success { 1.0 } else { 0.0 },
1494                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1495                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
1496                last_connection_attempt: chrono::Utc::now(),
1497                last_successful_connection: if success {
1498                    chrono::Utc::now()
1499                } else {
1500                    chrono::Utc::now() - chrono::Duration::hours(1)
1501                },
1502                uptime_score: 0.5,
1503            };
1504
1505            manager
1506                .update_contact_metrics(peer_id, metrics)
1507                .await
1508                .map_err(|e| {
1509                    P2PError::Network(crate::error::NetworkError::ProtocolError(
1510                        format!("Failed to update peer metrics: {e}").into(),
1511                    ))
1512                })?;
1513        }
1514        Ok(())
1515    }
1516
1517    /// Get bootstrap cache statistics
1518    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1519        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1520            let manager = bootstrap_manager.read().await;
1521            let stats = manager.get_stats().await.map_err(|e| {
1522                P2PError::Network(crate::error::NetworkError::ProtocolError(
1523                    format!("Failed to get bootstrap stats: {e}").into(),
1524                ))
1525            })?;
1526            Ok(Some(stats))
1527        } else {
1528            Ok(None)
1529        }
1530    }
1531
1532    /// Get the number of cached bootstrap peers
1533    pub async fn cached_peer_count(&self) -> usize {
1534        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1535            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1536        {
1537            return stats.total_contacts;
1538        }
1539        0
1540    }
1541
1542    /// Connect to bootstrap peers
1543    async fn connect_bootstrap_peers(&self) -> Result<()> {
1544        let mut bootstrap_contacts = Vec::new();
1545        let mut used_cache = false;
1546
1547        // Try to get peers from bootstrap cache first
1548        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1549            let manager = bootstrap_manager.read().await;
1550            match manager.get_bootstrap_peers(20).await {
1551                // Try to get top 20 quality peers
1552                Ok(contacts) => {
1553                    if !contacts.is_empty() {
1554                        info!("Using {} cached bootstrap peers", contacts.len());
1555                        bootstrap_contacts = contacts;
1556                        used_cache = true;
1557                    }
1558                }
1559                Err(e) => {
1560                    warn!("Failed to get cached bootstrap peers: {}", e);
1561                }
1562            }
1563        }
1564
1565        // Fallback to configured bootstrap peers if no cache or cache is empty
1566        if bootstrap_contacts.is_empty() {
1567            let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1568                &self.config.bootstrap_peers_str
1569            } else {
1570                // Convert Multiaddr to strings for fallback
1571                &self
1572                    .config
1573                    .bootstrap_peers
1574                    .iter()
1575                    .map(|addr| addr.to_string())
1576                    .collect::<Vec<_>>()
1577            };
1578
1579            if bootstrap_peers.is_empty() {
1580                info!("No bootstrap peers configured and no cached peers available");
1581                return Ok(());
1582            }
1583
1584            info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1585
1586            for addr in bootstrap_peers {
1587                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1588                    let contact = ContactEntry::new(
1589                        format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1590                        vec![socket_addr],
1591                    );
1592                    bootstrap_contacts.push(contact);
1593                } else {
1594                    warn!("Invalid bootstrap address format: {}", addr);
1595                }
1596            }
1597        }
1598
1599        // Connect to bootstrap peers
1600        let mut successful_connections = 0;
1601        for contact in bootstrap_contacts {
1602            for addr in &contact.addresses {
1603                match self.connect_peer(&addr.to_string()).await {
1604                    Ok(peer_id) => {
1605                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1606                        successful_connections += 1;
1607
1608                        // Update bootstrap cache with successful connection
1609                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1610                            let mut manager = bootstrap_manager.write().await;
1611                            let mut updated_contact = contact.clone();
1612                            updated_contact.peer_id = peer_id.clone();
1613                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
1614
1615                            if let Err(e) = manager.add_contact(updated_contact).await {
1616                                warn!("Failed to update bootstrap cache: {}", e);
1617                            }
1618                        }
1619                        break; // Successfully connected, move to next contact
1620                    }
1621                    Err(e) => {
1622                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1623
1624                        // Update bootstrap cache with failed connection
1625                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1626                            let mut manager = bootstrap_manager.write().await;
1627                            let mut updated_contact = contact.clone();
1628                            updated_contact.update_connection_result(
1629                                false,
1630                                None,
1631                                Some(e.to_string()),
1632                            );
1633
1634                            if let Err(e) = manager.add_contact(updated_contact).await {
1635                                warn!("Failed to update bootstrap cache: {}", e);
1636                            }
1637                        }
1638                    }
1639                }
1640            }
1641        }
1642
1643        if successful_connections == 0 {
1644            if !used_cache {
1645                warn!("Failed to connect to any bootstrap peers");
1646            }
1647            return Err(P2PError::Network(NetworkError::ConnectionFailed {
1648                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
1649                reason: "Failed to connect to any bootstrap peers".into(),
1650            }));
1651        }
1652        info!(
1653            "Successfully connected to {} bootstrap peers",
1654            successful_connections
1655        );
1656
1657        Ok(())
1658    }
1659
1660    /// Disconnect from all peers
1661    async fn disconnect_all_peers(&self) -> Result<()> {
1662        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1663
1664        for peer_id in peer_ids {
1665            self.disconnect_peer(&peer_id).await?;
1666        }
1667
1668        Ok(())
1669    }
1670
1671    /// Perform periodic maintenance tasks
1672    async fn periodic_tasks(&self) -> Result<()> {
1673        // Update peer last seen timestamps
1674        // Remove stale connections
1675        // Perform DHT maintenance
1676        // This is a placeholder for now
1677
1678        Ok(())
1679    }
1680}
1681
1682/// Network sender trait for sending messages
1683#[async_trait::async_trait]
1684pub trait NetworkSender: Send + Sync {
1685    /// Send a message to a specific peer
1686    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
1687
1688    /// Get our local peer ID
1689    fn local_peer_id(&self) -> &PeerId;
1690}
1691
1692/// Lightweight wrapper for P2PNode to implement NetworkSender
1693#[derive(Clone)]
1694pub struct P2PNetworkSender {
1695    peer_id: PeerId,
1696    // Use channels for async communication with the P2P node
1697    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1698}
1699
1700impl P2PNetworkSender {
1701    pub fn new(
1702        peer_id: PeerId,
1703        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1704    ) -> Self {
1705        Self { peer_id, send_tx }
1706    }
1707}
1708
1709/// Implementation of NetworkSender trait for P2PNetworkSender
1710#[async_trait::async_trait]
1711impl NetworkSender for P2PNetworkSender {
1712    /// Send a message to a specific peer via the P2P network
1713    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1714        self.send_tx
1715            .send((peer_id.clone(), protocol.to_string(), data))
1716            .map_err(|_| {
1717                P2PError::Network(crate::error::NetworkError::ProtocolError(
1718                    "Failed to send message via channel".to_string().into(),
1719                ))
1720            })?;
1721        Ok(())
1722    }
1723
1724    /// Get our local peer ID
1725    fn local_peer_id(&self) -> &PeerId {
1726        &self.peer_id
1727    }
1728}
1729
1730/// Builder pattern for creating P2P nodes
1731pub struct NodeBuilder {
1732    config: NodeConfig,
1733}
1734
1735impl Default for NodeBuilder {
1736    fn default() -> Self {
1737        Self::new()
1738    }
1739}
1740
1741impl NodeBuilder {
1742    /// Create a new node builder
1743    pub fn new() -> Self {
1744        Self {
1745            config: NodeConfig::default(),
1746        }
1747    }
1748
1749    /// Set the peer ID
1750    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1751        self.config.peer_id = Some(peer_id);
1752        self
1753    }
1754
1755    /// Add a listen address
1756    pub fn listen_on(mut self, addr: &str) -> Self {
1757        if let Ok(multiaddr) = addr.parse() {
1758            self.config.listen_addrs.push(multiaddr);
1759        }
1760        self
1761    }
1762
1763    /// Add a bootstrap peer
1764    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1765        if let Ok(multiaddr) = addr.parse() {
1766            self.config.bootstrap_peers.push(multiaddr);
1767        }
1768        self.config.bootstrap_peers_str.push(addr.to_string());
1769        self
1770    }
1771
1772    /// Enable IPv6 support
1773    pub fn with_ipv6(mut self, enable: bool) -> Self {
1774        self.config.enable_ipv6 = enable;
1775        self
1776    }
1777
1778    // MCP removed: builder methods deleted
1779
1780    /// Set connection timeout
1781    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1782        self.config.connection_timeout = timeout;
1783        self
1784    }
1785
1786    /// Set maximum connections
1787    pub fn with_max_connections(mut self, max: usize) -> Self {
1788        self.config.max_connections = max;
1789        self
1790    }
1791
1792    /// Enable production mode with default configuration
1793    pub fn with_production_mode(mut self) -> Self {
1794        self.config.production_config = Some(ProductionConfig::default());
1795        self
1796    }
1797
1798    /// Configure production settings
1799    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1800        self.config.production_config = Some(production_config);
1801        self
1802    }
1803
1804    /// Configure DHT settings
1805    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1806        self.config.dht_config = dht_config;
1807        self
1808    }
1809
1810    /// Enable DHT with default configuration
1811    pub fn with_default_dht(mut self) -> Self {
1812        self.config.dht_config = DHTConfig::default();
1813        self
1814    }
1815
1816    /// Build the P2P node
1817    pub async fn build(self) -> Result<P2PNode> {
1818        P2PNode::new(self.config).await
1819    }
1820}
1821
1822/// Standalone function to handle received messages without borrowing self
1823#[allow(dead_code)] // Deprecated during ant-quic migration
1824async fn handle_received_message_standalone(
1825    message_data: Vec<u8>,
1826    peer_id: &PeerId,
1827    _protocol: &str,
1828    event_tx: &broadcast::Sender<P2PEvent>,
1829) -> Result<()> {
1830    // Parse the message format
1831    match serde_json::from_slice::<serde_json::Value>(&message_data) {
1832        Ok(message) => {
1833            if let (Some(protocol), Some(data), Some(from)) = (
1834                message.get("protocol").and_then(|v| v.as_str()),
1835                message.get("data").and_then(|v| v.as_array()),
1836                message.get("from").and_then(|v| v.as_str()),
1837            ) {
1838                // Convert data array back to bytes
1839                let data_bytes: Vec<u8> = data
1840                    .iter()
1841                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1842                    .collect();
1843
1844                // Generate message event
1845                let event = P2PEvent::Message {
1846                    topic: protocol.to_string(),
1847                    source: from.to_string(),
1848                    data: data_bytes,
1849                };
1850
1851                let _ = event_tx.send(event);
1852                debug!("Generated message event from peer: {}", peer_id);
1853            }
1854        }
1855        Err(e) => {
1856            warn!("Failed to parse received message from {}: {}", peer_id, e);
1857        }
1858    }
1859
1860    Ok(())
1861}
1862
1863// MCP removed: standalone MCP handler deleted
1864
1865/// Helper function to handle protocol message creation
1866#[allow(dead_code)]
1867fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
1868    match create_protocol_message_static(protocol, data) {
1869        Ok(msg) => Some(msg),
1870        Err(e) => {
1871            warn!("Failed to create protocol message: {}", e);
1872            None
1873        }
1874    }
1875}
1876
1877/// Helper function to handle message send result
1878#[allow(dead_code)]
1879async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
1880    match result {
1881        Ok(_) => {
1882            debug!("Message sent to peer {} via transport layer", peer_id);
1883        }
1884        Err(e) => {
1885            warn!("Failed to send message to peer {}: {}", peer_id, e);
1886        }
1887    }
1888}
1889
1890/// Helper function to check rate limit
1891#[allow(dead_code)] // Deprecated during ant-quic migration
1892fn check_rate_limit(
1893    rate_limiter: &RateLimiter,
1894    socket_addr: &std::net::SocketAddr,
1895    remote_addr: &NetworkAddress,
1896) -> Result<()> {
1897    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
1898        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
1899        e
1900    })
1901}
1902
1903/// Helper function to register a new peer
1904#[allow(dead_code)] // Deprecated during ant-quic migration
1905async fn register_new_peer(
1906    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1907    peer_id: &PeerId,
1908    remote_addr: &NetworkAddress,
1909) {
1910    let mut peers_guard = peers.write().await;
1911    let peer_info = PeerInfo {
1912        peer_id: peer_id.clone(),
1913        addresses: vec![remote_addr.to_string()],
1914        connected_at: tokio::time::Instant::now(),
1915        last_seen: tokio::time::Instant::now(),
1916        status: ConnectionStatus::Connected,
1917        protocols: vec!["p2p-chat/1.0.0".to_string()],
1918        heartbeat_count: 0,
1919    };
1920    peers_guard.insert(peer_id.clone(), peer_info);
1921}
1922
1923/// Helper function to spawn connection handler
1924#[allow(dead_code)] // Deprecated during ant-quic migration
1925fn spawn_connection_handler(
1926    connection: Box<dyn crate::transport::Connection>,
1927    peer_id: PeerId,
1928    event_tx: broadcast::Sender<P2PEvent>,
1929    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1930) {
1931    tokio::spawn(async move {
1932        handle_peer_connection(connection, peer_id, event_tx, peers).await;
1933    });
1934}
1935
1936/// Helper function to handle peer connection
1937#[allow(dead_code)] // Deprecated during ant-quic migration
1938async fn handle_peer_connection(
1939    mut connection: Box<dyn crate::transport::Connection>,
1940    peer_id: PeerId,
1941    event_tx: broadcast::Sender<P2PEvent>,
1942    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1943) {
1944    loop {
1945        match connection.receive().await {
1946            Ok(message_data) => {
1947                debug!(
1948                    "Received {} bytes from peer: {}",
1949                    message_data.len(),
1950                    peer_id
1951                );
1952
1953                // Handle the received message
1954                if let Err(e) = handle_received_message_standalone(
1955                    message_data,
1956                    &peer_id,
1957                    "unknown", // TODO: Extract protocol from message
1958                    &event_tx,
1959                )
1960                .await
1961                {
1962                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
1963                }
1964            }
1965            Err(e) => {
1966                warn!("Failed to receive message from {}: {}", peer_id, e);
1967
1968                // Check if connection is still alive
1969                if !connection.is_alive().await {
1970                    info!("Connection to {} is dead, removing peer", peer_id);
1971
1972                    // Remove dead peer
1973                    remove_peer(&peers, &peer_id).await;
1974
1975                    // Generate peer disconnected event
1976                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
1977
1978                    break; // Exit the message receiving loop
1979                }
1980
1981                // Brief pause before retrying
1982                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1983            }
1984        }
1985    }
1986}
1987
1988/// Helper function to remove a peer
1989#[allow(dead_code)] // Deprecated during ant-quic migration
1990async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
1991    let mut peers_guard = peers.write().await;
1992    peers_guard.remove(peer_id);
1993}
1994
1995/// Helper function to update peer heartbeat
1996#[allow(dead_code)]
1997async fn update_peer_heartbeat(
1998    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1999    peer_id: &PeerId,
2000) -> Result<()> {
2001    let mut peers_guard = peers.write().await;
2002    match peers_guard.get_mut(peer_id) {
2003        Some(peer_info) => {
2004            peer_info.last_seen = Instant::now();
2005            peer_info.heartbeat_count += 1;
2006            Ok(())
2007        }
2008        None => {
2009            warn!("Received heartbeat from unknown peer: {}", peer_id);
2010            Err(P2PError::Network(NetworkError::PeerNotFound(
2011                format!("Peer {} not found", peer_id).into(),
2012            )))
2013        }
2014    }
2015}
2016
2017/// Helper function to get resource metrics
2018#[allow(dead_code)]
2019async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2020    if let Some(manager) = resource_manager {
2021        let metrics = manager.get_metrics().await;
2022        (metrics.memory_used, metrics.cpu_usage)
2023    } else {
2024        (0, 0.0)
2025    }
2026}
2027
2028#[cfg(test)]
2029mod tests {
2030    use super::*;
2031    // MCP removed from tests
2032    use std::time::Duration;
2033    use tokio::time::timeout;
2034
2035    // Test tool handler for network tests
2036
2037    // MCP removed
2038
2039    /// Helper function to create a test node configuration
2040    fn create_test_node_config() -> NodeConfig {
2041        NodeConfig {
2042            peer_id: Some("test_peer_123".to_string()),
2043            listen_addrs: vec![
2044                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2045                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2046            ],
2047            listen_addr: std::net::SocketAddr::new(
2048                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2049                0,
2050            ),
2051            bootstrap_peers: vec![],
2052            bootstrap_peers_str: vec![],
2053            enable_ipv6: true,
2054
2055            connection_timeout: Duration::from_secs(10),
2056            keep_alive_interval: Duration::from_secs(30),
2057            max_connections: 100,
2058            max_incoming_connections: 50,
2059            dht_config: DHTConfig::default(),
2060            security_config: SecurityConfig::default(),
2061            production_config: None,
2062            bootstrap_cache_config: None,
2063            // identity_config: None,
2064        }
2065    }
2066
2067    /// Helper function to create a test tool
2068    // MCP removed: test tool helper deleted
2069
2070    #[tokio::test]
2071    async fn test_node_config_default() {
2072        let config = NodeConfig::default();
2073
2074        assert!(config.peer_id.is_none());
2075        assert_eq!(config.listen_addrs.len(), 2);
2076        assert!(config.enable_ipv6);
2077        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2078        assert_eq!(config.max_incoming_connections, 100);
2079        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2080    }
2081
2082    #[tokio::test]
2083    async fn test_dht_config_default() {
2084        let config = DHTConfig::default();
2085
2086        assert_eq!(config.k_value, 20);
2087        assert_eq!(config.alpha_value, 5);
2088        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2089        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2090    }
2091
2092    #[tokio::test]
2093    async fn test_security_config_default() {
2094        let config = SecurityConfig::default();
2095
2096        assert!(config.enable_noise);
2097        assert!(config.enable_tls);
2098        assert_eq!(config.trust_level, TrustLevel::Basic);
2099    }
2100
2101    #[test]
2102    fn test_trust_level_variants() {
2103        // Test that all trust level variants can be created
2104        let _none = TrustLevel::None;
2105        let _basic = TrustLevel::Basic;
2106        let _full = TrustLevel::Full;
2107
2108        // Test equality
2109        assert_eq!(TrustLevel::None, TrustLevel::None);
2110        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2111        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2112        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2113    }
2114
2115    #[test]
2116    fn test_connection_status_variants() {
2117        let connecting = ConnectionStatus::Connecting;
2118        let connected = ConnectionStatus::Connected;
2119        let disconnecting = ConnectionStatus::Disconnecting;
2120        let disconnected = ConnectionStatus::Disconnected;
2121        let failed = ConnectionStatus::Failed("test error".to_string());
2122
2123        assert_eq!(connecting, ConnectionStatus::Connecting);
2124        assert_eq!(connected, ConnectionStatus::Connected);
2125        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2126        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2127        assert_ne!(connecting, connected);
2128
2129        if let ConnectionStatus::Failed(msg) = failed {
2130            assert_eq!(msg, "test error");
2131        } else {
2132            panic!("Expected Failed status");
2133        }
2134    }
2135
2136    #[tokio::test]
2137    async fn test_node_creation() -> Result<()> {
2138        let config = create_test_node_config();
2139        let node = P2PNode::new(config).await?;
2140
2141        assert_eq!(node.peer_id(), "test_peer_123");
2142        assert!(!node.is_running().await);
2143        assert_eq!(node.peer_count().await, 0);
2144        assert!(node.connected_peers().await.is_empty());
2145
2146        Ok(())
2147    }
2148
2149    #[tokio::test]
2150    async fn test_node_creation_without_peer_id() -> Result<()> {
2151        let mut config = create_test_node_config();
2152        config.peer_id = None;
2153
2154        let node = P2PNode::new(config).await?;
2155
2156        // Should have generated a peer ID
2157        assert!(node.peer_id().starts_with("peer_"));
2158        assert!(!node.is_running().await);
2159
2160        Ok(())
2161    }
2162
2163    #[tokio::test]
2164    async fn test_node_lifecycle() -> Result<()> {
2165        let config = create_test_node_config();
2166        let node = P2PNode::new(config).await?;
2167
2168        // Initially not running
2169        assert!(!node.is_running().await);
2170
2171        // Start the node
2172        node.start().await?;
2173        assert!(node.is_running().await);
2174
2175        // Check listen addresses were set (at least one)
2176        let listen_addrs = node.listen_addrs().await;
2177        assert!(
2178            !listen_addrs.is_empty(),
2179            "Expected at least one listening address"
2180        );
2181
2182        // Stop the node
2183        node.stop().await?;
2184        assert!(!node.is_running().await);
2185
2186        Ok(())
2187    }
2188
2189    #[tokio::test]
2190    async fn test_peer_connection() -> Result<()> {
2191        let config = create_test_node_config();
2192        let node = P2PNode::new(config).await?;
2193
2194        let peer_addr = "127.0.0.1:0";
2195
2196        // Connect to a peer
2197        let peer_id = node.connect_peer(peer_addr).await?;
2198        assert!(peer_id.starts_with("peer_from_"));
2199
2200        // Check peer count
2201        assert_eq!(node.peer_count().await, 1);
2202
2203        // Check connected peers
2204        let connected_peers = node.connected_peers().await;
2205        assert_eq!(connected_peers.len(), 1);
2206        assert_eq!(connected_peers[0], peer_id);
2207
2208        // Get peer info
2209        let peer_info = node.peer_info(&peer_id).await;
2210        assert!(peer_info.is_some());
2211        let info = peer_info.expect("Peer info should exist after adding peer");
2212        assert_eq!(info.peer_id, peer_id);
2213        assert_eq!(info.status, ConnectionStatus::Connected);
2214        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2215
2216        // Disconnect from peer
2217        node.disconnect_peer(&peer_id).await?;
2218        assert_eq!(node.peer_count().await, 0);
2219
2220        Ok(())
2221    }
2222
2223    #[tokio::test]
2224    async fn test_event_subscription() -> Result<()> {
2225        let config = create_test_node_config();
2226        let node = P2PNode::new(config).await?;
2227
2228        let mut events = node.subscribe_events();
2229        let peer_addr = "127.0.0.1:0";
2230
2231        // Connect to a peer (this should emit an event)
2232        let peer_id = node.connect_peer(peer_addr).await?;
2233
2234        // Check for PeerConnected event
2235        let event = timeout(Duration::from_millis(100), events.recv()).await;
2236        assert!(event.is_ok());
2237
2238        let event_result = event
2239            .expect("Should receive event")
2240            .expect("Event should not be error");
2241        match event_result {
2242            P2PEvent::PeerConnected(event_peer_id) => {
2243                assert_eq!(event_peer_id, peer_id);
2244            }
2245            _ => panic!("Expected PeerConnected event"),
2246        }
2247
2248        // Disconnect from peer (this should emit another event)
2249        node.disconnect_peer(&peer_id).await?;
2250
2251        // Check for PeerDisconnected event
2252        let event = timeout(Duration::from_millis(100), events.recv()).await;
2253        assert!(event.is_ok());
2254
2255        let event_result = event
2256            .expect("Should receive event")
2257            .expect("Event should not be error");
2258        match event_result {
2259            P2PEvent::PeerDisconnected(event_peer_id) => {
2260                assert_eq!(event_peer_id, peer_id);
2261            }
2262            _ => panic!("Expected PeerDisconnected event"),
2263        }
2264
2265        Ok(())
2266    }
2267
2268    #[tokio::test]
2269    async fn test_message_sending() -> Result<()> {
2270        // Create two nodes
2271        let mut config1 = create_test_node_config();
2272        config1.listen_addr =
2273            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2274        let node1 = P2PNode::new(config1).await?;
2275        node1.start().await?;
2276
2277        let mut config2 = create_test_node_config();
2278        config2.listen_addr =
2279            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2280        let node2 = P2PNode::new(config2).await?;
2281        node2.start().await?;
2282
2283        // Wait a bit for nodes to start listening
2284        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2285
2286        // Get actual listening address of node2
2287        let node2_addr = node2.local_addr().ok_or_else(|| {
2288            P2PError::Network(crate::error::NetworkError::ProtocolError(
2289                "No listening address".to_string().into(),
2290            ))
2291        })?;
2292
2293        // Connect node1 to node2
2294        let peer_id = node1.connect_peer(&node2_addr).await?;
2295
2296        // Wait a bit for connection to establish
2297        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2298
2299        // Send a message
2300        let message_data = b"Hello, peer!".to_vec();
2301        let result = node1
2302            .send_message(&peer_id, "test-protocol", message_data)
2303            .await;
2304        // For now, we'll just check that we don't get a "not connected" error
2305        // The actual send might fail due to no handler on the other side
2306        if let Err(e) = &result {
2307            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2308        }
2309
2310        // Try to send to non-existent peer
2311        let non_existent_peer = "non_existent_peer".to_string();
2312        let result = node1
2313            .send_message(&non_existent_peer, "test-protocol", vec![])
2314            .await;
2315        assert!(result.is_err(), "Sending to non-existent peer should fail");
2316
2317        Ok(())
2318    }
2319
2320    #[tokio::test]
2321    async fn test_remote_mcp_operations() -> Result<()> {
2322        let config = create_test_node_config();
2323        let node = P2PNode::new(config).await?;
2324
2325        // MCP removed; test reduced to simple start/stop
2326        node.start().await?;
2327        node.stop().await?;
2328        Ok(())
2329    }
2330
2331    #[tokio::test]
2332    async fn test_health_check() -> Result<()> {
2333        let config = create_test_node_config();
2334        let node = P2PNode::new(config).await?;
2335
2336        // Health check should pass with no connections
2337        let result = node.health_check().await;
2338        assert!(result.is_ok());
2339
2340        // Note: We're not actually connecting to real peers here
2341        // since that would require running bootstrap nodes.
2342        // The health check should still pass with no connections.
2343
2344        Ok(())
2345    }
2346
2347    #[tokio::test]
2348    async fn test_node_uptime() -> Result<()> {
2349        let config = create_test_node_config();
2350        let node = P2PNode::new(config).await?;
2351
2352        let uptime1 = node.uptime();
2353        assert!(uptime1 >= Duration::from_secs(0));
2354
2355        // Wait a bit
2356        tokio::time::sleep(Duration::from_millis(10)).await;
2357
2358        let uptime2 = node.uptime();
2359        assert!(uptime2 > uptime1);
2360
2361        Ok(())
2362    }
2363
2364    #[tokio::test]
2365    async fn test_node_config_access() -> Result<()> {
2366        let config = create_test_node_config();
2367        let expected_peer_id = config.peer_id.clone();
2368        let node = P2PNode::new(config).await?;
2369
2370        let node_config = node.config();
2371        assert_eq!(node_config.peer_id, expected_peer_id);
2372        assert_eq!(node_config.max_connections, 100);
2373        // MCP removed
2374
2375        Ok(())
2376    }
2377
2378    #[tokio::test]
2379    async fn test_mcp_server_access() -> Result<()> {
2380        let config = create_test_node_config();
2381        let _node = P2PNode::new(config).await?;
2382
2383        // MCP removed
2384        Ok(())
2385    }
2386
2387    #[tokio::test]
2388    async fn test_dht_access() -> Result<()> {
2389        let config = create_test_node_config();
2390        let node = P2PNode::new(config).await?;
2391
2392        // Should have DHT
2393        assert!(node.dht().is_some());
2394
2395        Ok(())
2396    }
2397
2398    #[tokio::test]
2399    async fn test_node_builder() -> Result<()> {
2400        // Create a config using the builder but don't actually build a real node
2401        let builder = P2PNode::builder()
2402            .with_peer_id("builder_test_peer".to_string())
2403            .listen_on("/ip4/127.0.0.1/tcp/0")
2404            .listen_on("/ip6/::1/tcp/0")
2405            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2406            .with_ipv6(true)
2407            .with_connection_timeout(Duration::from_secs(15))
2408            .with_max_connections(200);
2409
2410        // Test the configuration that was built
2411        let config = builder.config;
2412        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2413        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2414        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2415        assert!(config.enable_ipv6);
2416        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2417        assert_eq!(config.max_connections, 200);
2418
2419        Ok(())
2420    }
2421
2422    #[tokio::test]
2423    async fn test_bootstrap_peers() -> Result<()> {
2424        let mut config = create_test_node_config();
2425        config.bootstrap_peers = vec![
2426            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2427            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2428        ];
2429
2430        let node = P2PNode::new(config).await?;
2431
2432        // Start node (which attempts to connect to bootstrap peers)
2433        node.start().await?;
2434
2435        // In a test environment, bootstrap peers may not be available
2436        // The test verifies the node starts correctly with bootstrap configuration
2437        let peer_count = node.peer_count().await;
2438        assert!(
2439            peer_count <= 2,
2440            "Peer count should not exceed bootstrap peer count"
2441        );
2442
2443        node.stop().await?;
2444        Ok(())
2445    }
2446
2447    #[tokio::test]
2448    async fn test_production_mode_disabled() -> Result<()> {
2449        let config = create_test_node_config();
2450        let node = P2PNode::new(config).await?;
2451
2452        assert!(!node.is_production_mode());
2453        assert!(node.production_config().is_none());
2454
2455        // Resource metrics should fail when production mode is disabled
2456        let result = node.resource_metrics().await;
2457        assert!(result.is_err());
2458        assert!(result.unwrap_err().to_string().contains("not enabled"));
2459
2460        Ok(())
2461    }
2462
2463    #[tokio::test]
2464    async fn test_network_event_variants() {
2465        // Test that all network event variants can be created
2466        let peer_id = "test_peer".to_string();
2467        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2468
2469        let _peer_connected = NetworkEvent::PeerConnected {
2470            peer_id: peer_id.clone(),
2471            addresses: vec![address.clone()],
2472        };
2473
2474        let _peer_disconnected = NetworkEvent::PeerDisconnected {
2475            peer_id: peer_id.clone(),
2476            reason: "test disconnect".to_string(),
2477        };
2478
2479        let _message_received = NetworkEvent::MessageReceived {
2480            peer_id: peer_id.clone(),
2481            protocol: "test-protocol".to_string(),
2482            data: vec![1, 2, 3],
2483        };
2484
2485        let _connection_failed = NetworkEvent::ConnectionFailed {
2486            peer_id: Some(peer_id.clone()),
2487            address: address.clone(),
2488            error: "connection refused".to_string(),
2489        };
2490
2491        let _dht_stored = NetworkEvent::DHTRecordStored {
2492            key: vec![1, 2, 3],
2493            value: vec![4, 5, 6],
2494        };
2495
2496        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2497            key: vec![1, 2, 3],
2498            value: Some(vec![4, 5, 6]),
2499        };
2500    }
2501
2502    #[tokio::test]
2503    async fn test_peer_info_structure() {
2504        let peer_info = PeerInfo {
2505            peer_id: "test_peer".to_string(),
2506            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2507            connected_at: Instant::now(),
2508            last_seen: Instant::now(),
2509            status: ConnectionStatus::Connected,
2510            protocols: vec!["test-protocol".to_string()],
2511            heartbeat_count: 0,
2512        };
2513
2514        assert_eq!(peer_info.peer_id, "test_peer");
2515        assert_eq!(peer_info.addresses.len(), 1);
2516        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2517        assert_eq!(peer_info.protocols.len(), 1);
2518    }
2519
2520    #[tokio::test]
2521    async fn test_serialization() -> Result<()> {
2522        // Test that configs can be serialized/deserialized
2523        let config = create_test_node_config();
2524        let serialized = serde_json::to_string(&config)?;
2525        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2526
2527        assert_eq!(config.peer_id, deserialized.peer_id);
2528        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2529        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2530
2531        Ok(())
2532    }
2533}