saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] // Temporarily unused during migration
27use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::{RwLock, broadcast};
37use tokio::time::Instant;
38use tracing::{debug, info, warn};
39
40/// Configuration for a P2P node
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct NodeConfig {
43    /// Local peer ID for this node
44    pub peer_id: Option<PeerId>,
45
46    /// Addresses to listen on for incoming connections
47    pub listen_addrs: Vec<std::net::SocketAddr>,
48
49    /// Primary listen address (for compatibility)
50    pub listen_addr: std::net::SocketAddr,
51
52    /// Bootstrap peers to connect to on startup (legacy)
53    pub bootstrap_peers: Vec<std::net::SocketAddr>,
54
55    /// Bootstrap peers as strings (for integration tests)
56    pub bootstrap_peers_str: Vec<String>,
57
58    /// Enable IPv6 support
59    pub enable_ipv6: bool,
60
61    // MCP removed; will be redesigned later
62    /// Connection timeout duration
63    pub connection_timeout: Duration,
64
65    /// Keep-alive interval for connections
66    pub keep_alive_interval: Duration,
67
68    /// Maximum number of concurrent connections
69    pub max_connections: usize,
70
71    /// Maximum number of incoming connections
72    pub max_incoming_connections: usize,
73
74    /// DHT configuration
75    pub dht_config: DHTConfig,
76
77    /// Security configuration
78    pub security_config: SecurityConfig,
79
80    /// Production hardening configuration
81    pub production_config: Option<ProductionConfig>,
82
83    /// Bootstrap cache configuration
84    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
85}
86
87/// DHT-specific configuration
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct DHTConfig {
90    /// Kademlia K parameter (bucket size)
91    pub k_value: usize,
92
93    /// Kademlia alpha parameter (parallelism)
94    pub alpha_value: usize,
95
96    /// DHT record TTL
97    pub record_ttl: Duration,
98
99    /// DHT refresh interval
100    pub refresh_interval: Duration,
101}
102
103/// Security configuration
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SecurityConfig {
106    /// Enable noise protocol for encryption
107    pub enable_noise: bool,
108
109    /// Enable TLS for secure transport
110    pub enable_tls: bool,
111
112    /// Trust level for peer verification
113    pub trust_level: TrustLevel,
114}
115
116/// Trust level for peer verification
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub enum TrustLevel {
119    /// No verification required
120    None,
121    /// Basic peer ID verification
122    Basic,
123    /// Full cryptographic verification
124    Full,
125}
126
127impl NodeConfig {
128    /// Create a new NodeConfig with default values
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if default addresses cannot be parsed
133    pub fn new() -> Result<Self> {
134        // Load config and use its defaults
135        let config = Config::default();
136
137        // Parse the default listen address
138        let listen_addr = config.listen_socket_addr()?;
139
140        // Create listen addresses based on config
141        let mut listen_addrs = vec![];
142
143        // Add IPv6 address if enabled
144        if config.network.ipv6_enabled {
145            let ipv6_addr = std::net::SocketAddr::new(
146                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
147                listen_addr.port(),
148            );
149            listen_addrs.push(ipv6_addr);
150        }
151
152        // Always add IPv4
153        let ipv4_addr = std::net::SocketAddr::new(
154            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
155            listen_addr.port(),
156        );
157        listen_addrs.push(ipv4_addr);
158
159        Ok(Self {
160            peer_id: None,
161            listen_addrs,
162            listen_addr,
163            bootstrap_peers: Vec::new(),
164            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
165            enable_ipv6: config.network.ipv6_enabled,
166
167            connection_timeout: Duration::from_secs(config.network.connection_timeout),
168            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
169            max_connections: config.network.max_connections,
170            max_incoming_connections: config.security.connection_limit as usize,
171            dht_config: DHTConfig::default(),
172            security_config: SecurityConfig::default(),
173            production_config: None,
174            bootstrap_cache_config: None,
175            // identity_config: None,
176        })
177    }
178}
179
180impl Default for NodeConfig {
181    fn default() -> Self {
182        // Use config defaults for network settings
183        let config = Config::default();
184
185        // Parse the default listen address - use safe fallback if parsing fails
186        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
187            std::net::SocketAddr::new(
188                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
189                9000,
190            )
191        });
192
193        Self {
194            peer_id: None,
195            listen_addrs: vec![
196                std::net::SocketAddr::new(
197                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
198                    listen_addr.port(),
199                ),
200                std::net::SocketAddr::new(
201                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
202                    listen_addr.port(),
203                ),
204            ],
205            listen_addr,
206            bootstrap_peers: Vec::new(),
207            bootstrap_peers_str: Vec::new(),
208            enable_ipv6: config.network.ipv6_enabled,
209
210            connection_timeout: Duration::from_secs(config.network.connection_timeout),
211            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
212            max_connections: config.network.max_connections,
213            max_incoming_connections: config.security.connection_limit as usize,
214            dht_config: DHTConfig::default(),
215            security_config: SecurityConfig::default(),
216            production_config: None, // Use default production config if enabled
217            bootstrap_cache_config: None,
218            // identity_config: None, // Use default identity config if enabled
219        }
220    }
221}
222
223impl NodeConfig {
224    /// Create NodeConfig from Config
225    pub fn from_config(config: &Config) -> Result<Self> {
226        let listen_addr = config.listen_socket_addr()?;
227        let bootstrap_addrs = config.bootstrap_addrs()?;
228
229        let mut node_config = Self {
230            peer_id: None,
231            listen_addrs: vec![listen_addr],
232            listen_addr,
233            bootstrap_peers: bootstrap_addrs
234                .iter()
235                .map(|addr| addr.socket_addr())
236                .collect(),
237            bootstrap_peers_str: config
238                .network
239                .bootstrap_nodes
240                .iter()
241                .map(|addr| addr.to_string())
242                .collect(),
243            enable_ipv6: config.network.ipv6_enabled,
244
245            connection_timeout: Duration::from_secs(config.network.connection_timeout),
246            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
247            max_connections: config.network.max_connections,
248            max_incoming_connections: config.security.connection_limit as usize,
249            dht_config: DHTConfig {
250                k_value: 20,
251                alpha_value: 3,
252                record_ttl: Duration::from_secs(3600),
253                refresh_interval: Duration::from_secs(900),
254            },
255            security_config: SecurityConfig {
256                enable_noise: true,
257                enable_tls: true,
258                trust_level: TrustLevel::Basic,
259            },
260            production_config: Some(ProductionConfig {
261                max_connections: config.network.max_connections,
262                max_memory_bytes: 0,  // unlimited
263                max_bandwidth_bps: 0, // unlimited
264                connection_timeout: Duration::from_secs(config.network.connection_timeout),
265                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266                health_check_interval: Duration::from_secs(30),
267                metrics_interval: Duration::from_secs(60),
268                enable_performance_tracking: true,
269                enable_auto_cleanup: true,
270                shutdown_timeout: Duration::from_secs(30),
271                rate_limits: crate::production::RateLimitConfig::default(),
272            }),
273            bootstrap_cache_config: None,
274            // identity_config: Some(IdentityManagerConfig {
275            //     cache_ttl: Duration::from_secs(3600),
276            //     challenge_timeout: Duration::from_secs(30),
277            // }),
278        };
279
280        // Add IPv6 listen address if enabled
281        if config.network.ipv6_enabled {
282            node_config.listen_addrs.push(std::net::SocketAddr::new(
283                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
284                listen_addr.port(),
285            ));
286        }
287
288        Ok(node_config)
289    }
290
291    /// Try to build a NodeConfig from a listen address string
292    pub fn with_listen_addr(addr: &str) -> Result<Self> {
293        let listen_addr: std::net::SocketAddr = addr
294            .parse()
295            .map_err(|e: std::net::AddrParseError| {
296                NetworkError::InvalidAddress(e.to_string().into())
297            })
298            .map_err(P2PError::Network)?;
299        let cfg = NodeConfig {
300            listen_addr,
301            listen_addrs: vec![listen_addr],
302            ..Default::default()
303        };
304        Ok(cfg)
305    }
306}
307
308impl Default for DHTConfig {
309    fn default() -> Self {
310        Self {
311            k_value: 20,
312            alpha_value: 5,
313            record_ttl: Duration::from_secs(3600), // 1 hour
314            refresh_interval: Duration::from_secs(600), // 10 minutes
315        }
316    }
317}
318
319impl Default for SecurityConfig {
320    fn default() -> Self {
321        Self {
322            enable_noise: true,
323            enable_tls: true,
324            trust_level: TrustLevel::Basic,
325        }
326    }
327}
328
329/// Information about a connected peer
330#[derive(Debug, Clone)]
331pub struct PeerInfo {
332    /// Peer identifier
333    pub peer_id: PeerId,
334
335    /// Peer's addresses
336    pub addresses: Vec<String>,
337
338    /// Connection timestamp
339    pub connected_at: Instant,
340
341    /// Last seen timestamp
342    pub last_seen: Instant,
343
344    /// Connection status
345    pub status: ConnectionStatus,
346
347    /// Supported protocols
348    pub protocols: Vec<String>,
349
350    /// Number of heartbeats received
351    pub heartbeat_count: u64,
352}
353
354/// Connection status for a peer
355#[derive(Debug, Clone, PartialEq)]
356pub enum ConnectionStatus {
357    /// Connection is being established
358    Connecting,
359    /// Connection is established and active
360    Connected,
361    /// Connection is being closed
362    Disconnecting,
363    /// Connection is closed
364    Disconnected,
365    /// Connection failed
366    Failed(String),
367}
368
369/// Network events that can occur
370#[derive(Debug, Clone)]
371pub enum NetworkEvent {
372    /// A new peer has connected
373    PeerConnected {
374        /// The identifier of the newly connected peer
375        peer_id: PeerId,
376        /// The network addresses where the peer can be reached
377        addresses: Vec<String>,
378    },
379
380    /// A peer has disconnected
381    PeerDisconnected {
382        /// The identifier of the disconnected peer
383        peer_id: PeerId,
384        /// The reason for the disconnection
385        reason: String,
386    },
387
388    /// A message was received from a peer
389    MessageReceived {
390        /// The identifier of the sending peer
391        peer_id: PeerId,
392        /// The protocol used for the message
393        protocol: String,
394        /// The raw message data
395        data: Vec<u8>,
396    },
397
398    /// A connection attempt failed
399    ConnectionFailed {
400        /// The identifier of the peer (if known)
401        peer_id: Option<PeerId>,
402        /// The address where connection was attempted
403        address: String,
404        /// The error message describing the failure
405        error: String,
406    },
407
408    /// DHT record was stored
409    DHTRecordStored {
410        /// The DHT key where the record was stored
411        key: Vec<u8>,
412        /// The value that was stored
413        value: Vec<u8>,
414    },
415
416    /// DHT record was retrieved
417    DHTRecordRetrieved {
418        /// The DHT key that was queried
419        key: Vec<u8>,
420        /// The retrieved value, if found
421        value: Option<Vec<u8>>,
422    },
423}
424
425/// Network events that can occur in the P2P system
426///
427/// Events are broadcast to all listeners and provide real-time
428/// notifications of network state changes and message arrivals.
429#[derive(Debug, Clone)]
430pub enum P2PEvent {
431    /// Message received from a peer on a specific topic
432    Message {
433        /// Topic or channel the message was sent on
434        topic: String,
435        /// Peer ID of the message sender
436        source: PeerId,
437        /// Raw message data payload
438        data: Vec<u8>,
439    },
440    /// A new peer has connected to the network
441    PeerConnected(PeerId),
442    /// A peer has disconnected from the network
443    PeerDisconnected(PeerId),
444}
445
446/// Main P2P node structure
447/// Main P2P network node that manages connections, routing, and communication
448///
449/// This struct represents a complete P2P network participant that can:
450/// - Connect to other peers via QUIC transport
451/// - Participate in distributed hash table (DHT) operations
452/// - Send and receive messages through various protocols
453/// - Handle network events and peer lifecycle
454/// - Provide MCP (Model Context Protocol) services
455pub struct P2PNode {
456    /// Node configuration
457    config: NodeConfig,
458
459    /// Our peer ID
460    peer_id: PeerId,
461
462    /// Connected peers
463    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
464
465    /// Network event broadcaster
466    event_tx: broadcast::Sender<P2PEvent>,
467
468    /// Listen addresses
469    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
470
471    /// Node start time
472    start_time: Instant,
473
474    /// Running state
475    running: RwLock<bool>,
476
477    /// DHT instance (optional)
478    dht: Option<Arc<RwLock<DHT>>>,
479
480    /// Production resource manager (optional)
481    resource_manager: Option<Arc<ResourceManager>>,
482
483    /// Bootstrap cache manager for peer discovery
484    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
485
486    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
487    dual_node: Arc<DualStackNetworkNode>,
488
489    /// Rate limiter for connection and request throttling
490    #[allow(dead_code)]
491    rate_limiter: Arc<RateLimiter>,
492}
493
494impl P2PNode {
495    /// Minimal constructor for tests that avoids real networking
496    pub fn new_for_tests() -> Result<Self> {
497        let (event_tx, _) = broadcast::channel(16);
498        Ok(Self {
499            config: NodeConfig::default(),
500            peer_id: "test_peer".to_string(),
501            peers: Arc::new(RwLock::new(HashMap::new())),
502            event_tx,
503            listen_addrs: RwLock::new(Vec::new()),
504            start_time: Instant::now(),
505            running: RwLock::new(false),
506            dht: None,
507            resource_manager: None,
508            bootstrap_manager: None,
509            dual_node: {
510                // Bind dual-stack nodes on ephemeral ports for tests
511                let v6: Option<std::net::SocketAddr> = "[::1]:0"
512                    .parse()
513                    .ok()
514                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
515                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
516                let handle = tokio::runtime::Handle::current();
517                let dual_attempt = handle.block_on(
518                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
519                );
520                let dual = match dual_attempt {
521                    Ok(d) => d,
522                    Err(_e1) => {
523                        // Fallback to IPv4-only ephemeral bind
524                        let fallback = handle.block_on(
525                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
526                                None,
527                                "127.0.0.1:0".parse().ok(),
528                            ),
529                        );
530                        match fallback {
531                            Ok(d) => d,
532                            Err(e2) => {
533                                return Err(P2PError::Network(NetworkError::BindError(
534                                    format!("Failed to create dual-stack network node: {}", e2)
535                                        .into(),
536                                )));
537                            }
538                        }
539                    }
540                };
541                Arc::new(dual)
542            },
543            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
544                max_requests: 100,
545                burst_size: 100,
546                window: std::time::Duration::from_secs(1),
547                ..Default::default()
548            })),
549        })
550    }
551    /// Create a new P2P node with the given configuration
552    pub async fn new(config: NodeConfig) -> Result<Self> {
553        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
554            // Generate a random peer ID for now
555            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
556        });
557
558        let (event_tx, _) = broadcast::channel(1000);
559
560        // Initialize and register a TrustWeightedKademlia DHT for the global API
561        // Use a deterministic local NodeId derived from the peer_id
562        {
563            use blake3::Hasher;
564            let mut hasher = Hasher::new();
565            hasher.update(peer_id.as_bytes());
566            let digest = hasher.finalize();
567            let mut nid = [0u8; 32];
568            nid.copy_from_slice(digest.as_bytes());
569            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
570                crate::identity::node_identity::NodeId::from_bytes(nid),
571            ));
572            // TODO: Update to use new clean API
573            // let _ = crate::api::set_dht_instance(twdht);
574        }
575
576        // Initialize DHT if needed
577        let dht = if true {
578            // Always enable DHT for now
579            let _dht_config = crate::dht::DHTConfig {
580                replication_factor: config.dht_config.k_value,
581                bucket_size: config.dht_config.k_value,
582                alpha: config.dht_config.alpha_value,
583                record_ttl: config.dht_config.record_ttl,
584                bucket_refresh_interval: config.dht_config.refresh_interval,
585                republish_interval: config.dht_config.refresh_interval,
586                max_distance: 160, // 160 bits for SHA-256
587            };
588            // Convert peer_id String to NodeId
589            let peer_bytes = peer_id.as_bytes();
590            let mut node_id_bytes = [0u8; 32];
591            let len = peer_bytes.len().min(32);
592            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
593            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
594            let dht_instance = DHT::new(node_id).map_err(|e| {
595                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
596                    e.to_string().into(),
597                ))
598            })?;
599            Some(Arc::new(RwLock::new(dht_instance)))
600        } else {
601            None
602        };
603
604        // MCP removed
605
606        // Initialize production resource manager if configured
607        let resource_manager = config
608            .production_config
609            .clone()
610            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
611
612        // Initialize bootstrap cache manager
613        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
614            match BootstrapManager::with_config(cache_config.clone()).await {
615                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
616                Err(e) => {
617                    warn!(
618                        "Failed to initialize bootstrap manager: {}, continuing without cache",
619                        e
620                    );
621                    None
622                }
623            }
624        } else {
625            match BootstrapManager::new().await {
626                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
627                Err(e) => {
628                    warn!(
629                        "Failed to initialize bootstrap manager: {}, continuing without cache",
630                        e
631                    );
632                    None
633                }
634            }
635        };
636
637        // Initialize dual-stack ant-quic nodes
638        let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
639            let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
640            let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
641            (v6_addr, v4_addr)
642        } else {
643            // Defaults: always listen on IPv4; IPv6 if enabled
644            let v4_addr = Some(std::net::SocketAddr::new(
645                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
646                config.listen_addr.port(),
647            ));
648            let v6_addr = if config.enable_ipv6 {
649                Some(std::net::SocketAddr::new(
650                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
651                    config.listen_addr.port(),
652                ))
653            } else {
654                None
655            };
656            (v6_addr, v4_addr)
657        };
658
659        let dual_node = Arc::new(
660            DualStackNetworkNode::new(v6_opt, v4_opt)
661                .await
662                .map_err(|e| {
663                    P2PError::Transport(crate::error::TransportError::SetupFailed(
664                        format!("Failed to create dual-stack network nodes: {}", e).into(),
665                    ))
666                })?,
667        );
668
669        // Initialize rate limiter with default config
670        let rate_limiter = Arc::new(RateLimiter::new(
671            crate::validation::RateLimitConfig::default(),
672        ));
673
674        let node = Self {
675            config,
676            peer_id,
677            peers: Arc::new(RwLock::new(HashMap::new())),
678            event_tx,
679            listen_addrs: RwLock::new(Vec::new()),
680            start_time: Instant::now(),
681            running: RwLock::new(false),
682            dht,
683            resource_manager,
684            bootstrap_manager,
685            dual_node,
686            rate_limiter,
687        };
688        info!("Created P2P node with peer ID: {}", node.peer_id);
689
690        Ok(node)
691    }
692
693    /// Create a new node builder
694    pub fn builder() -> NodeBuilder {
695        NodeBuilder::new()
696    }
697
698    /// Get the peer ID of this node
699    pub fn peer_id(&self) -> &PeerId {
700        &self.peer_id
701    }
702
703    pub fn local_addr(&self) -> Option<String> {
704        self.listen_addrs
705            .try_read()
706            .ok()
707            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
708    }
709
710    pub async fn subscribe(&self, topic: &str) -> Result<()> {
711        // In a real implementation, this would register the topic with the pubsub mechanism.
712        // For now, we just log it.
713        info!("Subscribed to topic: {}", topic);
714        Ok(())
715    }
716
717    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
718        info!(
719            "Publishing message to topic: {} ({} bytes)",
720            topic,
721            data.len()
722        );
723
724        // Get list of connected peers
725        let peer_list: Vec<PeerId> = {
726            let peers_guard = self.peers.read().await;
727            peers_guard.keys().cloned().collect()
728        };
729
730        if peer_list.is_empty() {
731            debug!("No peers connected, message will only be sent to local subscribers");
732        } else {
733            // Send message to all connected peers
734            let mut send_count = 0;
735            for peer_id in &peer_list {
736                match self.send_message(peer_id, topic, data.to_vec()).await {
737                    Ok(_) => {
738                        send_count += 1;
739                        debug!("Sent message to peer: {}", peer_id);
740                    }
741                    Err(e) => {
742                        warn!("Failed to send message to peer {}: {}", peer_id, e);
743                    }
744                }
745            }
746            info!(
747                "Published message to {}/{} connected peers",
748                send_count,
749                peer_list.len()
750            );
751        }
752
753        // Also send to local subscribers (for local echo and testing)
754        let event = P2PEvent::Message {
755            topic: topic.to_string(),
756            source: self.peer_id.clone(),
757            data: data.to_vec(),
758        };
759        let _ = self.event_tx.send(event);
760
761        Ok(())
762    }
763
764    /// Get the node configuration
765    pub fn config(&self) -> &NodeConfig {
766        &self.config
767    }
768
769    /// Start the P2P node
770    pub async fn start(&self) -> Result<()> {
771        info!("Starting P2P node...");
772
773        // Start production resource manager if configured
774        if let Some(ref resource_manager) = self.resource_manager {
775            resource_manager.start().await.map_err(|e| {
776                P2PError::Network(crate::error::NetworkError::ProtocolError(
777                    format!("Failed to start resource manager: {e}").into(),
778                ))
779            })?;
780            info!("Production resource manager started");
781        }
782
783        // Start bootstrap manager background tasks
784        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
785            let mut manager = bootstrap_manager.write().await;
786            manager.start_background_tasks().await.map_err(|e| {
787                P2PError::Network(crate::error::NetworkError::ProtocolError(
788                    format!("Failed to start bootstrap manager: {e}").into(),
789                ))
790            })?;
791            info!("Bootstrap cache manager started");
792        }
793
794        // Set running state
795        *self.running.write().await = true;
796
797        // Start listening on configured addresses using transport layer
798        self.start_network_listeners().await?;
799
800        // Log current listen addresses
801        let listen_addrs = self.listen_addrs.read().await;
802        info!("P2P node started on addresses: {:?}", *listen_addrs);
803
804        // MCP removed
805
806        // Start message receiving system
807        self.start_message_receiving_system().await?;
808
809        // Connect to bootstrap peers
810        self.connect_bootstrap_peers().await?;
811
812        Ok(())
813    }
814
815    /// Start network listeners on configured addresses
816    async fn start_network_listeners(&self) -> Result<()> {
817        info!("Starting dual-stack listeners (ant-quic)...");
818        // Update our listen_addrs from the dual node bindings
819        let addrs = self.dual_node.local_addrs();
820        {
821            let mut la = self.listen_addrs.write().await;
822            *la = addrs.clone();
823        }
824
825        // Spawn a background accept loop that handles incoming connections from either stack
826        let event_tx = self.event_tx.clone();
827        let peers = self.peers.clone();
828        let rate_limiter = self.rate_limiter.clone();
829        let dual = self.dual_node.clone();
830        tokio::spawn(async move {
831            loop {
832                match dual.accept_any().await {
833                    Ok((ant_peer_id, remote_sock)) => {
834                        let peer_id =
835                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
836                        let remote_addr = NetworkAddress::from(remote_sock);
837                        // Optional: basic IP rate limiting
838                        let _ = rate_limiter.check_ip(&remote_sock.ip());
839                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
840                        register_new_peer(&peers, &peer_id, &remote_addr).await;
841                    }
842                    Err(e) => {
843                        warn!("Accept failed: {}", e);
844                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
845                    }
846                }
847            }
848        });
849
850        info!("Dual-stack listeners active on: {:?}", addrs);
851        Ok(())
852    }
853
854    /// Start a listener on a specific socket address
855    #[allow(dead_code)]
856    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
857        // use crate::transport::{Transport}; // Unused during migration
858
859        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
860        /*
861        // Try QUIC first (preferred transport)
862        match crate::transport::QuicTransport::new(Default::default()) {
863            Ok(quic_transport) => {
864                match quic_transport.listen(NetworkAddress::new(addr)).await {
865                    Ok(listen_addrs) => {
866                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
867
868                        // Store the actual listening addresses in the node
869                        {
870                            let mut node_listen_addrs = self.listen_addrs.write().await;
871                            // Don't clear - accumulate addresses from multiple listeners
872                            node_listen_addrs.push(listen_addrs.socket_addr());
873                        }
874
875                        // Start accepting connections in background
876                        self.start_connection_acceptor(
877                            Arc::new(quic_transport),
878                            addr,
879                            crate::transport::TransportType::QUIC
880                        ).await?;
881
882                        return Ok(());
883                    }
884                    Err(e) => {
885                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
886                    }
887                }
888            }
889            Err(e) => {
890                warn!("Failed to create QUIC transport for listening: {}", e);
891            }
892        }
893        */
894
895        warn!("QUIC transport temporarily disabled during ant-quic migration");
896        // No TCP fallback - QUIC only
897        Err(crate::P2PError::Transport(
898            crate::error::TransportError::SetupFailed(
899                format!(
900                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
901                )
902                .into(),
903            ),
904        ))
905    }
906
907    /// Start connection acceptor background task
908    #[allow(dead_code)] // Deprecated during ant-quic migration
909    async fn start_connection_acceptor(
910        &self,
911        transport: Arc<dyn crate::transport::Transport>,
912        addr: std::net::SocketAddr,
913        transport_type: crate::transport::TransportType,
914    ) -> Result<()> {
915        info!(
916            "Starting connection acceptor for {:?} on {}",
917            transport_type, addr
918        );
919
920        // Clone necessary data for the background task
921        let event_tx = self.event_tx.clone();
922        let _peer_id = self.peer_id.clone();
923        let peers = Arc::clone(&self.peers);
924        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
925
926        let rate_limiter = Arc::clone(&self.rate_limiter);
927
928        // Spawn background task to accept incoming connections
929        tokio::spawn(async move {
930            loop {
931                match transport.accept().await {
932                    Ok(connection) => {
933                        let remote_addr = connection.remote_addr();
934                        let connection_peer_id =
935                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
936
937                        // Apply rate limiting for incoming connections
938                        let socket_addr = remote_addr.socket_addr();
939                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
940                            // Connection dropped automatically when it goes out of scope
941                            continue;
942                        }
943
944                        info!(
945                            "Accepted {:?} connection from {} (peer: {})",
946                            transport_type, remote_addr, connection_peer_id
947                        );
948
949                        // Generate peer connected event
950                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
951
952                        // Store the peer connection
953                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
954
955                        // Spawn task to handle this specific connection's messages
956                        spawn_connection_handler(
957                            connection,
958                            connection_peer_id,
959                            event_tx.clone(),
960                            Arc::clone(&peers),
961                        );
962                    }
963                    Err(e) => {
964                        warn!(
965                            "Failed to accept {:?} connection on {}: {}",
966                            transport_type, addr, e
967                        );
968
969                        // Brief pause before retrying to avoid busy loop
970                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
971                    }
972                }
973            }
974        });
975
976        info!(
977            "Connection acceptor background task started for {:?} on {}",
978            transport_type, addr
979        );
980        Ok(())
981    }
982
983    /// Start the message receiving system with background tasks
984    async fn start_message_receiving_system(&self) -> Result<()> {
985        info!("Starting message receiving system");
986        let dual = self.dual_node.clone();
987        let event_tx = self.event_tx.clone();
988
989        tokio::spawn(async move {
990            loop {
991                match dual.receive_any().await {
992                    Ok((_peer_id, bytes)) => {
993                        // Expect the JSON message wrapper from create_protocol_message
994                        #[allow(clippy::collapsible_if)]
995                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
996                            if let (Some(protocol), Some(data), Some(from)) = (
997                                value.get("protocol").and_then(|v| v.as_str()),
998                                value.get("data").and_then(|v| v.as_array()),
999                                value.get("from").and_then(|v| v.as_str()),
1000                            ) {
1001                                let payload: Vec<u8> = data
1002                                    .iter()
1003                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1004                                    .collect();
1005                                let _ = event_tx.send(P2PEvent::Message {
1006                                    topic: protocol.to_string(),
1007                                    source: from.to_string(),
1008                                    data: payload,
1009                                });
1010                            }
1011                        }
1012                    }
1013                    Err(e) => {
1014                        warn!("Receive error: {}", e);
1015                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1016                    }
1017                }
1018            }
1019        });
1020
1021        Ok(())
1022    }
1023
1024    /// Handle a received message and generate appropriate events
1025    #[allow(dead_code)]
1026    async fn handle_received_message(
1027        &self,
1028        message_data: Vec<u8>,
1029        peer_id: &PeerId,
1030        _protocol: &str,
1031        event_tx: &broadcast::Sender<P2PEvent>,
1032    ) -> Result<()> {
1033        // MCP removed: no special protocol handling
1034
1035        // Parse the message format we created in create_protocol_message
1036        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1037            Ok(message) => {
1038                if let (Some(protocol), Some(data), Some(from)) = (
1039                    message.get("protocol").and_then(|v| v.as_str()),
1040                    message.get("data").and_then(|v| v.as_array()),
1041                    message.get("from").and_then(|v| v.as_str()),
1042                ) {
1043                    // Convert data array back to bytes
1044                    let data_bytes: Vec<u8> = data
1045                        .iter()
1046                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1047                        .collect();
1048
1049                    // Generate message event
1050                    let event = P2PEvent::Message {
1051                        topic: protocol.to_string(),
1052                        source: from.to_string(),
1053                        data: data_bytes,
1054                    };
1055
1056                    let _ = event_tx.send(event);
1057                    debug!("Generated message event from peer: {}", peer_id);
1058                }
1059            }
1060            Err(e) => {
1061                warn!("Failed to parse received message from {}: {}", peer_id, e);
1062            }
1063        }
1064
1065        Ok(())
1066    }
1067
1068    // MCP removed
1069
1070    // MCP removed
1071
1072    /// Run the P2P node (blocks until shutdown)
1073    pub async fn run(&self) -> Result<()> {
1074        if !*self.running.read().await {
1075            self.start().await?;
1076        }
1077
1078        info!("P2P node running...");
1079
1080        // Main event loop
1081        loop {
1082            if !*self.running.read().await {
1083                break;
1084            }
1085
1086            // Perform periodic tasks
1087            self.periodic_tasks().await?;
1088
1089            // Sleep for a short interval
1090            tokio::time::sleep(Duration::from_millis(100)).await;
1091        }
1092
1093        info!("P2P node stopped");
1094        Ok(())
1095    }
1096
1097    /// Stop the P2P node
1098    pub async fn stop(&self) -> Result<()> {
1099        info!("Stopping P2P node...");
1100
1101        // Set running state to false
1102        *self.running.write().await = false;
1103
1104        // Disconnect all peers
1105        self.disconnect_all_peers().await?;
1106
1107        // Shutdown production resource manager if configured
1108        if let Some(ref resource_manager) = self.resource_manager {
1109            resource_manager.shutdown().await.map_err(|e| {
1110                P2PError::Network(crate::error::NetworkError::ProtocolError(
1111                    format!("Failed to shutdown resource manager: {e}").into(),
1112                ))
1113            })?;
1114            info!("Production resource manager stopped");
1115        }
1116
1117        info!("P2P node stopped");
1118        Ok(())
1119    }
1120
1121    /// Graceful shutdown alias for tests
1122    pub async fn shutdown(&self) -> Result<()> {
1123        self.stop().await
1124    }
1125
1126    /// Check if the node is running
1127    pub async fn is_running(&self) -> bool {
1128        *self.running.read().await
1129    }
1130
1131    /// Get the current listen addresses
1132    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1133        self.listen_addrs.read().await.clone()
1134    }
1135
1136    /// Get connected peers
1137    pub async fn connected_peers(&self) -> Vec<PeerId> {
1138        self.peers.read().await.keys().cloned().collect()
1139    }
1140
1141    /// Get peer count
1142    pub async fn peer_count(&self) -> usize {
1143        self.peers.read().await.len()
1144    }
1145
1146    /// Get peer info
1147    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1148        self.peers.read().await.get(peer_id).cloned()
1149    }
1150
1151    /// Connect to a peer
1152    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1153        info!("Connecting to peer at: {}", address);
1154
1155        // Check production limits if resource manager is enabled
1156        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1157            Some(resource_manager.acquire_connection().await?)
1158        } else {
1159            None
1160        };
1161
1162        // Parse the address to SocketAddr format
1163        let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1164            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1165                format!("{}: {}", address, e).into(),
1166            ))
1167        })?;
1168
1169        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1170        let addr_list = vec![_socket_addr];
1171        let peer_id = match tokio::time::timeout(
1172            self.config.connection_timeout,
1173            self.dual_node.connect_happy_eyeballs(&addr_list),
1174        )
1175        .await
1176        {
1177            Ok(Ok(peer)) => {
1178                let connected_peer_id =
1179                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1180                info!("Successfully connected to peer: {}", connected_peer_id);
1181                connected_peer_id
1182            }
1183            Ok(Err(e)) => {
1184                warn!("Failed to connect to peer at {}: {}", address, e);
1185                let demo_peer_id =
1186                    format!("peer_from_{}", address.replace('/', "_").replace(':', "_"));
1187                warn!(
1188                    "Using demo peer ID: {} (transport connection failed)",
1189                    demo_peer_id
1190                );
1191                demo_peer_id
1192            }
1193            Err(_) => {
1194                warn!(
1195                    "Timed out connecting to peer at {} after {:?}",
1196                    address, self.config.connection_timeout
1197                );
1198                let demo_peer_id =
1199                    format!("peer_from_{}", address.replace('/', "_").replace(':', "_"));
1200                demo_peer_id
1201            }
1202        };
1203
1204        // Create peer info with connection details
1205        let peer_info = PeerInfo {
1206            peer_id: peer_id.clone(),
1207            addresses: vec![address.to_string()],
1208            connected_at: Instant::now(),
1209            last_seen: Instant::now(),
1210            status: ConnectionStatus::Connected,
1211            protocols: vec!["p2p-foundation/1.0".to_string()],
1212            heartbeat_count: 0,
1213        };
1214
1215        // Store peer information
1216        self.peers.write().await.insert(peer_id.clone(), peer_info);
1217
1218        // Record bandwidth usage if resource manager is enabled
1219        if let Some(ref resource_manager) = self.resource_manager {
1220            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1221        }
1222
1223        // Emit connection event
1224        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1225
1226        info!("Connected to peer: {}", peer_id);
1227        Ok(peer_id)
1228    }
1229
1230    /// Disconnect from a peer
1231    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1232        info!("Disconnecting from peer: {}", peer_id);
1233
1234        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1235            peer_info.status = ConnectionStatus::Disconnected;
1236
1237            // Emit event
1238            let _ = self
1239                .event_tx
1240                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1241
1242            info!("Disconnected from peer: {}", peer_id);
1243        }
1244
1245        Ok(())
1246    }
1247
1248    /// Send a message to a peer
1249    pub async fn send_message(
1250        &self,
1251        peer_id: &PeerId,
1252        protocol: &str,
1253        data: Vec<u8>,
1254    ) -> Result<()> {
1255        debug!(
1256            "Sending message to peer {} on protocol {}",
1257            peer_id, protocol
1258        );
1259
1260        // Check rate limits if resource manager is enabled
1261        if let Some(ref resource_manager) = self.resource_manager
1262            && !resource_manager
1263                .check_rate_limit(peer_id, "message")
1264                .await?
1265        {
1266            return Err(P2PError::ResourceExhausted(
1267                format!("Rate limit exceeded for peer {}", peer_id).into(),
1268            ));
1269        }
1270
1271        // Check if peer is connected
1272        if !self.peers.read().await.contains_key(peer_id) {
1273            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1274                peer_id.to_string().into(),
1275            )));
1276        }
1277
1278        // MCP removed: no special-case protocol validation
1279
1280        // Record bandwidth usage if resource manager is enabled
1281        if let Some(ref resource_manager) = self.resource_manager {
1282            resource_manager.record_bandwidth(data.len() as u64, 0);
1283        }
1284
1285        // Create protocol message wrapper
1286        let _message_data = self.create_protocol_message(protocol, data)?;
1287
1288        // Send via ant-quic dual-node
1289        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1290        tokio::time::timeout(self.config.connection_timeout, send_fut)
1291            .await
1292            .map_err(|_| {
1293                P2PError::Transport(crate::error::TransportError::StreamError(
1294                    "Timed out sending message".into(),
1295                ))
1296            })?
1297            .map_err(|e| {
1298                P2PError::Transport(crate::error::TransportError::StreamError(
1299                    e.to_string().into(),
1300                ))
1301            })
1302    }
1303
1304    /// Create a protocol message wrapper
1305    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1306        use serde_json::json;
1307
1308        let timestamp = std::time::SystemTime::now()
1309            .duration_since(std::time::UNIX_EPOCH)
1310            .map_err(|e| {
1311                P2PError::Network(NetworkError::ProtocolError(
1312                    format!("System time error: {}", e).into(),
1313                ))
1314            })?
1315            .as_secs();
1316
1317        // Create a simple message format for P2P communication
1318        let message = json!({
1319            "protocol": protocol,
1320            "data": data,
1321            "from": self.peer_id,
1322            "timestamp": timestamp
1323        });
1324
1325        serde_json::to_vec(&message).map_err(|e| {
1326            P2PError::Transport(crate::error::TransportError::StreamError(
1327                format!("Failed to serialize message: {e}").into(),
1328            ))
1329        })
1330    }
1331
1332    // Note: async listen_addrs() already exists above for fetching listen addresses
1333}
1334
1335/// Create a protocol message wrapper (static version for background tasks)
1336#[allow(dead_code)]
1337fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1338    use serde_json::json;
1339
1340    let timestamp = std::time::SystemTime::now()
1341        .duration_since(std::time::UNIX_EPOCH)
1342        .map_err(|e| {
1343            P2PError::Network(NetworkError::ProtocolError(
1344                format!("System time error: {}", e).into(),
1345            ))
1346        })?
1347        .as_secs();
1348
1349    // Create a simple message format for P2P communication
1350    let message = json!({
1351        "protocol": protocol,
1352        "data": data,
1353        "timestamp": timestamp
1354    });
1355
1356    serde_json::to_vec(&message).map_err(|e| {
1357        P2PError::Transport(crate::error::TransportError::StreamError(
1358            format!("Failed to serialize message: {e}").into(),
1359        ))
1360    })
1361}
1362
1363impl P2PNode {
1364    /// Subscribe to network events
1365    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1366        self.event_tx.subscribe()
1367    }
1368
1369    /// Backwards-compat event stream accessor for tests
1370    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1371        self.subscribe_events()
1372    }
1373
1374    /// Get node uptime
1375    pub fn uptime(&self) -> Duration {
1376        self.start_time.elapsed()
1377    }
1378
1379    // MCP removed: all MCP tool/service methods removed
1380
1381    // /// Handle MCP remote tool call with network integration
1382
1383    // /// List tools available on a specific remote peer
1384
1385    // /// Get MCP server statistics
1386
1387    /// Get production resource metrics
1388    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1389        if let Some(ref resource_manager) = self.resource_manager {
1390            Ok(resource_manager.get_metrics().await)
1391        } else {
1392            Err(P2PError::Network(
1393                crate::error::NetworkError::ProtocolError(
1394                    "Production resource manager not enabled".to_string().into(),
1395                ),
1396            ))
1397        }
1398    }
1399
1400    /// Check system health
1401    pub async fn health_check(&self) -> Result<()> {
1402        if let Some(ref resource_manager) = self.resource_manager {
1403            resource_manager.health_check().await
1404        } else {
1405            // Basic health check without resource manager
1406            let peer_count = self.peer_count().await;
1407            if peer_count > self.config.max_connections {
1408                Err(P2PError::Network(
1409                    crate::error::NetworkError::ProtocolError(
1410                        format!("Too many connections: {peer_count}").into(),
1411                    ),
1412                ))
1413            } else {
1414                Ok(())
1415            }
1416        }
1417    }
1418
1419    /// Get production configuration (if enabled)
1420    pub fn production_config(&self) -> Option<&ProductionConfig> {
1421        self.config.production_config.as_ref()
1422    }
1423
1424    /// Check if production hardening is enabled
1425    pub fn is_production_mode(&self) -> bool {
1426        self.resource_manager.is_some()
1427    }
1428
1429    /// Get DHT reference
1430    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1431        self.dht.as_ref()
1432    }
1433
1434    /// Store a value in the DHT
1435    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1436        if let Some(ref dht) = self.dht {
1437            let mut dht_instance = dht.write().await;
1438            let dht_key = crate::dht::DhtKey::from_bytes(key);
1439            dht_instance
1440                .store(&dht_key, value.clone())
1441                .await
1442                .map_err(|e| {
1443                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1444                        format!("{:?}: {e}", key).into(),
1445                    ))
1446                })?;
1447
1448            Ok(())
1449        } else {
1450            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1451                "DHT not enabled".to_string().into(),
1452            )))
1453        }
1454    }
1455
1456    /// Retrieve a value from the DHT
1457    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1458        if let Some(ref dht) = self.dht {
1459            let dht_instance = dht.read().await;
1460            let dht_key = crate::dht::DhtKey::from_bytes(key);
1461            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1462                P2PError::Dht(crate::error::DhtError::StoreFailed(
1463                    format!("Retrieve failed: {e}").into(),
1464                ))
1465            })?;
1466
1467            Ok(record_result)
1468        } else {
1469            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1470                "DHT not enabled".to_string().into(),
1471            )))
1472        }
1473    }
1474
1475    /// Add a discovered peer to the bootstrap cache
1476    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1477        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1478            let mut manager = bootstrap_manager.write().await;
1479            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1480                .iter()
1481                .filter_map(|addr| addr.parse().ok())
1482                .collect();
1483            let contact = ContactEntry::new(peer_id, socket_addresses);
1484            manager.add_contact(contact).await.map_err(|e| {
1485                P2PError::Network(crate::error::NetworkError::ProtocolError(
1486                    format!("Failed to add peer to bootstrap cache: {e}").into(),
1487                ))
1488            })?;
1489        }
1490        Ok(())
1491    }
1492
1493    /// Update connection metrics for a peer in the bootstrap cache
1494    pub async fn update_peer_metrics(
1495        &self,
1496        peer_id: &PeerId,
1497        success: bool,
1498        latency_ms: Option<u64>,
1499        _error: Option<String>,
1500    ) -> Result<()> {
1501        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1502            let mut manager = bootstrap_manager.write().await;
1503
1504            // Create quality metrics based on the connection result
1505            let metrics = QualityMetrics {
1506                success_rate: if success { 1.0 } else { 0.0 },
1507                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1508                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
1509                last_connection_attempt: chrono::Utc::now(),
1510                last_successful_connection: if success {
1511                    chrono::Utc::now()
1512                } else {
1513                    chrono::Utc::now() - chrono::Duration::hours(1)
1514                },
1515                uptime_score: 0.5,
1516            };
1517
1518            manager
1519                .update_contact_metrics(peer_id, metrics)
1520                .await
1521                .map_err(|e| {
1522                    P2PError::Network(crate::error::NetworkError::ProtocolError(
1523                        format!("Failed to update peer metrics: {e}").into(),
1524                    ))
1525                })?;
1526        }
1527        Ok(())
1528    }
1529
1530    /// Get bootstrap cache statistics
1531    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1532        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1533            let manager = bootstrap_manager.read().await;
1534            let stats = manager.get_stats().await.map_err(|e| {
1535                P2PError::Network(crate::error::NetworkError::ProtocolError(
1536                    format!("Failed to get bootstrap stats: {e}").into(),
1537                ))
1538            })?;
1539            Ok(Some(stats))
1540        } else {
1541            Ok(None)
1542        }
1543    }
1544
1545    /// Get the number of cached bootstrap peers
1546    pub async fn cached_peer_count(&self) -> usize {
1547        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1548            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1549        {
1550            return stats.total_contacts;
1551        }
1552        0
1553    }
1554
1555    /// Connect to bootstrap peers
1556    async fn connect_bootstrap_peers(&self) -> Result<()> {
1557        let mut bootstrap_contacts = Vec::new();
1558        let mut used_cache = false;
1559
1560        // Try to get peers from bootstrap cache first
1561        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1562            let manager = bootstrap_manager.read().await;
1563            match manager.get_bootstrap_peers(20).await {
1564                // Try to get top 20 quality peers
1565                Ok(contacts) => {
1566                    if !contacts.is_empty() {
1567                        info!("Using {} cached bootstrap peers", contacts.len());
1568                        bootstrap_contacts = contacts;
1569                        used_cache = true;
1570                    }
1571                }
1572                Err(e) => {
1573                    warn!("Failed to get cached bootstrap peers: {}", e);
1574                }
1575            }
1576        }
1577
1578        // Fallback to configured bootstrap peers if no cache or cache is empty
1579        if bootstrap_contacts.is_empty() {
1580            let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1581                &self.config.bootstrap_peers_str
1582            } else {
1583                // Convert Multiaddr to strings for fallback
1584                &self
1585                    .config
1586                    .bootstrap_peers
1587                    .iter()
1588                    .map(|addr| addr.to_string())
1589                    .collect::<Vec<_>>()
1590            };
1591
1592            if bootstrap_peers.is_empty() {
1593                info!("No bootstrap peers configured and no cached peers available");
1594                return Ok(());
1595            }
1596
1597            info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1598
1599            for addr in bootstrap_peers {
1600                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1601                    let contact = ContactEntry::new(
1602                        format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1603                        vec![socket_addr],
1604                    );
1605                    bootstrap_contacts.push(contact);
1606                } else {
1607                    warn!("Invalid bootstrap address format: {}", addr);
1608                }
1609            }
1610        }
1611
1612        // Connect to bootstrap peers
1613        let mut successful_connections = 0;
1614        for contact in bootstrap_contacts {
1615            for addr in &contact.addresses {
1616                match self.connect_peer(&addr.to_string()).await {
1617                    Ok(peer_id) => {
1618                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1619                        successful_connections += 1;
1620
1621                        // Update bootstrap cache with successful connection
1622                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1623                            let mut manager = bootstrap_manager.write().await;
1624                            let mut updated_contact = contact.clone();
1625                            updated_contact.peer_id = peer_id.clone();
1626                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
1627
1628                            if let Err(e) = manager.add_contact(updated_contact).await {
1629                                warn!("Failed to update bootstrap cache: {}", e);
1630                            }
1631                        }
1632                        break; // Successfully connected, move to next contact
1633                    }
1634                    Err(e) => {
1635                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1636
1637                        // Update bootstrap cache with failed connection
1638                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1639                            let mut manager = bootstrap_manager.write().await;
1640                            let mut updated_contact = contact.clone();
1641                            updated_contact.update_connection_result(
1642                                false,
1643                                None,
1644                                Some(e.to_string()),
1645                            );
1646
1647                            if let Err(e) = manager.add_contact(updated_contact).await {
1648                                warn!("Failed to update bootstrap cache: {}", e);
1649                            }
1650                        }
1651                    }
1652                }
1653            }
1654        }
1655
1656        if successful_connections == 0 {
1657            if !used_cache {
1658                warn!("Failed to connect to any bootstrap peers");
1659            }
1660            return Err(P2PError::Network(NetworkError::ConnectionFailed {
1661                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
1662                reason: "Failed to connect to any bootstrap peers".into(),
1663            }));
1664        }
1665        info!(
1666            "Successfully connected to {} bootstrap peers",
1667            successful_connections
1668        );
1669
1670        Ok(())
1671    }
1672
1673    /// Disconnect from all peers
1674    async fn disconnect_all_peers(&self) -> Result<()> {
1675        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1676
1677        for peer_id in peer_ids {
1678            self.disconnect_peer(&peer_id).await?;
1679        }
1680
1681        Ok(())
1682    }
1683
1684    /// Perform periodic maintenance tasks
1685    async fn periodic_tasks(&self) -> Result<()> {
1686        // Update peer last seen timestamps
1687        // Remove stale connections
1688        // Perform DHT maintenance
1689        // This is a placeholder for now
1690
1691        Ok(())
1692    }
1693}
1694
1695/// Network sender trait for sending messages
1696#[async_trait::async_trait]
1697pub trait NetworkSender: Send + Sync {
1698    /// Send a message to a specific peer
1699    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
1700
1701    /// Get our local peer ID
1702    fn local_peer_id(&self) -> &PeerId;
1703}
1704
1705/// Lightweight wrapper for P2PNode to implement NetworkSender
1706#[derive(Clone)]
1707pub struct P2PNetworkSender {
1708    peer_id: PeerId,
1709    // Use channels for async communication with the P2P node
1710    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1711}
1712
1713impl P2PNetworkSender {
1714    pub fn new(
1715        peer_id: PeerId,
1716        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1717    ) -> Self {
1718        Self { peer_id, send_tx }
1719    }
1720}
1721
1722/// Implementation of NetworkSender trait for P2PNetworkSender
1723#[async_trait::async_trait]
1724impl NetworkSender for P2PNetworkSender {
1725    /// Send a message to a specific peer via the P2P network
1726    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1727        self.send_tx
1728            .send((peer_id.clone(), protocol.to_string(), data))
1729            .map_err(|_| {
1730                P2PError::Network(crate::error::NetworkError::ProtocolError(
1731                    "Failed to send message via channel".to_string().into(),
1732                ))
1733            })?;
1734        Ok(())
1735    }
1736
1737    /// Get our local peer ID
1738    fn local_peer_id(&self) -> &PeerId {
1739        &self.peer_id
1740    }
1741}
1742
1743/// Builder pattern for creating P2P nodes
1744pub struct NodeBuilder {
1745    config: NodeConfig,
1746}
1747
1748impl Default for NodeBuilder {
1749    fn default() -> Self {
1750        Self::new()
1751    }
1752}
1753
1754impl NodeBuilder {
1755    /// Create a new node builder
1756    pub fn new() -> Self {
1757        Self {
1758            config: NodeConfig::default(),
1759        }
1760    }
1761
1762    /// Set the peer ID
1763    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1764        self.config.peer_id = Some(peer_id);
1765        self
1766    }
1767
1768    /// Add a listen address
1769    pub fn listen_on(mut self, addr: &str) -> Self {
1770        if let Ok(multiaddr) = addr.parse() {
1771            self.config.listen_addrs.push(multiaddr);
1772        }
1773        self
1774    }
1775
1776    /// Add a bootstrap peer
1777    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1778        if let Ok(multiaddr) = addr.parse() {
1779            self.config.bootstrap_peers.push(multiaddr);
1780        }
1781        self.config.bootstrap_peers_str.push(addr.to_string());
1782        self
1783    }
1784
1785    /// Enable IPv6 support
1786    pub fn with_ipv6(mut self, enable: bool) -> Self {
1787        self.config.enable_ipv6 = enable;
1788        self
1789    }
1790
1791    // MCP removed: builder methods deleted
1792
1793    /// Set connection timeout
1794    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1795        self.config.connection_timeout = timeout;
1796        self
1797    }
1798
1799    /// Set maximum connections
1800    pub fn with_max_connections(mut self, max: usize) -> Self {
1801        self.config.max_connections = max;
1802        self
1803    }
1804
1805    /// Enable production mode with default configuration
1806    pub fn with_production_mode(mut self) -> Self {
1807        self.config.production_config = Some(ProductionConfig::default());
1808        self
1809    }
1810
1811    /// Configure production settings
1812    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1813        self.config.production_config = Some(production_config);
1814        self
1815    }
1816
1817    /// Configure DHT settings
1818    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1819        self.config.dht_config = dht_config;
1820        self
1821    }
1822
1823    /// Enable DHT with default configuration
1824    pub fn with_default_dht(mut self) -> Self {
1825        self.config.dht_config = DHTConfig::default();
1826        self
1827    }
1828
1829    /// Build the P2P node
1830    pub async fn build(self) -> Result<P2PNode> {
1831        P2PNode::new(self.config).await
1832    }
1833}
1834
1835/// Standalone function to handle received messages without borrowing self
1836#[allow(dead_code)] // Deprecated during ant-quic migration
1837async fn handle_received_message_standalone(
1838    message_data: Vec<u8>,
1839    peer_id: &PeerId,
1840    _protocol: &str,
1841    event_tx: &broadcast::Sender<P2PEvent>,
1842) -> Result<()> {
1843    // Parse the message format
1844    match serde_json::from_slice::<serde_json::Value>(&message_data) {
1845        Ok(message) => {
1846            if let (Some(protocol), Some(data), Some(from)) = (
1847                message.get("protocol").and_then(|v| v.as_str()),
1848                message.get("data").and_then(|v| v.as_array()),
1849                message.get("from").and_then(|v| v.as_str()),
1850            ) {
1851                // Convert data array back to bytes
1852                let data_bytes: Vec<u8> = data
1853                    .iter()
1854                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1855                    .collect();
1856
1857                // Generate message event
1858                let event = P2PEvent::Message {
1859                    topic: protocol.to_string(),
1860                    source: from.to_string(),
1861                    data: data_bytes,
1862                };
1863
1864                let _ = event_tx.send(event);
1865                debug!("Generated message event from peer: {}", peer_id);
1866            }
1867        }
1868        Err(e) => {
1869            warn!("Failed to parse received message from {}: {}", peer_id, e);
1870        }
1871    }
1872
1873    Ok(())
1874}
1875
1876// MCP removed: standalone MCP handler deleted
1877
1878/// Helper function to handle protocol message creation
1879#[allow(dead_code)]
1880fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
1881    match create_protocol_message_static(protocol, data) {
1882        Ok(msg) => Some(msg),
1883        Err(e) => {
1884            warn!("Failed to create protocol message: {}", e);
1885            None
1886        }
1887    }
1888}
1889
1890/// Helper function to handle message send result
1891#[allow(dead_code)]
1892async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
1893    match result {
1894        Ok(_) => {
1895            debug!("Message sent to peer {} via transport layer", peer_id);
1896        }
1897        Err(e) => {
1898            warn!("Failed to send message to peer {}: {}", peer_id, e);
1899        }
1900    }
1901}
1902
1903/// Helper function to check rate limit
1904#[allow(dead_code)] // Deprecated during ant-quic migration
1905fn check_rate_limit(
1906    rate_limiter: &RateLimiter,
1907    socket_addr: &std::net::SocketAddr,
1908    remote_addr: &NetworkAddress,
1909) -> Result<()> {
1910    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
1911        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
1912        e
1913    })
1914}
1915
1916/// Helper function to register a new peer
1917#[allow(dead_code)] // Deprecated during ant-quic migration
1918async fn register_new_peer(
1919    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1920    peer_id: &PeerId,
1921    remote_addr: &NetworkAddress,
1922) {
1923    let mut peers_guard = peers.write().await;
1924    let peer_info = PeerInfo {
1925        peer_id: peer_id.clone(),
1926        addresses: vec![remote_addr.to_string()],
1927        connected_at: tokio::time::Instant::now(),
1928        last_seen: tokio::time::Instant::now(),
1929        status: ConnectionStatus::Connected,
1930        protocols: vec!["p2p-chat/1.0.0".to_string()],
1931        heartbeat_count: 0,
1932    };
1933    peers_guard.insert(peer_id.clone(), peer_info);
1934}
1935
1936/// Helper function to spawn connection handler
1937#[allow(dead_code)] // Deprecated during ant-quic migration
1938fn spawn_connection_handler(
1939    connection: Box<dyn crate::transport::Connection>,
1940    peer_id: PeerId,
1941    event_tx: broadcast::Sender<P2PEvent>,
1942    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1943) {
1944    tokio::spawn(async move {
1945        handle_peer_connection(connection, peer_id, event_tx, peers).await;
1946    });
1947}
1948
1949/// Helper function to handle peer connection
1950#[allow(dead_code)] // Deprecated during ant-quic migration
1951async fn handle_peer_connection(
1952    mut connection: Box<dyn crate::transport::Connection>,
1953    peer_id: PeerId,
1954    event_tx: broadcast::Sender<P2PEvent>,
1955    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1956) {
1957    loop {
1958        match connection.receive().await {
1959            Ok(message_data) => {
1960                debug!(
1961                    "Received {} bytes from peer: {}",
1962                    message_data.len(),
1963                    peer_id
1964                );
1965
1966                // Handle the received message
1967                if let Err(e) = handle_received_message_standalone(
1968                    message_data,
1969                    &peer_id,
1970                    "unknown", // TODO: Extract protocol from message
1971                    &event_tx,
1972                )
1973                .await
1974                {
1975                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
1976                }
1977            }
1978            Err(e) => {
1979                warn!("Failed to receive message from {}: {}", peer_id, e);
1980
1981                // Check if connection is still alive
1982                if !connection.is_alive().await {
1983                    info!("Connection to {} is dead, removing peer", peer_id);
1984
1985                    // Remove dead peer
1986                    remove_peer(&peers, &peer_id).await;
1987
1988                    // Generate peer disconnected event
1989                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
1990
1991                    break; // Exit the message receiving loop
1992                }
1993
1994                // Brief pause before retrying
1995                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1996            }
1997        }
1998    }
1999}
2000
2001/// Helper function to remove a peer
2002#[allow(dead_code)] // Deprecated during ant-quic migration
2003async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2004    let mut peers_guard = peers.write().await;
2005    peers_guard.remove(peer_id);
2006}
2007
2008/// Helper function to update peer heartbeat
2009#[allow(dead_code)]
2010async fn update_peer_heartbeat(
2011    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2012    peer_id: &PeerId,
2013) -> Result<()> {
2014    let mut peers_guard = peers.write().await;
2015    match peers_guard.get_mut(peer_id) {
2016        Some(peer_info) => {
2017            peer_info.last_seen = Instant::now();
2018            peer_info.heartbeat_count += 1;
2019            Ok(())
2020        }
2021        None => {
2022            warn!("Received heartbeat from unknown peer: {}", peer_id);
2023            Err(P2PError::Network(NetworkError::PeerNotFound(
2024                format!("Peer {} not found", peer_id).into(),
2025            )))
2026        }
2027    }
2028}
2029
2030/// Helper function to get resource metrics
2031#[allow(dead_code)]
2032async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2033    if let Some(manager) = resource_manager {
2034        let metrics = manager.get_metrics().await;
2035        (metrics.memory_used, metrics.cpu_usage)
2036    } else {
2037        (0, 0.0)
2038    }
2039}
2040
2041#[cfg(test)]
2042mod tests {
2043    use super::*;
2044    // MCP removed from tests
2045    use std::time::Duration;
2046    use tokio::time::timeout;
2047
2048    // Test tool handler for network tests
2049
2050    // MCP removed
2051
2052    /// Helper function to create a test node configuration
2053    fn create_test_node_config() -> NodeConfig {
2054        NodeConfig {
2055            peer_id: Some("test_peer_123".to_string()),
2056            listen_addrs: vec![
2057                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2058                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2059            ],
2060            listen_addr: std::net::SocketAddr::new(
2061                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2062                0,
2063            ),
2064            bootstrap_peers: vec![],
2065            bootstrap_peers_str: vec![],
2066            enable_ipv6: true,
2067
2068            connection_timeout: Duration::from_millis(300),
2069            keep_alive_interval: Duration::from_secs(30),
2070            max_connections: 100,
2071            max_incoming_connections: 50,
2072            dht_config: DHTConfig::default(),
2073            security_config: SecurityConfig::default(),
2074            production_config: None,
2075            bootstrap_cache_config: None,
2076            // identity_config: None,
2077        }
2078    }
2079
2080    /// Helper function to create a test tool
2081    // MCP removed: test tool helper deleted
2082
2083    #[tokio::test]
2084    async fn test_node_config_default() {
2085        let config = NodeConfig::default();
2086
2087        assert!(config.peer_id.is_none());
2088        assert_eq!(config.listen_addrs.len(), 2);
2089        assert!(config.enable_ipv6);
2090        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2091        assert_eq!(config.max_incoming_connections, 100);
2092        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2093    }
2094
2095    #[tokio::test]
2096    async fn test_dht_config_default() {
2097        let config = DHTConfig::default();
2098
2099        assert_eq!(config.k_value, 20);
2100        assert_eq!(config.alpha_value, 5);
2101        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2102        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2103    }
2104
2105    #[tokio::test]
2106    async fn test_security_config_default() {
2107        let config = SecurityConfig::default();
2108
2109        assert!(config.enable_noise);
2110        assert!(config.enable_tls);
2111        assert_eq!(config.trust_level, TrustLevel::Basic);
2112    }
2113
2114    #[test]
2115    fn test_trust_level_variants() {
2116        // Test that all trust level variants can be created
2117        let _none = TrustLevel::None;
2118        let _basic = TrustLevel::Basic;
2119        let _full = TrustLevel::Full;
2120
2121        // Test equality
2122        assert_eq!(TrustLevel::None, TrustLevel::None);
2123        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2124        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2125        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2126    }
2127
2128    #[test]
2129    fn test_connection_status_variants() {
2130        let connecting = ConnectionStatus::Connecting;
2131        let connected = ConnectionStatus::Connected;
2132        let disconnecting = ConnectionStatus::Disconnecting;
2133        let disconnected = ConnectionStatus::Disconnected;
2134        let failed = ConnectionStatus::Failed("test error".to_string());
2135
2136        assert_eq!(connecting, ConnectionStatus::Connecting);
2137        assert_eq!(connected, ConnectionStatus::Connected);
2138        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2139        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2140        assert_ne!(connecting, connected);
2141
2142        if let ConnectionStatus::Failed(msg) = failed {
2143            assert_eq!(msg, "test error");
2144        } else {
2145            panic!("Expected Failed status");
2146        }
2147    }
2148
2149    #[tokio::test]
2150    async fn test_node_creation() -> Result<()> {
2151        let config = create_test_node_config();
2152        let node = P2PNode::new(config).await?;
2153
2154        assert_eq!(node.peer_id(), "test_peer_123");
2155        assert!(!node.is_running().await);
2156        assert_eq!(node.peer_count().await, 0);
2157        assert!(node.connected_peers().await.is_empty());
2158
2159        Ok(())
2160    }
2161
2162    #[tokio::test]
2163    async fn test_node_creation_without_peer_id() -> Result<()> {
2164        let mut config = create_test_node_config();
2165        config.peer_id = None;
2166
2167        let node = P2PNode::new(config).await?;
2168
2169        // Should have generated a peer ID
2170        assert!(node.peer_id().starts_with("peer_"));
2171        assert!(!node.is_running().await);
2172
2173        Ok(())
2174    }
2175
2176    #[tokio::test]
2177    async fn test_node_lifecycle() -> Result<()> {
2178        let config = create_test_node_config();
2179        let node = P2PNode::new(config).await?;
2180
2181        // Initially not running
2182        assert!(!node.is_running().await);
2183
2184        // Start the node
2185        node.start().await?;
2186        assert!(node.is_running().await);
2187
2188        // Check listen addresses were set (at least one)
2189        let listen_addrs = node.listen_addrs().await;
2190        assert!(
2191            !listen_addrs.is_empty(),
2192            "Expected at least one listening address"
2193        );
2194
2195        // Stop the node
2196        node.stop().await?;
2197        assert!(!node.is_running().await);
2198
2199        Ok(())
2200    }
2201
2202    #[tokio::test]
2203    async fn test_peer_connection() -> Result<()> {
2204        let config = create_test_node_config();
2205        let node = P2PNode::new(config).await?;
2206
2207        let peer_addr = "127.0.0.1:0";
2208
2209        // Connect to a peer
2210        let peer_id = node.connect_peer(peer_addr).await?;
2211        assert!(peer_id.starts_with("peer_from_"));
2212
2213        // Check peer count
2214        assert_eq!(node.peer_count().await, 1);
2215
2216        // Check connected peers
2217        let connected_peers = node.connected_peers().await;
2218        assert_eq!(connected_peers.len(), 1);
2219        assert_eq!(connected_peers[0], peer_id);
2220
2221        // Get peer info
2222        let peer_info = node.peer_info(&peer_id).await;
2223        assert!(peer_info.is_some());
2224        let info = peer_info.expect("Peer info should exist after adding peer");
2225        assert_eq!(info.peer_id, peer_id);
2226        assert_eq!(info.status, ConnectionStatus::Connected);
2227        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2228
2229        // Disconnect from peer
2230        node.disconnect_peer(&peer_id).await?;
2231        assert_eq!(node.peer_count().await, 0);
2232
2233        Ok(())
2234    }
2235
2236    #[tokio::test]
2237    async fn test_event_subscription() -> Result<()> {
2238        let config = create_test_node_config();
2239        let node = P2PNode::new(config).await?;
2240
2241        let mut events = node.subscribe_events();
2242        let peer_addr = "127.0.0.1:0";
2243
2244        // Connect to a peer (this should emit an event)
2245        let peer_id = node.connect_peer(peer_addr).await?;
2246
2247        // Check for PeerConnected event
2248        let event = timeout(Duration::from_millis(100), events.recv()).await;
2249        assert!(event.is_ok());
2250
2251        let event_result = event
2252            .expect("Should receive event")
2253            .expect("Event should not be error");
2254        match event_result {
2255            P2PEvent::PeerConnected(event_peer_id) => {
2256                assert_eq!(event_peer_id, peer_id);
2257            }
2258            _ => panic!("Expected PeerConnected event"),
2259        }
2260
2261        // Disconnect from peer (this should emit another event)
2262        node.disconnect_peer(&peer_id).await?;
2263
2264        // Check for PeerDisconnected event
2265        let event = timeout(Duration::from_millis(100), events.recv()).await;
2266        assert!(event.is_ok());
2267
2268        let event_result = event
2269            .expect("Should receive event")
2270            .expect("Event should not be error");
2271        match event_result {
2272            P2PEvent::PeerDisconnected(event_peer_id) => {
2273                assert_eq!(event_peer_id, peer_id);
2274            }
2275            _ => panic!("Expected PeerDisconnected event"),
2276        }
2277
2278        Ok(())
2279    }
2280
2281    #[tokio::test]
2282    async fn test_message_sending() -> Result<()> {
2283        // Create two nodes
2284        let mut config1 = create_test_node_config();
2285        config1.listen_addr =
2286            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2287        let node1 = P2PNode::new(config1).await?;
2288        node1.start().await?;
2289
2290        let mut config2 = create_test_node_config();
2291        config2.listen_addr =
2292            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2293        let node2 = P2PNode::new(config2).await?;
2294        node2.start().await?;
2295
2296        // Wait a bit for nodes to start listening
2297        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2298
2299        // Get actual listening address of node2
2300        let node2_addr = node2.local_addr().ok_or_else(|| {
2301            P2PError::Network(crate::error::NetworkError::ProtocolError(
2302                "No listening address".to_string().into(),
2303            ))
2304        })?;
2305
2306        // Connect node1 to node2
2307        let peer_id =
2308            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2309                Ok(res) => res?,
2310                Err(_) => return Err(P2PError::Network(NetworkError::Timeout).into()),
2311            };
2312
2313        // Wait a bit for connection to establish
2314        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2315
2316        // Send a message
2317        let message_data = b"Hello, peer!".to_vec();
2318        let result = match timeout(
2319            Duration::from_millis(500),
2320            node1.send_message(&peer_id, "test-protocol", message_data),
2321        )
2322        .await
2323        {
2324            Ok(res) => res,
2325            Err(_) => return Err(P2PError::Network(NetworkError::Timeout).into()),
2326        };
2327        // For now, we'll just check that we don't get a "not connected" error
2328        // The actual send might fail due to no handler on the other side
2329        if let Err(e) = &result {
2330            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2331        }
2332
2333        // Try to send to non-existent peer
2334        let non_existent_peer = "non_existent_peer".to_string();
2335        let result = node1
2336            .send_message(&non_existent_peer, "test-protocol", vec![])
2337            .await;
2338        assert!(result.is_err(), "Sending to non-existent peer should fail");
2339
2340        Ok(())
2341    }
2342
2343    #[tokio::test]
2344    async fn test_remote_mcp_operations() -> Result<()> {
2345        let config = create_test_node_config();
2346        let node = P2PNode::new(config).await?;
2347
2348        // MCP removed; test reduced to simple start/stop
2349        node.start().await?;
2350        node.stop().await?;
2351        Ok(())
2352    }
2353
2354    #[tokio::test]
2355    async fn test_health_check() -> Result<()> {
2356        let config = create_test_node_config();
2357        let node = P2PNode::new(config).await?;
2358
2359        // Health check should pass with no connections
2360        let result = node.health_check().await;
2361        assert!(result.is_ok());
2362
2363        // Note: We're not actually connecting to real peers here
2364        // since that would require running bootstrap nodes.
2365        // The health check should still pass with no connections.
2366
2367        Ok(())
2368    }
2369
2370    #[tokio::test]
2371    async fn test_node_uptime() -> Result<()> {
2372        let config = create_test_node_config();
2373        let node = P2PNode::new(config).await?;
2374
2375        let uptime1 = node.uptime();
2376        assert!(uptime1 >= Duration::from_secs(0));
2377
2378        // Wait a bit
2379        tokio::time::sleep(Duration::from_millis(10)).await;
2380
2381        let uptime2 = node.uptime();
2382        assert!(uptime2 > uptime1);
2383
2384        Ok(())
2385    }
2386
2387    #[tokio::test]
2388    async fn test_node_config_access() -> Result<()> {
2389        let config = create_test_node_config();
2390        let expected_peer_id = config.peer_id.clone();
2391        let node = P2PNode::new(config).await?;
2392
2393        let node_config = node.config();
2394        assert_eq!(node_config.peer_id, expected_peer_id);
2395        assert_eq!(node_config.max_connections, 100);
2396        // MCP removed
2397
2398        Ok(())
2399    }
2400
2401    #[tokio::test]
2402    async fn test_mcp_server_access() -> Result<()> {
2403        let config = create_test_node_config();
2404        let _node = P2PNode::new(config).await?;
2405
2406        // MCP removed
2407        Ok(())
2408    }
2409
2410    #[tokio::test]
2411    async fn test_dht_access() -> Result<()> {
2412        let config = create_test_node_config();
2413        let node = P2PNode::new(config).await?;
2414
2415        // Should have DHT
2416        assert!(node.dht().is_some());
2417
2418        Ok(())
2419    }
2420
2421    #[tokio::test]
2422    async fn test_node_builder() -> Result<()> {
2423        // Create a config using the builder but don't actually build a real node
2424        let builder = P2PNode::builder()
2425            .with_peer_id("builder_test_peer".to_string())
2426            .listen_on("/ip4/127.0.0.1/tcp/0")
2427            .listen_on("/ip6/::1/tcp/0")
2428            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2429            .with_ipv6(true)
2430            .with_connection_timeout(Duration::from_secs(15))
2431            .with_max_connections(200);
2432
2433        // Test the configuration that was built
2434        let config = builder.config;
2435        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2436        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2437        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2438        assert!(config.enable_ipv6);
2439        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2440        assert_eq!(config.max_connections, 200);
2441
2442        Ok(())
2443    }
2444
2445    #[tokio::test]
2446    async fn test_bootstrap_peers() -> Result<()> {
2447        let mut config = create_test_node_config();
2448        config.bootstrap_peers = vec![
2449            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2450            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2451        ];
2452
2453        let node = P2PNode::new(config).await?;
2454
2455        // Start node (which attempts to connect to bootstrap peers)
2456        node.start().await?;
2457
2458        // In a test environment, bootstrap peers may not be available
2459        // The test verifies the node starts correctly with bootstrap configuration
2460        let peer_count = node.peer_count().await;
2461        assert!(
2462            peer_count <= 2,
2463            "Peer count should not exceed bootstrap peer count"
2464        );
2465
2466        node.stop().await?;
2467        Ok(())
2468    }
2469
2470    #[tokio::test]
2471    async fn test_production_mode_disabled() -> Result<()> {
2472        let config = create_test_node_config();
2473        let node = P2PNode::new(config).await?;
2474
2475        assert!(!node.is_production_mode());
2476        assert!(node.production_config().is_none());
2477
2478        // Resource metrics should fail when production mode is disabled
2479        let result = node.resource_metrics().await;
2480        assert!(result.is_err());
2481        assert!(result.unwrap_err().to_string().contains("not enabled"));
2482
2483        Ok(())
2484    }
2485
2486    #[tokio::test]
2487    async fn test_network_event_variants() {
2488        // Test that all network event variants can be created
2489        let peer_id = "test_peer".to_string();
2490        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2491
2492        let _peer_connected = NetworkEvent::PeerConnected {
2493            peer_id: peer_id.clone(),
2494            addresses: vec![address.clone()],
2495        };
2496
2497        let _peer_disconnected = NetworkEvent::PeerDisconnected {
2498            peer_id: peer_id.clone(),
2499            reason: "test disconnect".to_string(),
2500        };
2501
2502        let _message_received = NetworkEvent::MessageReceived {
2503            peer_id: peer_id.clone(),
2504            protocol: "test-protocol".to_string(),
2505            data: vec![1, 2, 3],
2506        };
2507
2508        let _connection_failed = NetworkEvent::ConnectionFailed {
2509            peer_id: Some(peer_id.clone()),
2510            address: address.clone(),
2511            error: "connection refused".to_string(),
2512        };
2513
2514        let _dht_stored = NetworkEvent::DHTRecordStored {
2515            key: vec![1, 2, 3],
2516            value: vec![4, 5, 6],
2517        };
2518
2519        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2520            key: vec![1, 2, 3],
2521            value: Some(vec![4, 5, 6]),
2522        };
2523    }
2524
2525    #[tokio::test]
2526    async fn test_peer_info_structure() {
2527        let peer_info = PeerInfo {
2528            peer_id: "test_peer".to_string(),
2529            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2530            connected_at: Instant::now(),
2531            last_seen: Instant::now(),
2532            status: ConnectionStatus::Connected,
2533            protocols: vec!["test-protocol".to_string()],
2534            heartbeat_count: 0,
2535        };
2536
2537        assert_eq!(peer_info.peer_id, "test_peer");
2538        assert_eq!(peer_info.addresses.len(), 1);
2539        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2540        assert_eq!(peer_info.protocols.len(), 1);
2541    }
2542
2543    #[tokio::test]
2544    async fn test_serialization() -> Result<()> {
2545        // Test that configs can be serialized/deserialized
2546        let config = create_test_node_config();
2547        let serialized = serde_json::to_string(&config)?;
2548        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2549
2550        assert_eq!(config.peer_id, deserialized.peer_id);
2551        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2552        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2553
2554        Ok(())
2555    }
2556}