// ipfrs_network/node.rs

1//! Network node implementation with full libp2p integration
2
3use dashmap::DashSet;
4use futures::StreamExt;
5use libp2p::{
6    autonat,
7    core::Transport as _,
8    dcutr, identify, identity, kad, mdns, noise, ping, relay,
9    swarm::{NetworkBehaviour, SwarmEvent},
10    Multiaddr, PeerId, Swarm,
11};
12use parking_lot::RwLock;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::mpsc;
18use tracing::{debug, info, warn};
19
20// Type alias for IPFRS results to avoid conflicts with libp2p types
21type IpfrsResult<T> = ipfrs_core::error::Result<T>;
22
/// Kademlia DHT configuration
///
/// Tunable parameters applied to `kad::Config` in `build_swarm`. Defaults
/// mirror the values used by the standard Kademlia paper and go-ipfs.
#[derive(Debug, Clone)]
pub struct KademliaConfig {
    /// Query timeout in seconds (applied via `kad::Config::set_query_timeout`)
    pub query_timeout_secs: u64,
    /// Replication factor (k) - number of replicas to store; must be > 0
    pub replication_factor: usize,
    /// Alpha (α) - concurrency parameter for iterative queries; must be > 0
    pub alpha: usize,
    /// K-bucket size - maximum peers per bucket
    // NOTE(review): currently not applied — libp2p uses a fixed K; kept for
    // forward compatibility (see the comment in `build_swarm`).
    pub kbucket_size: usize,
}
35
impl Default for KademliaConfig {
    /// Standard IPFS-compatible DHT parameters.
    fn default() -> Self {
        Self {
            // Standard Kademlia query timeout
            query_timeout_secs: 60,
            // IPFS uses k = 20 for replication
            replication_factor: 20,
            // Standard Kademlia alpha (3 is common, IPFS uses 3)
            alpha: 3,
            // Standard Kademlia k-bucket size (20 is standard)
            kbucket_size: 20,
        }
    }
}
50
/// Network configuration
///
/// Consumed by `NetworkNode::new` / `build_swarm`. Use the preset
/// constructors (`low_memory`, `iot`, `mobile`, `high_performance`) for
/// device-profile defaults, or `Default::default()` for a general node.
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Listen addresses (multiaddr strings, parsed at startup)
    pub listen_addrs: Vec<String>,
    /// Bootstrap peers (multiaddr strings, dialed at startup)
    pub bootstrap_peers: Vec<String>,
    /// Enable QUIC transport (TCP is always available as fallback)
    pub enable_quic: bool,
    /// Data directory (holds the persisted identity keypair)
    pub data_dir: PathBuf,
    /// Enable mDNS peer discovery on the local network
    pub enable_mdns: bool,
    /// Enable NAT traversal (AutoNAT + DCUtR)
    // NOTE(review): AutoNAT/DCUtR behaviours are constructed unconditionally
    // in `build_swarm`; this flag is not consulted there — confirm intent.
    pub enable_nat_traversal: bool,
    /// Relay server addresses for NAT traversal
    pub relay_servers: Vec<String>,
    /// Kademlia DHT configuration
    pub kademlia: KademliaConfig,
    /// Maximum number of concurrent connections (None = unlimited)
    pub max_connections: Option<usize>,
    /// Maximum number of inbound connections (None = unlimited)
    pub max_inbound_connections: Option<usize>,
    /// Maximum number of outbound connections (None = unlimited)
    pub max_outbound_connections: Option<usize>,
    /// Connection buffer size in bytes
    pub connection_buffer_size: usize,
    /// Enable aggressive memory optimizations
    pub low_memory_mode: bool,
}
81
impl Default for NetworkConfig {
    /// General-purpose defaults: QUIC on OS-assigned UDP ports (IPv4 + IPv6),
    /// NAT traversal on, mDNS off, no connection limits.
    fn default() -> Self {
        Self {
            listen_addrs: vec![
                "/ip4/0.0.0.0/udp/0/quic-v1".to_string(),
                "/ip6/::/udp/0/quic-v1".to_string(),
            ],
            bootstrap_peers: vec![],
            enable_quic: true,
            enable_mdns: false,
            enable_nat_traversal: true,
            relay_servers: vec![],
            data_dir: PathBuf::from(".ipfrs"),
            kademlia: KademliaConfig::default(),
            max_connections: None,
            max_inbound_connections: None,
            max_outbound_connections: None,
            connection_buffer_size: 64 * 1024, // 64 KB default
            low_memory_mode: false,
        }
    }
}
104
105impl NetworkConfig {
106    /// Create a low-memory configuration for constrained devices
107    ///
108    /// This configuration minimizes memory usage at the cost of some features:
109    /// - Limited to 16 total connections
110    /// - Smaller connection buffers (8 KB)
111    /// - Reduced DHT parameters
112    /// - mDNS disabled
113    /// - NAT traversal disabled
114    ///
115    /// Suitable for embedded devices with < 128 MB RAM
116    pub fn low_memory() -> Self {
117        Self {
118            listen_addrs: vec!["/ip4/0.0.0.0/udp/0/quic-v1".to_string()],
119            bootstrap_peers: vec![],
120            enable_quic: true,
121            enable_mdns: false,          // Disabled to save memory
122            enable_nat_traversal: false, // Disabled to save memory
123            relay_servers: vec![],
124            data_dir: PathBuf::from(".ipfrs"),
125            kademlia: KademliaConfig {
126                query_timeout_secs: 30, // Shorter timeout
127                replication_factor: 10, // Reduced from 20
128                alpha: 2,               // Reduced from 3
129                kbucket_size: 10,       // Reduced from 20
130            },
131            max_connections: Some(16), // Very limited connections
132            max_inbound_connections: Some(8),
133            max_outbound_connections: Some(8),
134            connection_buffer_size: 8 * 1024, // 8 KB buffers
135            low_memory_mode: true,
136        }
137    }
138
139    /// Create an IoT device configuration
140    ///
141    /// Balanced configuration for IoT devices:
142    /// - Limited to 32 total connections
143    /// - Moderate connection buffers (16 KB)
144    /// - Reduced DHT parameters
145    /// - mDNS enabled for local discovery
146    /// - NAT traversal enabled
147    ///
148    /// Suitable for IoT devices with 128-512 MB RAM
149    pub fn iot() -> Self {
150        Self {
151            listen_addrs: vec!["/ip4/0.0.0.0/udp/0/quic-v1".to_string()],
152            bootstrap_peers: vec![],
153            enable_quic: true,
154            enable_mdns: true, // Local discovery useful for IoT
155            enable_nat_traversal: true,
156            relay_servers: vec![],
157            data_dir: PathBuf::from(".ipfrs"),
158            kademlia: KademliaConfig {
159                query_timeout_secs: 45,
160                replication_factor: 15,
161                alpha: 2,
162                kbucket_size: 15,
163            },
164            max_connections: Some(32),
165            max_inbound_connections: Some(16),
166            max_outbound_connections: Some(16),
167            connection_buffer_size: 16 * 1024, // 16 KB buffers
168            low_memory_mode: false,
169        }
170    }
171
172    /// Create a mobile device configuration
173    ///
174    /// Power and bandwidth-aware configuration for mobile devices:
175    /// - Limited to 64 total connections
176    /// - Standard connection buffers (32 KB)
177    /// - Standard DHT parameters
178    /// - mDNS disabled (battery saving)
179    /// - NAT traversal enabled
180    ///
181    /// Suitable for mobile devices with network switching
182    pub fn mobile() -> Self {
183        Self {
184            listen_addrs: vec!["/ip4/0.0.0.0/udp/0/quic-v1".to_string()],
185            bootstrap_peers: vec![],
186            enable_quic: true,
187            enable_mdns: false, // Battery saving
188            enable_nat_traversal: true,
189            relay_servers: vec![],
190            data_dir: PathBuf::from(".ipfrs"),
191            kademlia: KademliaConfig {
192                query_timeout_secs: 60,
193                replication_factor: 20,
194                alpha: 3,
195                kbucket_size: 20,
196            },
197            max_connections: Some(64),
198            max_inbound_connections: Some(32),
199            max_outbound_connections: Some(32),
200            connection_buffer_size: 32 * 1024, // 32 KB buffers
201            low_memory_mode: false,
202        }
203    }
204
205    /// Create a high-performance configuration for servers
206    ///
207    /// Optimized for high throughput and many connections:
208    /// - Unlimited connections
209    /// - Large connection buffers (128 KB)
210    /// - Aggressive DHT parameters
211    /// - All features enabled
212    ///
213    /// Suitable for servers with > 2 GB RAM
214    pub fn high_performance() -> Self {
215        Self {
216            listen_addrs: vec![
217                "/ip4/0.0.0.0/udp/0/quic-v1".to_string(),
218                "/ip6/::/udp/0/quic-v1".to_string(),
219            ],
220            bootstrap_peers: vec![],
221            enable_quic: true,
222            enable_mdns: true,
223            enable_nat_traversal: true,
224            relay_servers: vec![],
225            data_dir: PathBuf::from(".ipfrs"),
226            kademlia: KademliaConfig {
227                query_timeout_secs: 60,
228                replication_factor: 20,
229                alpha: 3,
230                kbucket_size: 20,
231            },
232            max_connections: None, // Unlimited
233            max_inbound_connections: None,
234            max_outbound_connections: None,
235            connection_buffer_size: 128 * 1024, // 128 KB buffers
236            low_memory_mode: false,
237        }
238    }
239}
240
/// Network behavior combining multiple protocols
///
/// Each field is a libp2p `NetworkBehaviour`; the derive macro composes them
/// and routes their events into `IpfrsBehaviourEvent` (via the `From` impls
/// below). Note that every behaviour is always constructed — `build_swarm`
/// creates a near-inert mDNS instance even when discovery is disabled,
/// because the derive requires all fields to be present.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "IpfrsBehaviourEvent")]
pub struct IpfrsBehaviour {
    /// Kademlia DHT for content and peer discovery
    pub kademlia: kad::Behaviour<kad::store::MemoryStore>,
    /// Identify protocol for peer information
    pub identify: identify::Behaviour,
    /// Ping protocol for connectivity checks
    pub ping: ping::Behaviour,
    /// AutoNAT for NAT detection and address confirmation
    pub autonat: autonat::Behaviour,
    /// DCUtR for hole punching through NAT
    pub dcutr: dcutr::Behaviour,
    /// mDNS for local network peer discovery
    pub mdns: mdns::tokio::Behaviour,
    /// Relay client for NAT traversal
    pub relay_client: relay::client::Behaviour,
}
260
/// Events generated by IpfrsBehaviour
///
/// One variant per sub-behaviour; produced by the `#[derive(NetworkBehaviour)]`
/// macro through the `From` conversions below and consumed in
/// `NetworkNode::handle_swarm_event`.
#[derive(Debug)]
pub enum IpfrsBehaviourEvent {
    Kademlia(kad::Event),
    Identify(identify::Event),
    Ping(ping::Event),
    Autonat(autonat::Event),
    Dcutr(dcutr::Event),
    Mdns(mdns::Event),
    RelayClient(relay::client::Event),
}
272
273impl From<kad::Event> for IpfrsBehaviourEvent {
274    fn from(event: kad::Event) -> Self {
275        IpfrsBehaviourEvent::Kademlia(event)
276    }
277}
278
279impl From<identify::Event> for IpfrsBehaviourEvent {
280    fn from(event: identify::Event) -> Self {
281        IpfrsBehaviourEvent::Identify(event)
282    }
283}
284
285impl From<ping::Event> for IpfrsBehaviourEvent {
286    fn from(event: ping::Event) -> Self {
287        IpfrsBehaviourEvent::Ping(event)
288    }
289}
290
291impl From<autonat::Event> for IpfrsBehaviourEvent {
292    fn from(event: autonat::Event) -> Self {
293        IpfrsBehaviourEvent::Autonat(event)
294    }
295}
296
297impl From<dcutr::Event> for IpfrsBehaviourEvent {
298    fn from(event: dcutr::Event) -> Self {
299        IpfrsBehaviourEvent::Dcutr(event)
300    }
301}
302
303impl From<mdns::Event> for IpfrsBehaviourEvent {
304    fn from(event: mdns::Event) -> Self {
305        IpfrsBehaviourEvent::Mdns(event)
306    }
307}
308
309impl From<relay::client::Event> for IpfrsBehaviourEvent {
310    fn from(event: relay::client::Event) -> Self {
311        IpfrsBehaviourEvent::RelayClient(event)
312    }
313}
314
/// IPFRS network node
///
/// Owns the libp2p swarm until `start()` moves it into a spawned event loop.
/// Shared state (`external_addrs`, `connected_peers`, `bandwidth_stats`) is
/// behind `Arc` so the event loop and callers can both observe it.
pub struct NetworkNode {
    config: NetworkConfig,
    peer_id: PeerId,
    // `Some` until `start()` takes it; `None` afterwards (start is one-shot).
    swarm: Option<Swarm<IpfrsBehaviour>>,
    // Set by `start()`; used by `stop()` to signal the event loop.
    shutdown_tx: Option<mpsc::Sender<()>>,
    event_tx: mpsc::Sender<NetworkEvent>,
    // Handed out once to whoever consumes `NetworkEvent`s.
    event_rx: Option<mpsc::Receiver<NetworkEvent>>,
    /// External addresses discovered via AutoNAT
    external_addrs: Arc<parking_lot::RwLock<Vec<Multiaddr>>>,
    /// Set of currently connected peers
    connected_peers: Arc<DashSet<PeerId>>,
    /// Bandwidth tracking (bytes sent/received)
    bandwidth_stats: Arc<parking_lot::RwLock<BandwidthStats>>,
}
330
/// Bandwidth statistics
///
/// NOTE(review): allocated in `NetworkNode::new` but never written by the
/// visible event loop — confirm a writer exists elsewhere in the crate.
#[derive(Debug, Clone, Default)]
struct BandwidthStats {
    bytes_sent: u64,
    bytes_received: u64,
}
337
/// Network events
///
/// High-level events emitted by the node's event loop on the channel created
/// in `NetworkNode::new` (capacity 1024). Sends are best-effort: if the
/// receiver is full or dropped, events are silently discarded.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// Peer connected
    PeerConnected {
        peer_id: PeerId,
        endpoint: ConnectionEndpoint,
        established_in: std::time::Duration,
    },
    /// Peer disconnected (emitted when the last connection to the peer closes)
    PeerDisconnected {
        peer_id: PeerId,
        // Debug-formatted close cause, if libp2p reported one
        cause: Option<String>,
    },
    /// Content found in DHT
    // NOTE(review): `cid` is produced by lossy-UTF-8-decoding the raw DHT
    // record key in `handle_swarm_event` — confirm keys are textual CIDs.
    ContentFound { cid: String, providers: Vec<PeerId> },
    /// New peer discovered (via Identify or mDNS)
    PeerDiscovered {
        peer_id: PeerId,
        addrs: Vec<Multiaddr>,
    },
    /// Listening on a new address
    ListeningOn { address: Multiaddr },
    /// Connection error occurred (peer_id is None for inbound failures)
    ConnectionError {
        peer_id: Option<PeerId>,
        error: String,
    },
    /// DHT bootstrap completed
    DhtBootstrapCompleted,
    /// NAT status changed (Debug-formatted AutoNAT statuses)
    NatStatusChanged {
        old_status: String,
        new_status: String,
    },
}
374
/// Connection endpoint information
///
/// Mirrors libp2p's `ConnectedPoint`, owned so it can travel through the
/// `NetworkEvent` channel.
#[derive(Debug, Clone)]
pub enum ConnectionEndpoint {
    /// Dialer (outbound connection)
    Dialer { address: Multiaddr },
    /// Listener (inbound connection)
    Listener {
        local_addr: Multiaddr,
        send_back_addr: Multiaddr,
    },
}
386
/// Default keypair filename
///
/// Stored under `NetworkConfig::data_dir`; see `load_or_generate_keypair`.
const KEYPAIR_FILENAME: &str = "identity.key";
389
390impl NetworkNode {
391    /// Create a new network node
392    pub fn new(config: NetworkConfig) -> IpfrsResult<Self> {
393        info!("Creating network node with libp2p");
394
395        // Load or generate keypair for stable identity
396        let keypair = Self::load_or_generate_keypair(&config.data_dir)?;
397        let peer_id = keypair.public().to_peer_id();
398
399        info!("Local peer ID: {}", peer_id);
400
401        // Create event channel
402        let (event_tx, event_rx) = mpsc::channel(1024);
403
404        // Build the swarm
405        let swarm = Self::build_swarm(keypair, &config)?;
406
407        Ok(Self {
408            config,
409            peer_id,
410            swarm: Some(swarm),
411            shutdown_tx: None,
412            event_tx,
413            event_rx: Some(event_rx),
414            external_addrs: Arc::new(RwLock::new(Vec::new())),
415            connected_peers: Arc::new(DashSet::new()),
416            bandwidth_stats: Arc::new(RwLock::new(BandwidthStats::default())),
417        })
418    }
419
420    /// Load existing keypair or generate a new one
421    fn load_or_generate_keypair(data_dir: &Path) -> IpfrsResult<identity::Keypair> {
422        let key_path = data_dir.join(KEYPAIR_FILENAME);
423
424        if key_path.exists() {
425            info!("Loading existing identity from {:?}", key_path);
426            Self::load_keypair(&key_path)
427        } else {
428            info!("Generating new identity");
429            let keypair = identity::Keypair::generate_ed25519();
430
431            // Create data directory if it doesn't exist
432            if !data_dir.exists() {
433                fs::create_dir_all(data_dir).map_err(ipfrs_core::error::Error::Io)?;
434            }
435
436            // Save the new keypair
437            Self::save_keypair(&keypair, &key_path)?;
438            info!("Saved new identity to {:?}", key_path);
439
440            Ok(keypair)
441        }
442    }
443
444    /// Load keypair from file
445    fn load_keypair(path: &Path) -> IpfrsResult<identity::Keypair> {
446        let bytes = fs::read(path).map_err(ipfrs_core::error::Error::Io)?;
447
448        identity::Keypair::from_protobuf_encoding(&bytes).map_err(|e| {
449            ipfrs_core::error::Error::Network(format!("Failed to decode keypair: {}", e))
450        })
451    }
452
453    /// Save keypair to file
454    fn save_keypair(keypair: &identity::Keypair, path: &Path) -> IpfrsResult<()> {
455        let bytes = keypair.to_protobuf_encoding().map_err(|e| {
456            ipfrs_core::error::Error::Network(format!("Failed to encode keypair: {}", e))
457        })?;
458
459        fs::write(path, bytes).map_err(ipfrs_core::error::Error::Io)?;
460
461        // Set restrictive permissions on Unix
462        #[cfg(unix)]
463        {
464            use std::os::unix::fs::PermissionsExt;
465            let permissions = fs::Permissions::from_mode(0o600);
466            fs::set_permissions(path, permissions).map_err(ipfrs_core::error::Error::Io)?;
467        }
468
469        Ok(())
470    }
471
    /// Build libp2p swarm with all protocols
    ///
    /// Assembles the transport stack (QUIC and/or TCP with noise + yamux)
    /// and all behaviours (Kademlia, Identify, Ping, AutoNAT, DCUtR, mDNS,
    /// relay client) into a single `Swarm` driven by the tokio executor.
    ///
    /// # Errors
    /// Returns an error if noise authentication setup or mDNS behaviour
    /// creation fails.
    #[allow(clippy::too_many_lines)]
    fn build_swarm(
        keypair: identity::Keypair,
        config: &NetworkConfig,
    ) -> IpfrsResult<Swarm<IpfrsBehaviour>> {
        let peer_id = keypair.public().to_peer_id();

        // Create relay client for NAT traversal (behavior only - transport handled separately)
        // NOTE(review): `_relay_transport` is discarded here, so relayed
        // connections are not wired into the transport stack below — confirm
        // this is intentional.
        let (_relay_transport, relay_client) = relay::client::new(peer_id);

        // Build TCP transport with noise and yamux
        let tcp_transport = libp2p::tcp::tokio::Transport::default()
            .upgrade(libp2p::core::upgrade::Version::V1)
            .authenticate(noise::Config::new(&keypair).map_err(std::io::Error::other)?)
            .multiplex(libp2p::yamux::Config::default())
            .map(|(peer_id, muxer), _| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer)));

        // Build QUIC transport (encryption and multiplexing are built in)
        let quic_transport = libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(
            &keypair,
        ))
        .map(|(peer_id, muxer), _| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer)));

        // Combine transports: QUIC primary with TCP fallback
        let transport = if config.enable_quic {
            // QUIC with TCP fallback
            quic_transport
                .or_transport(tcp_transport)
                .map(|either, _| either.into_inner())
                .boxed()
        } else {
            // TCP only
            tcp_transport.boxed()
        };

        // Create Kademlia DHT with tunable config (records held in memory only)
        let store = kad::store::MemoryStore::new(peer_id);
        let mut kad_config = kad::Config::default();

        // Apply tunable parameters
        kad_config.set_query_timeout(Duration::from_secs(config.kademlia.query_timeout_secs));
        kad_config.set_replication_factor(
            std::num::NonZeroUsize::new(config.kademlia.replication_factor)
                .expect("Replication factor must be > 0"),
        );
        kad_config.set_parallelism(
            std::num::NonZeroUsize::new(config.kademlia.alpha).expect("Alpha must be > 0"),
        );
        kad_config.set_kbucket_inserts(kad::BucketInserts::OnConnected);

        // Set max k-bucket size if possible (note: libp2p has a fixed K=20 in current versions)
        // The kbucket_size parameter is kept for future compatibility

        let kademlia = kad::Behaviour::with_config(peer_id, store, kad_config);

        // Create Identify protocol (exchanges listen addrs / agent version with peers)
        let identify = identify::Behaviour::new(
            identify::Config::new("/ipfrs/1.0.0".to_string(), keypair.public())
                .with_agent_version(format!("ipfrs/{}", env!("CARGO_PKG_VERSION"))),
        );

        // Create Ping protocol for connectivity checks (15 s interval)
        let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(15)));

        // Create AutoNAT for NAT detection; private IPs allowed so local
        // test networks can confirm reachability too
        let autonat = autonat::Behaviour::new(
            peer_id,
            autonat::Config {
                only_global_ips: false,
                ..Default::default()
            },
        );

        // Create DCUtR for hole punching
        let dcutr = dcutr::Behaviour::new(peer_id);

        // Create mDNS for local network discovery (if enabled)
        let mdns = if config.enable_mdns {
            mdns::tokio::Behaviour::new(mdns::Config::default(), peer_id)
                .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?
        } else {
            // Disabled mDNS - still need to create one but it won't discover
            // (the behaviour field is mandatory; a 1 h query interval and 1 s
            // TTL make it effectively inert)
            mdns::tokio::Behaviour::new(
                mdns::Config {
                    ttl: Duration::from_secs(1),
                    query_interval: Duration::from_secs(3600), // Very long interval
                    enable_ipv6: false,
                },
                peer_id,
            )
            .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?
        };

        // Combine into network behavior
        let behaviour = IpfrsBehaviour {
            kademlia,
            identify,
            ping,
            autonat,
            dcutr,
            mdns,
            relay_client,
        };

        // Create swarm with tokio executor; idle connections are dropped
        // after 60 s without activity
        let mut swarm_config = libp2p::swarm::Config::with_executor(|fut| {
            tokio::spawn(fut);
        });
        swarm_config = swarm_config.with_idle_connection_timeout(Duration::from_secs(60));

        let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);

        Ok(swarm)
    }
587
    /// Start the network node
    ///
    /// Takes ownership of the swarm (one-shot: a second call fails), begins
    /// listening on the configured addresses, dials bootstrap peers, puts the
    /// DHT into server mode, and spawns the event loop as a tokio task.
    ///
    /// # Errors
    /// Fails if already started, if a configured multiaddr does not parse,
    /// or if listening on an address fails. Bootstrap dial failures only warn.
    pub async fn start(&mut self) -> IpfrsResult<()> {
        info!("🚀 IPFRS Network Node Starting");
        info!("   Peer ID: {}", self.peer_id);
        info!("   QUIC enabled: {}", self.config.enable_quic);

        // `swarm` is Some only before the first start; taking it makes this
        // method one-shot.
        let mut swarm = self.swarm.take().ok_or_else(|| {
            ipfrs_core::error::Error::Network("Swarm already started".to_string())
        })?;

        // Listen on configured addresses
        for addr_str in &self.config.listen_addrs {
            let addr: Multiaddr = addr_str.parse().map_err(|e| {
                ipfrs_core::error::Error::Network(format!("Invalid multiaddr: {}", e))
            })?;

            swarm
                .listen_on(addr.clone())
                .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;

            info!("   Listening on: {}", addr);
        }

        // Bootstrap DHT with configured peers (best-effort dials)
        for peer_str in &self.config.bootstrap_peers {
            match peer_str.parse::<Multiaddr>() {
                Ok(addr) => {
                    if let Err(e) = swarm.dial(addr.clone()) {
                        warn!("Failed to dial bootstrap peer {}: {}", addr, e);
                    } else {
                        info!("   Dialing bootstrap peer: {}", addr);
                    }
                }
                Err(e) => {
                    warn!("Invalid bootstrap peer address {}: {}", peer_str, e);
                }
            }
        }

        // Put DHT into server mode (answer queries, store records)
        swarm
            .behaviour_mut()
            .kademlia
            .set_mode(Some(kad::Mode::Server));

        // Bootstrap the DHT
        // NOTE(review): this runs before any dial has completed, so with an
        // empty routing table it returns NoKnownPeers and only warns —
        // confirm a later re-bootstrap (or identify-driven routing updates)
        // compensates.
        if let Err(e) = swarm.behaviour_mut().kademlia.bootstrap() {
            warn!("DHT bootstrap failed: {}", e);
        }

        // Create shutdown channel (used by `stop()`)
        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
        self.shutdown_tx = Some(shutdown_tx);

        // Clones moved into the event-loop task below
        let event_tx = self.event_tx.clone();
        let external_addrs = Arc::clone(&self.external_addrs);
        let connected_peers = Arc::clone(&self.connected_peers);

        info!("✅ Network node ready");
        info!(
            "   Transport: {}",
            if self.config.enable_quic {
                "QUIC"
            } else {
                "TCP"
            }
        );
        info!("   DHT mode: Server");

        // Event loop: owns the swarm; exits on shutdown signal
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    event = swarm.select_next_some() => {
                        Self::handle_swarm_event(event, &event_tx, swarm.behaviour_mut(), &external_addrs, &connected_peers).await;
                    }
                    _ = shutdown_rx.recv() => {
                        info!("Shutting down network node");
                        break;
                    }
                }
            }
        });

        Ok(())
    }
674
675    /// Handle swarm events
676    async fn handle_swarm_event(
677        event: SwarmEvent<IpfrsBehaviourEvent>,
678        event_tx: &mpsc::Sender<NetworkEvent>,
679        _behaviour: &mut IpfrsBehaviour,
680        external_addrs: &Arc<RwLock<Vec<Multiaddr>>>,
681        connected_peers: &Arc<DashSet<PeerId>>,
682    ) {
683        match event {
684            SwarmEvent::NewListenAddr { address, .. } => {
685                info!("Listening on {}", address);
686                let _ = event_tx
687                    .send(NetworkEvent::ListeningOn {
688                        address: address.clone(),
689                    })
690                    .await;
691            }
692            SwarmEvent::Behaviour(IpfrsBehaviourEvent::Identify(identify::Event::Received {
693                peer_id,
694                info,
695                ..
696            })) => {
697                debug!("Identified peer {}: {:?}", peer_id, info);
698                let _ = event_tx
699                    .send(NetworkEvent::PeerDiscovered {
700                        peer_id,
701                        addrs: info.listen_addrs,
702                    })
703                    .await;
704            }
705            SwarmEvent::Behaviour(IpfrsBehaviourEvent::Kademlia(
706                kad::Event::OutboundQueryProgressed { result, .. },
707            )) => match result {
708                kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
709                    key,
710                    providers,
711                })) => {
712                    let cid = String::from_utf8_lossy(key.as_ref()).to_string();
713                    debug!("Found {} providers for {}", providers.len(), cid);
714                    let _ = event_tx
715                        .send(NetworkEvent::ContentFound {
716                            cid,
717                            providers: providers.into_iter().collect(),
718                        })
719                        .await;
720                }
721                kad::QueryResult::GetProviders(Err(e)) => {
722                    debug!("GetProviders query failed: {:?}", e);
723                }
724                kad::QueryResult::Bootstrap(Ok(_)) => {
725                    info!("DHT bootstrap completed");
726                    let _ = event_tx.send(NetworkEvent::DhtBootstrapCompleted).await;
727                }
728                kad::QueryResult::Bootstrap(Err(e)) => {
729                    warn!("DHT bootstrap failed: {:?}", e);
730                }
731                _ => {}
732            },
733            SwarmEvent::ConnectionEstablished {
734                peer_id,
735                endpoint,
736                established_in,
737                ..
738            } => {
739                info!("Connected to peer: {} in {:?}", peer_id, established_in);
740
741                // Track connected peer
742                connected_peers.insert(peer_id);
743
744                let conn_endpoint = if endpoint.is_dialer() {
745                    ConnectionEndpoint::Dialer {
746                        address: endpoint.get_remote_address().clone(),
747                    }
748                } else {
749                    ConnectionEndpoint::Listener {
750                        local_addr: endpoint.get_remote_address().clone(),
751                        send_back_addr: endpoint.get_remote_address().clone(),
752                    }
753                };
754
755                let _ = event_tx
756                    .send(NetworkEvent::PeerConnected {
757                        peer_id,
758                        endpoint: conn_endpoint,
759                        established_in,
760                    })
761                    .await;
762            }
763            SwarmEvent::ConnectionClosed {
764                peer_id,
765                cause,
766                num_established,
767                ..
768            } => {
769                info!("Disconnected from peer {}: {:?}", peer_id, cause);
770
771                // Remove peer from tracking if no more connections remain
772                if num_established == 0 {
773                    connected_peers.remove(&peer_id);
774                }
775
776                let _ = event_tx
777                    .send(NetworkEvent::PeerDisconnected {
778                        peer_id,
779                        cause: cause.map(|c| format!("{:?}", c)),
780                    })
781                    .await;
782            }
783            SwarmEvent::IncomingConnection { .. } => {
784                debug!("Incoming connection");
785            }
786            SwarmEvent::IncomingConnectionError { error, .. } => {
787                debug!("Incoming connection error: {}", error);
788                let _ = event_tx
789                    .send(NetworkEvent::ConnectionError {
790                        peer_id: None,
791                        error: error.to_string(),
792                    })
793                    .await;
794            }
795            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
796                warn!("Outgoing connection error to {:?}: {}", peer_id, error);
797                let _ = event_tx
798                    .send(NetworkEvent::ConnectionError {
799                        peer_id,
800                        error: error.to_string(),
801                    })
802                    .await;
803            }
804            SwarmEvent::Behaviour(IpfrsBehaviourEvent::Autonat(autonat_event)) => {
805                match autonat_event {
806                    autonat::Event::InboundProbe(_) => {
807                        debug!("AutoNAT inbound probe");
808                    }
809                    autonat::Event::OutboundProbe(_) => {
810                        debug!("AutoNAT outbound probe");
811                    }
812                    autonat::Event::StatusChanged { old, new } => {
813                        info!("AutoNAT status changed from {:?} to {:?}", old, new);
814
815                        let old_status = format!("{:?}", old);
816                        let new_status = format!("{:?}", new);
817
818                        let _ = event_tx
819                            .send(NetworkEvent::NatStatusChanged {
820                                old_status,
821                                new_status,
822                            })
823                            .await;
824
825                        match new {
826                            autonat::NatStatus::Public(addr) => {
827                                info!("Public address confirmed: {}", addr);
828                                // Track external address
829                                let mut addrs = external_addrs.write();
830                                if !addrs.contains(&addr) {
831                                    addrs.push(addr);
832                                }
833                            }
834                            autonat::NatStatus::Private => {
835                                info!("Node is behind NAT");
836                                // Clear external addresses when behind NAT
837                                external_addrs.write().clear();
838                            }
839                            autonat::NatStatus::Unknown => {
840                                debug!("NAT status unknown");
841                            }
842                        }
843                    }
844                }
845            }
846            SwarmEvent::Behaviour(IpfrsBehaviourEvent::Dcutr(dcutr_event)) => {
847                debug!("DCUtR event: {:?}", dcutr_event);
848            }
849            SwarmEvent::Behaviour(IpfrsBehaviourEvent::Mdns(mdns_event)) => match mdns_event {
850                mdns::Event::Discovered(peers) => {
851                    for (peer_id, addr) in peers {
852                        info!("mDNS discovered peer {} at {}", peer_id, addr);
853                        let _ = event_tx
854                            .send(NetworkEvent::PeerDiscovered {
855                                peer_id,
856                                addrs: vec![addr],
857                            })
858                            .await;
859                    }
860                }
861                mdns::Event::Expired(peers) => {
862                    for (peer_id, addr) in peers {
863                        debug!("mDNS peer expired: {} at {}", peer_id, addr);
864                    }
865                }
866            },
867            SwarmEvent::Behaviour(IpfrsBehaviourEvent::RelayClient(relay_event)) => {
868                debug!("Relay client event: {:?}", relay_event);
869            }
870            SwarmEvent::Behaviour(IpfrsBehaviourEvent::Ping(ping_event)) => {
871                if let Ok(rtt) = ping_event.result {
872                    debug!("Ping to {:?}: RTT = {:?}", ping_event.peer, rtt);
873                }
874            }
875            _ => {}
876        }
877    }
878
879    /// Stop the network node
880    pub async fn stop(&mut self) -> IpfrsResult<()> {
881        if let Some(tx) = self.shutdown_tx.take() {
882            let _ = tx.send(()).await;
883        }
884        Ok(())
885    }
886
887    /// Get local peer ID
888    pub fn peer_id(&self) -> PeerId {
889        self.peer_id
890    }
891
892    /// Get listening addresses
893    pub fn listeners(&self) -> Vec<String> {
894        self.config.listen_addrs.clone()
895    }
896
897    /// Get connected peers
898    pub fn connected_peers(&self) -> Vec<PeerId> {
899        self.connected_peers
900            .iter()
901            .map(|entry| *entry.key())
902            .collect()
903    }
904
905    /// Connect to a peer
906    pub async fn connect(&mut self, addr: Multiaddr) -> IpfrsResult<()> {
907        if let Some(swarm) = &mut self.swarm {
908            swarm
909                .dial(addr.clone())
910                .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;
911            info!("Dialing peer: {}", addr);
912        }
913        Ok(())
914    }
915
916    /// Disconnect from a peer
917    pub async fn disconnect(&mut self, peer_id: PeerId) -> IpfrsResult<()> {
918        if let Some(swarm) = &mut self.swarm {
919            let _ = swarm.disconnect_peer_id(peer_id);
920            info!("Disconnecting from peer: {}", peer_id);
921        }
922        Ok(())
923    }
924
925    /// Announce content to DHT (provide)
926    pub async fn provide(&mut self, cid: &cid::Cid) -> IpfrsResult<()> {
927        if let Some(swarm) = &mut self.swarm {
928            let key = kad::RecordKey::new(&cid.to_bytes());
929            swarm
930                .behaviour_mut()
931                .kademlia
932                .start_providing(key)
933                .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;
934            debug!("Announcing content: {}", cid);
935        }
936        Ok(())
937    }
938
939    /// Find providers for content in DHT
940    pub async fn find_providers(&mut self, cid: &cid::Cid) -> IpfrsResult<()> {
941        if let Some(swarm) = &mut self.swarm {
942            let key = kad::RecordKey::new(&cid.to_bytes());
943            swarm.behaviour_mut().kademlia.get_providers(key);
944            debug!("Searching for providers of: {}", cid);
945        }
946        Ok(())
947    }
948
949    /// Find node (closest peers to a given peer ID) using Kademlia
950    pub async fn find_node(&mut self, peer_id: PeerId) -> IpfrsResult<()> {
951        if let Some(swarm) = &mut self.swarm {
952            swarm.behaviour_mut().kademlia.get_closest_peers(peer_id);
953            debug!("Finding closest peers to: {}", peer_id);
954        }
955        Ok(())
956    }
957
958    /// Get the k-closest peers to our local peer ID
959    pub async fn get_closest_local_peers(&mut self) -> IpfrsResult<Vec<PeerId>> {
960        if let Some(swarm) = &mut self.swarm {
961            let mut closest_peers = Vec::new();
962
963            // Get peers from the routing table
964            for bucket in swarm.behaviour_mut().kademlia.kbuckets() {
965                for entry in bucket.iter() {
966                    closest_peers.push(*entry.node.key.preimage());
967                }
968            }
969
970            debug!("Found {} peers in routing table", closest_peers.len());
971            Ok(closest_peers)
972        } else {
973            Ok(Vec::new())
974        }
975    }
976
977    /// Bootstrap the DHT (search for our own peer ID to populate routing table)
978    pub async fn bootstrap_dht(&mut self) -> IpfrsResult<()> {
979        if let Some(swarm) = &mut self.swarm {
980            swarm
981                .behaviour_mut()
982                .kademlia
983                .bootstrap()
984                .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;
985            info!("DHT bootstrap initiated");
986        }
987        Ok(())
988    }
989
990    /// Add an address for a peer to the routing table
991    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> IpfrsResult<()> {
992        if let Some(swarm) = &mut self.swarm {
993            swarm
994                .behaviour_mut()
995                .kademlia
996                .add_address(&peer_id, addr.clone());
997            debug!("Added address {} for peer {}", addr, peer_id);
998        }
999        Ok(())
1000    }
1001
1002    /// Get routing table information
1003    pub fn get_routing_table_info(&mut self) -> IpfrsResult<RoutingTableInfo> {
1004        if let Some(swarm) = &mut self.swarm {
1005            let mut total_peers = 0;
1006            let mut buckets_info = Vec::new();
1007
1008            for (index, bucket) in swarm.behaviour_mut().kademlia.kbuckets().enumerate() {
1009                let num_entries = bucket.iter().count();
1010                total_peers += num_entries;
1011                buckets_info.push(BucketInfo { index, num_entries });
1012            }
1013
1014            Ok(RoutingTableInfo {
1015                total_peers,
1016                num_buckets: buckets_info.len(),
1017                buckets: buckets_info,
1018            })
1019        } else {
1020            Ok(RoutingTableInfo {
1021                total_peers: 0,
1022                num_buckets: 0,
1023                buckets: Vec::new(),
1024            })
1025        }
1026    }
1027
1028    /// Get network statistics
1029    pub fn stats(&self) -> NetworkStats {
1030        let bandwidth = self.bandwidth_stats.read();
1031        NetworkStats {
1032            peer_id: self.peer_id.to_string(),
1033            listen_addrs: self.config.listen_addrs.clone(),
1034            connected_peers: self.connected_peers.len(),
1035            quic_enabled: self.config.enable_quic,
1036            bytes_received: bandwidth.bytes_received,
1037            bytes_sent: bandwidth.bytes_sent,
1038            bootstrap_peers: self.config.bootstrap_peers.clone(),
1039        }
1040    }
1041
1042    /// Take the event receiver
1043    pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<NetworkEvent>> {
1044        self.event_rx.take()
1045    }
1046
1047    /// Get confirmed external addresses
1048    pub fn get_external_addresses(&self) -> Vec<Multiaddr> {
1049        self.external_addrs.read().clone()
1050    }
1051
1052    /// Check if node has public reachability
1053    pub fn is_publicly_reachable(&self) -> bool {
1054        !self.external_addrs.read().is_empty()
1055    }
1056
1057    /// Check if connected to a specific peer
1058    pub fn is_connected_to(&self, peer_id: &PeerId) -> bool {
1059        self.connected_peers.contains(peer_id)
1060    }
1061
1062    /// Get number of connected peers
1063    pub fn get_peer_count(&self) -> usize {
1064        self.connected_peers.len()
1065    }
1066
1067    /// Connect to multiple peers in batch
1068    pub async fn connect_to_peers(&mut self, addrs: Vec<Multiaddr>) -> Vec<IpfrsResult<()>> {
1069        let mut results = Vec::with_capacity(addrs.len());
1070
1071        for addr in addrs {
1072            let result = self.connect(addr).await;
1073            results.push(result);
1074        }
1075
1076        results
1077    }
1078
1079    /// Disconnect from all connected peers
1080    pub async fn disconnect_all(&mut self) -> IpfrsResult<()> {
1081        let peers: Vec<PeerId> = self.connected_peers().clone();
1082
1083        for peer in peers {
1084            let _ = self.disconnect(peer).await;
1085        }
1086
1087        Ok(())
1088    }
1089
1090    /// Update bandwidth statistics manually (for custom tracking)
1091    pub fn update_bandwidth(&self, bytes_sent: u64, bytes_received: u64) {
1092        let mut stats = self.bandwidth_stats.write();
1093        stats.bytes_sent += bytes_sent;
1094        stats.bytes_received += bytes_received;
1095    }
1096
1097    /// Get total bandwidth sent
1098    pub fn get_bytes_sent(&self) -> u64 {
1099        self.bandwidth_stats.read().bytes_sent
1100    }
1101
1102    /// Get total bandwidth received
1103    pub fn get_bytes_received(&self) -> u64 {
1104        self.bandwidth_stats.read().bytes_received
1105    }
1106
1107    /// Reset bandwidth statistics
1108    pub fn reset_bandwidth_stats(&self) {
1109        let mut stats = self.bandwidth_stats.write();
1110        stats.bytes_sent = 0;
1111        stats.bytes_received = 0;
1112    }
1113
1114    /// Get network health summary
1115    pub fn get_network_health(&self) -> NetworkHealthSummary {
1116        let peer_count = self.get_peer_count();
1117        let is_public = self.is_publicly_reachable();
1118        let has_external_addrs = !self.external_addrs.read().is_empty();
1119
1120        // Determine health status
1121        let status = if peer_count >= 10 && is_public {
1122            NetworkHealthLevel::Healthy
1123        } else if peer_count >= 3 || has_external_addrs {
1124            NetworkHealthLevel::Degraded
1125        } else if peer_count > 0 {
1126            NetworkHealthLevel::Limited
1127        } else {
1128            NetworkHealthLevel::Disconnected
1129        };
1130
1131        NetworkHealthSummary {
1132            status,
1133            connected_peers: peer_count,
1134            is_publicly_reachable: is_public,
1135            external_addresses: self.get_external_addresses().len(),
1136        }
1137    }
1138
1139    /// Check if node is healthy
1140    pub fn is_healthy(&self) -> bool {
1141        matches!(
1142            self.get_network_health().status,
1143            NetworkHealthLevel::Healthy
1144        )
1145    }
1146}
1147
/// Point-in-time network statistics snapshot, produced by `stats()`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NetworkStats {
    /// Local peer ID, stringified
    pub peer_id: String,
    /// Configured listen addresses
    pub listen_addrs: Vec<String>,
    /// Number of currently connected peers
    pub connected_peers: usize,
    /// Whether QUIC transport is enabled in the configuration
    pub quic_enabled: bool,
    /// Total bytes received
    pub bytes_received: u64,
    /// Total bytes sent
    pub bytes_sent: u64,
    /// Bootstrap peers
    pub bootstrap_peers: Vec<String>,
}
1162
/// Information about a single k-bucket in the Kademlia routing table.
/// Produced by `get_routing_table_info`; serializable for diagnostics.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BucketInfo {
    /// Bucket index (position in the `kbuckets()` enumeration order)
    pub index: usize,
    /// Number of entries in this bucket
    pub num_entries: usize,
}
1171
/// Aggregate view of the Kademlia routing table,
/// produced by `get_routing_table_info`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RoutingTableInfo {
    /// Total number of peers across all buckets
    pub total_peers: usize,
    /// Number of (non-empty) buckets reported
    pub num_buckets: usize,
    /// Per-bucket entry counts
    pub buckets: Vec<BucketInfo>,
}
1182
/// Network health summary, produced by `get_network_health`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NetworkHealthSummary {
    /// Overall health status
    pub status: NetworkHealthLevel,
    /// Number of connected peers at the time of the snapshot
    pub connected_peers: usize,
    /// Whether the node had any confirmed external address
    pub is_publicly_reachable: bool,
    /// Number of confirmed external addresses
    pub external_addresses: usize,
}
1195
/// Network health level, as classified by `get_network_health`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum NetworkHealthLevel {
    /// 10+ connected peers and confirmed public reachability
    Healthy,
    /// 3+ peers, or an external address known, but below the Healthy bar
    Degraded,
    /// Minimal connectivity: at least one peer
    Limited,
    /// No connections at all
    Disconnected,
}