1use dashmap::DashSet;
4use futures::StreamExt;
5use libp2p::{
6 autonat,
7 core::Transport as _,
8 dcutr, identify, identity, kad, mdns, noise, ping, relay,
9 swarm::{NetworkBehaviour, SwarmEvent},
10 Multiaddr, PeerId, Swarm,
11};
12use parking_lot::RwLock;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::mpsc;
18use tracing::{debug, info, warn};
19
/// Shorthand for `ipfrs_core::error::Result`, the result type returned by the fallible
/// functions in this file.
type IpfrsResult<T> = ipfrs_core::error::Result<T>;
22
/// Tunable parameters for the Kademlia DHT behaviour.
#[derive(Debug, Clone)]
pub struct KademliaConfig {
    /// Query timeout in seconds; `build_swarm` hands it to `set_query_timeout`.
    pub query_timeout_secs: u64,
    /// Replication factor; `build_swarm` hands it to `set_replication_factor` and
    /// requires it to be non-zero.
    pub replication_factor: usize,
    /// Query parallelism; `build_swarm` hands it to `set_parallelism` and requires it
    /// to be non-zero.
    pub alpha: usize,
    /// K-bucket size.
    /// NOTE(review): not read anywhere in the visible code — confirm where it is applied.
    pub kbucket_size: usize,
}
35
36impl Default for KademliaConfig {
37 fn default() -> Self {
38 Self {
39 query_timeout_secs: 60,
41 replication_factor: 20,
43 alpha: 3,
45 kbucket_size: 20,
47 }
48 }
49}
50
/// Configuration for a [`NetworkNode`].
///
/// NOTE(review): several fields (`enable_nat_traversal`, `relay_servers`, the
/// `max_*connections` limits, `connection_buffer_size`, `low_memory_mode`) are not read
/// by any code visible in this file — confirm where they take effect.
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Multiaddr strings the node listens on; parsed and bound in `NetworkNode::start`.
    pub listen_addrs: Vec<String>,
    /// Multiaddr strings dialed during `NetworkNode::start`; invalid or undialable
    /// entries are logged and skipped.
    pub bootstrap_peers: Vec<String>,
    /// When true the transport is QUIC with TCP as the alternative; otherwise TCP only.
    pub enable_quic: bool,
    /// Directory holding the persisted identity key file.
    pub data_dir: PathBuf,
    /// Selects the default mDNS configuration instead of the slowed-down one in
    /// `build_swarm`.
    pub enable_mdns: bool,
    /// NAT traversal toggle (not consulted in the visible code).
    pub enable_nat_traversal: bool,
    /// Relay server addresses (not consulted in the visible code).
    pub relay_servers: Vec<String>,
    /// Kademlia DHT tuning.
    pub kademlia: KademliaConfig,
    /// Optional cap on total connections (not consulted in the visible code).
    pub max_connections: Option<usize>,
    /// Optional cap on inbound connections (not consulted in the visible code).
    pub max_inbound_connections: Option<usize>,
    /// Optional cap on outbound connections (not consulted in the visible code).
    pub max_outbound_connections: Option<usize>,
    /// Connection buffer size; presumably bytes, given the `N * 1024` presets — TODO confirm.
    pub connection_buffer_size: usize,
    /// Flag set only by the `low_memory` preset.
    pub low_memory_mode: bool,
}
81
82impl Default for NetworkConfig {
83 fn default() -> Self {
84 Self {
85 listen_addrs: vec![
86 "/ip4/0.0.0.0/udp/0/quic-v1".to_string(),
87 "/ip6/::/udp/0/quic-v1".to_string(),
88 ],
89 bootstrap_peers: vec![],
90 enable_quic: true,
91 enable_mdns: false,
92 enable_nat_traversal: true,
93 relay_servers: vec![],
94 data_dir: PathBuf::from(".ipfrs"),
95 kademlia: KademliaConfig::default(),
96 max_connections: None,
97 max_inbound_connections: None,
98 max_outbound_connections: None,
99 connection_buffer_size: 64 * 1024, low_memory_mode: false,
101 }
102 }
103}
104
105impl NetworkConfig {
106 pub fn low_memory() -> Self {
117 Self {
118 listen_addrs: vec!["/ip4/0.0.0.0/udp/0/quic-v1".to_string()],
119 bootstrap_peers: vec![],
120 enable_quic: true,
121 enable_mdns: false, enable_nat_traversal: false, relay_servers: vec![],
124 data_dir: PathBuf::from(".ipfrs"),
125 kademlia: KademliaConfig {
126 query_timeout_secs: 30, replication_factor: 10, alpha: 2, kbucket_size: 10, },
131 max_connections: Some(16), max_inbound_connections: Some(8),
133 max_outbound_connections: Some(8),
134 connection_buffer_size: 8 * 1024, low_memory_mode: true,
136 }
137 }
138
139 pub fn iot() -> Self {
150 Self {
151 listen_addrs: vec!["/ip4/0.0.0.0/udp/0/quic-v1".to_string()],
152 bootstrap_peers: vec![],
153 enable_quic: true,
154 enable_mdns: true, enable_nat_traversal: true,
156 relay_servers: vec![],
157 data_dir: PathBuf::from(".ipfrs"),
158 kademlia: KademliaConfig {
159 query_timeout_secs: 45,
160 replication_factor: 15,
161 alpha: 2,
162 kbucket_size: 15,
163 },
164 max_connections: Some(32),
165 max_inbound_connections: Some(16),
166 max_outbound_connections: Some(16),
167 connection_buffer_size: 16 * 1024, low_memory_mode: false,
169 }
170 }
171
172 pub fn mobile() -> Self {
183 Self {
184 listen_addrs: vec!["/ip4/0.0.0.0/udp/0/quic-v1".to_string()],
185 bootstrap_peers: vec![],
186 enable_quic: true,
187 enable_mdns: false, enable_nat_traversal: true,
189 relay_servers: vec![],
190 data_dir: PathBuf::from(".ipfrs"),
191 kademlia: KademliaConfig {
192 query_timeout_secs: 60,
193 replication_factor: 20,
194 alpha: 3,
195 kbucket_size: 20,
196 },
197 max_connections: Some(64),
198 max_inbound_connections: Some(32),
199 max_outbound_connections: Some(32),
200 connection_buffer_size: 32 * 1024, low_memory_mode: false,
202 }
203 }
204
205 pub fn high_performance() -> Self {
215 Self {
216 listen_addrs: vec![
217 "/ip4/0.0.0.0/udp/0/quic-v1".to_string(),
218 "/ip6/::/udp/0/quic-v1".to_string(),
219 ],
220 bootstrap_peers: vec![],
221 enable_quic: true,
222 enable_mdns: true,
223 enable_nat_traversal: true,
224 relay_servers: vec![],
225 data_dir: PathBuf::from(".ipfrs"),
226 kademlia: KademliaConfig {
227 query_timeout_secs: 60,
228 replication_factor: 20,
229 alpha: 3,
230 kbucket_size: 20,
231 },
232 max_connections: None, max_inbound_connections: None,
234 max_outbound_connections: None,
235 connection_buffer_size: 128 * 1024, low_memory_mode: false,
237 }
238 }
239}
240
/// Composite libp2p behaviour for the node; its events are surfaced to the swarm as
/// [`IpfrsBehaviourEvent`].
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "IpfrsBehaviourEvent")]
pub struct IpfrsBehaviour {
    /// Kademlia DHT backed by an in-memory record store.
    pub kademlia: kad::Behaviour<kad::store::MemoryStore>,
    /// Identify protocol behaviour.
    pub identify: identify::Behaviour,
    /// Ping protocol behaviour.
    pub ping: ping::Behaviour,
    /// AutoNAT behaviour; its status changes drive the node's external address list.
    pub autonat: autonat::Behaviour,
    /// DCUtR behaviour.
    pub dcutr: dcutr::Behaviour,
    /// mDNS behaviour running on tokio.
    pub mdns: mdns::tokio::Behaviour,
    /// Relay client behaviour.
    pub relay_client: relay::client::Behaviour,
}
260
/// Event type emitted by [`IpfrsBehaviour`]: one variant per sub-behaviour, each
/// wrapping that behaviour's own event.
#[derive(Debug)]
pub enum IpfrsBehaviourEvent {
    /// Event from the Kademlia behaviour.
    Kademlia(kad::Event),
    /// Event from the identify behaviour.
    Identify(identify::Event),
    /// Event from the ping behaviour.
    Ping(ping::Event),
    /// Event from the AutoNAT behaviour.
    Autonat(autonat::Event),
    /// Event from the DCUtR behaviour.
    Dcutr(dcutr::Event),
    /// Event from the mDNS behaviour.
    Mdns(mdns::Event),
    /// Event from the relay client behaviour.
    RelayClient(relay::client::Event),
}
272
273impl From<kad::Event> for IpfrsBehaviourEvent {
274 fn from(event: kad::Event) -> Self {
275 IpfrsBehaviourEvent::Kademlia(event)
276 }
277}
278
279impl From<identify::Event> for IpfrsBehaviourEvent {
280 fn from(event: identify::Event) -> Self {
281 IpfrsBehaviourEvent::Identify(event)
282 }
283}
284
285impl From<ping::Event> for IpfrsBehaviourEvent {
286 fn from(event: ping::Event) -> Self {
287 IpfrsBehaviourEvent::Ping(event)
288 }
289}
290
291impl From<autonat::Event> for IpfrsBehaviourEvent {
292 fn from(event: autonat::Event) -> Self {
293 IpfrsBehaviourEvent::Autonat(event)
294 }
295}
296
297impl From<dcutr::Event> for IpfrsBehaviourEvent {
298 fn from(event: dcutr::Event) -> Self {
299 IpfrsBehaviourEvent::Dcutr(event)
300 }
301}
302
303impl From<mdns::Event> for IpfrsBehaviourEvent {
304 fn from(event: mdns::Event) -> Self {
305 IpfrsBehaviourEvent::Mdns(event)
306 }
307}
308
309impl From<relay::client::Event> for IpfrsBehaviourEvent {
310 fn from(event: relay::client::Event) -> Self {
311 IpfrsBehaviourEvent::RelayClient(event)
312 }
313}
314
/// A libp2p-based network node: owns the swarm until it is started, then exposes
/// shared state that the background event loop keeps up to date.
pub struct NetworkNode {
    /// Configuration the node was created with.
    config: NetworkConfig,
    /// Peer ID derived from the node's identity keypair.
    peer_id: PeerId,
    /// The swarm; `Some` from construction until `start` moves it into the background
    /// task, `None` afterwards.
    swarm: Option<Swarm<IpfrsBehaviour>>,
    /// Shutdown signal sender; set by `start`, consumed by `stop`.
    shutdown_tx: Option<mpsc::Sender<()>>,
    /// Sending half of the event channel; cloned into the background event loop.
    event_tx: mpsc::Sender<NetworkEvent>,
    /// Receiving half of the event channel until `take_event_receiver` hands it out.
    event_rx: Option<mpsc::Receiver<NetworkEvent>>,
    /// Addresses AutoNAT has reported as public; cleared when it reports `Private`.
    external_addrs: Arc<parking_lot::RwLock<Vec<Multiaddr>>>,
    /// Peers with at least one established connection, maintained by the event loop.
    connected_peers: Arc<DashSet<PeerId>>,
    /// Byte counters; in the visible code they change only through `update_bandwidth`
    /// and `reset_bandwidth_stats`.
    bandwidth_stats: Arc<parking_lot::RwLock<BandwidthStats>>,
}
330
/// Running totals of bytes sent and received, as reported via
/// `NetworkNode::update_bandwidth`.
#[derive(Debug, Clone, Default)]
struct BandwidthStats {
    /// Total bytes sent.
    bytes_sent: u64,
    /// Total bytes received.
    bytes_received: u64,
}
337
/// Application-facing events published on the node's event channel by the background
/// event loop.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A connection to a peer was established.
    PeerConnected {
        /// The remote peer.
        peer_id: PeerId,
        /// Which side opened the connection and the addresses involved.
        endpoint: ConnectionEndpoint,
        /// Time the swarm reported for establishing the connection.
        established_in: std::time::Duration,
    },
    /// A connection to a peer was closed. It is sent for every closed connection, even
    /// when other connections to the same peer remain.
    PeerDisconnected {
        /// The remote peer.
        peer_id: PeerId,
        /// Debug-formatted close cause, if the swarm reported one.
        cause: Option<String>,
    },
    /// A provider query found providers for a record key.
    ContentFound { cid: String, providers: Vec<PeerId> },
    /// A peer was discovered via identify or mDNS.
    PeerDiscovered {
        /// The discovered peer.
        peer_id: PeerId,
        /// Addresses reported for the peer.
        addrs: Vec<Multiaddr>,
    },
    /// The swarm started listening on a new address.
    ListeningOn { address: Multiaddr },
    /// An incoming or outgoing connection attempt failed.
    ConnectionError {
        /// The peer involved; always `None` for incoming connection errors.
        peer_id: Option<PeerId>,
        /// Display-formatted error.
        error: String,
    },
    /// A Kademlia bootstrap query reported success.
    DhtBootstrapCompleted,
    /// AutoNAT reported a NAT status change.
    NatStatusChanged {
        /// Debug-formatted previous status.
        old_status: String,
        /// Debug-formatted new status.
        new_status: String,
    },
}
374
/// Describes which side initiated a connection and the addresses involved.
#[derive(Debug, Clone)]
pub enum ConnectionEndpoint {
    /// The local node dialed the remote at `address`.
    Dialer { address: Multiaddr },
    /// The remote dialed the local node.
    Listener {
        /// Local address of the connection.
        local_addr: Multiaddr,
        /// Address used to send data back to the remote.
        send_back_addr: Multiaddr,
    },
}
386
/// File name, relative to `NetworkConfig::data_dir`, under which the identity keypair
/// is persisted.
const KEYPAIR_FILENAME: &str = "identity.key";
389
390impl NetworkNode {
391 pub fn new(config: NetworkConfig) -> IpfrsResult<Self> {
393 info!("Creating network node with libp2p");
394
395 let keypair = Self::load_or_generate_keypair(&config.data_dir)?;
397 let peer_id = keypair.public().to_peer_id();
398
399 info!("Local peer ID: {}", peer_id);
400
401 let (event_tx, event_rx) = mpsc::channel(1024);
403
404 let swarm = Self::build_swarm(keypair, &config)?;
406
407 Ok(Self {
408 config,
409 peer_id,
410 swarm: Some(swarm),
411 shutdown_tx: None,
412 event_tx,
413 event_rx: Some(event_rx),
414 external_addrs: Arc::new(RwLock::new(Vec::new())),
415 connected_peers: Arc::new(DashSet::new()),
416 bandwidth_stats: Arc::new(RwLock::new(BandwidthStats::default())),
417 })
418 }
419
420 fn load_or_generate_keypair(data_dir: &Path) -> IpfrsResult<identity::Keypair> {
422 let key_path = data_dir.join(KEYPAIR_FILENAME);
423
424 if key_path.exists() {
425 info!("Loading existing identity from {:?}", key_path);
426 Self::load_keypair(&key_path)
427 } else {
428 info!("Generating new identity");
429 let keypair = identity::Keypair::generate_ed25519();
430
431 if !data_dir.exists() {
433 fs::create_dir_all(data_dir).map_err(ipfrs_core::error::Error::Io)?;
434 }
435
436 Self::save_keypair(&keypair, &key_path)?;
438 info!("Saved new identity to {:?}", key_path);
439
440 Ok(keypair)
441 }
442 }
443
444 fn load_keypair(path: &Path) -> IpfrsResult<identity::Keypair> {
446 let bytes = fs::read(path).map_err(ipfrs_core::error::Error::Io)?;
447
448 identity::Keypair::from_protobuf_encoding(&bytes).map_err(|e| {
449 ipfrs_core::error::Error::Network(format!("Failed to decode keypair: {}", e))
450 })
451 }
452
453 fn save_keypair(keypair: &identity::Keypair, path: &Path) -> IpfrsResult<()> {
455 let bytes = keypair.to_protobuf_encoding().map_err(|e| {
456 ipfrs_core::error::Error::Network(format!("Failed to encode keypair: {}", e))
457 })?;
458
459 fs::write(path, bytes).map_err(ipfrs_core::error::Error::Io)?;
460
461 #[cfg(unix)]
463 {
464 use std::os::unix::fs::PermissionsExt;
465 let permissions = fs::Permissions::from_mode(0o600);
466 fs::set_permissions(path, permissions).map_err(ipfrs_core::error::Error::Io)?;
467 }
468
469 Ok(())
470 }
471
472 #[allow(clippy::too_many_lines)]
474 fn build_swarm(
475 keypair: identity::Keypair,
476 config: &NetworkConfig,
477 ) -> IpfrsResult<Swarm<IpfrsBehaviour>> {
478 let peer_id = keypair.public().to_peer_id();
479
480 let (_relay_transport, relay_client) = relay::client::new(peer_id);
482
483 let tcp_transport = libp2p::tcp::tokio::Transport::default()
485 .upgrade(libp2p::core::upgrade::Version::V1)
486 .authenticate(noise::Config::new(&keypair).map_err(std::io::Error::other)?)
487 .multiplex(libp2p::yamux::Config::default())
488 .map(|(peer_id, muxer), _| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer)));
489
490 let quic_transport = libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(
492 &keypair,
493 ))
494 .map(|(peer_id, muxer), _| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer)));
495
496 let transport = if config.enable_quic {
498 quic_transport
500 .or_transport(tcp_transport)
501 .map(|either, _| either.into_inner())
502 .boxed()
503 } else {
504 tcp_transport.boxed()
506 };
507
508 let store = kad::store::MemoryStore::new(peer_id);
510 let mut kad_config = kad::Config::default();
511
512 kad_config.set_query_timeout(Duration::from_secs(config.kademlia.query_timeout_secs));
514 kad_config.set_replication_factor(
515 std::num::NonZeroUsize::new(config.kademlia.replication_factor)
516 .expect("Replication factor must be > 0"),
517 );
518 kad_config.set_parallelism(
519 std::num::NonZeroUsize::new(config.kademlia.alpha).expect("Alpha must be > 0"),
520 );
521 kad_config.set_kbucket_inserts(kad::BucketInserts::OnConnected);
522
523 let kademlia = kad::Behaviour::with_config(peer_id, store, kad_config);
527
528 let identify = identify::Behaviour::new(
530 identify::Config::new("/ipfrs/1.0.0".to_string(), keypair.public())
531 .with_agent_version(format!("ipfrs/{}", env!("CARGO_PKG_VERSION"))),
532 );
533
534 let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(15)));
536
537 let autonat = autonat::Behaviour::new(
539 peer_id,
540 autonat::Config {
541 only_global_ips: false,
542 ..Default::default()
543 },
544 );
545
546 let dcutr = dcutr::Behaviour::new(peer_id);
548
549 let mdns = if config.enable_mdns {
551 mdns::tokio::Behaviour::new(mdns::Config::default(), peer_id)
552 .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?
553 } else {
554 mdns::tokio::Behaviour::new(
556 mdns::Config {
557 ttl: Duration::from_secs(1),
558 query_interval: Duration::from_secs(3600), enable_ipv6: false,
560 },
561 peer_id,
562 )
563 .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?
564 };
565
566 let behaviour = IpfrsBehaviour {
568 kademlia,
569 identify,
570 ping,
571 autonat,
572 dcutr,
573 mdns,
574 relay_client,
575 };
576
577 let mut swarm_config = libp2p::swarm::Config::with_executor(|fut| {
579 tokio::spawn(fut);
580 });
581 swarm_config = swarm_config.with_idle_connection_timeout(Duration::from_secs(60));
582
583 let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);
584
585 Ok(swarm)
586 }
587
588 pub async fn start(&mut self) -> IpfrsResult<()> {
590 info!("🚀 IPFRS Network Node Starting");
591 info!(" Peer ID: {}", self.peer_id);
592 info!(" QUIC enabled: {}", self.config.enable_quic);
593
594 let mut swarm = self.swarm.take().ok_or_else(|| {
595 ipfrs_core::error::Error::Network("Swarm already started".to_string())
596 })?;
597
598 for addr_str in &self.config.listen_addrs {
600 let addr: Multiaddr = addr_str.parse().map_err(|e| {
601 ipfrs_core::error::Error::Network(format!("Invalid multiaddr: {}", e))
602 })?;
603
604 swarm
605 .listen_on(addr.clone())
606 .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;
607
608 info!(" Listening on: {}", addr);
609 }
610
611 for peer_str in &self.config.bootstrap_peers {
613 match peer_str.parse::<Multiaddr>() {
614 Ok(addr) => {
615 if let Err(e) = swarm.dial(addr.clone()) {
616 warn!("Failed to dial bootstrap peer {}: {}", addr, e);
617 } else {
618 info!(" Dialing bootstrap peer: {}", addr);
619 }
620 }
621 Err(e) => {
622 warn!("Invalid bootstrap peer address {}: {}", peer_str, e);
623 }
624 }
625 }
626
627 swarm
629 .behaviour_mut()
630 .kademlia
631 .set_mode(Some(kad::Mode::Server));
632
633 if let Err(e) = swarm.behaviour_mut().kademlia.bootstrap() {
635 warn!("DHT bootstrap failed: {}", e);
636 }
637
638 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
640 self.shutdown_tx = Some(shutdown_tx);
641
642 let event_tx = self.event_tx.clone();
643 let external_addrs = Arc::clone(&self.external_addrs);
644 let connected_peers = Arc::clone(&self.connected_peers);
645
646 info!("✅ Network node ready");
647 info!(
648 " Transport: {}",
649 if self.config.enable_quic {
650 "QUIC"
651 } else {
652 "TCP"
653 }
654 );
655 info!(" DHT mode: Server");
656
657 tokio::spawn(async move {
659 loop {
660 tokio::select! {
661 event = swarm.select_next_some() => {
662 Self::handle_swarm_event(event, &event_tx, swarm.behaviour_mut(), &external_addrs, &connected_peers).await;
663 }
664 _ = shutdown_rx.recv() => {
665 info!("Shutting down network node");
666 break;
667 }
668 }
669 }
670 });
671
672 Ok(())
673 }
674
675 async fn handle_swarm_event(
677 event: SwarmEvent<IpfrsBehaviourEvent>,
678 event_tx: &mpsc::Sender<NetworkEvent>,
679 _behaviour: &mut IpfrsBehaviour,
680 external_addrs: &Arc<RwLock<Vec<Multiaddr>>>,
681 connected_peers: &Arc<DashSet<PeerId>>,
682 ) {
683 match event {
684 SwarmEvent::NewListenAddr { address, .. } => {
685 info!("Listening on {}", address);
686 let _ = event_tx
687 .send(NetworkEvent::ListeningOn {
688 address: address.clone(),
689 })
690 .await;
691 }
692 SwarmEvent::Behaviour(IpfrsBehaviourEvent::Identify(identify::Event::Received {
693 peer_id,
694 info,
695 ..
696 })) => {
697 debug!("Identified peer {}: {:?}", peer_id, info);
698 let _ = event_tx
699 .send(NetworkEvent::PeerDiscovered {
700 peer_id,
701 addrs: info.listen_addrs,
702 })
703 .await;
704 }
705 SwarmEvent::Behaviour(IpfrsBehaviourEvent::Kademlia(
706 kad::Event::OutboundQueryProgressed { result, .. },
707 )) => match result {
708 kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
709 key,
710 providers,
711 })) => {
712 let cid = String::from_utf8_lossy(key.as_ref()).to_string();
713 debug!("Found {} providers for {}", providers.len(), cid);
714 let _ = event_tx
715 .send(NetworkEvent::ContentFound {
716 cid,
717 providers: providers.into_iter().collect(),
718 })
719 .await;
720 }
721 kad::QueryResult::GetProviders(Err(e)) => {
722 debug!("GetProviders query failed: {:?}", e);
723 }
724 kad::QueryResult::Bootstrap(Ok(_)) => {
725 info!("DHT bootstrap completed");
726 let _ = event_tx.send(NetworkEvent::DhtBootstrapCompleted).await;
727 }
728 kad::QueryResult::Bootstrap(Err(e)) => {
729 warn!("DHT bootstrap failed: {:?}", e);
730 }
731 _ => {}
732 },
733 SwarmEvent::ConnectionEstablished {
734 peer_id,
735 endpoint,
736 established_in,
737 ..
738 } => {
739 info!("Connected to peer: {} in {:?}", peer_id, established_in);
740
741 connected_peers.insert(peer_id);
743
744 let conn_endpoint = if endpoint.is_dialer() {
745 ConnectionEndpoint::Dialer {
746 address: endpoint.get_remote_address().clone(),
747 }
748 } else {
749 ConnectionEndpoint::Listener {
750 local_addr: endpoint.get_remote_address().clone(),
751 send_back_addr: endpoint.get_remote_address().clone(),
752 }
753 };
754
755 let _ = event_tx
756 .send(NetworkEvent::PeerConnected {
757 peer_id,
758 endpoint: conn_endpoint,
759 established_in,
760 })
761 .await;
762 }
763 SwarmEvent::ConnectionClosed {
764 peer_id,
765 cause,
766 num_established,
767 ..
768 } => {
769 info!("Disconnected from peer {}: {:?}", peer_id, cause);
770
771 if num_established == 0 {
773 connected_peers.remove(&peer_id);
774 }
775
776 let _ = event_tx
777 .send(NetworkEvent::PeerDisconnected {
778 peer_id,
779 cause: cause.map(|c| format!("{:?}", c)),
780 })
781 .await;
782 }
783 SwarmEvent::IncomingConnection { .. } => {
784 debug!("Incoming connection");
785 }
786 SwarmEvent::IncomingConnectionError { error, .. } => {
787 debug!("Incoming connection error: {}", error);
788 let _ = event_tx
789 .send(NetworkEvent::ConnectionError {
790 peer_id: None,
791 error: error.to_string(),
792 })
793 .await;
794 }
795 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
796 warn!("Outgoing connection error to {:?}: {}", peer_id, error);
797 let _ = event_tx
798 .send(NetworkEvent::ConnectionError {
799 peer_id,
800 error: error.to_string(),
801 })
802 .await;
803 }
804 SwarmEvent::Behaviour(IpfrsBehaviourEvent::Autonat(autonat_event)) => {
805 match autonat_event {
806 autonat::Event::InboundProbe(_) => {
807 debug!("AutoNAT inbound probe");
808 }
809 autonat::Event::OutboundProbe(_) => {
810 debug!("AutoNAT outbound probe");
811 }
812 autonat::Event::StatusChanged { old, new } => {
813 info!("AutoNAT status changed from {:?} to {:?}", old, new);
814
815 let old_status = format!("{:?}", old);
816 let new_status = format!("{:?}", new);
817
818 let _ = event_tx
819 .send(NetworkEvent::NatStatusChanged {
820 old_status,
821 new_status,
822 })
823 .await;
824
825 match new {
826 autonat::NatStatus::Public(addr) => {
827 info!("Public address confirmed: {}", addr);
828 let mut addrs = external_addrs.write();
830 if !addrs.contains(&addr) {
831 addrs.push(addr);
832 }
833 }
834 autonat::NatStatus::Private => {
835 info!("Node is behind NAT");
836 external_addrs.write().clear();
838 }
839 autonat::NatStatus::Unknown => {
840 debug!("NAT status unknown");
841 }
842 }
843 }
844 }
845 }
846 SwarmEvent::Behaviour(IpfrsBehaviourEvent::Dcutr(dcutr_event)) => {
847 debug!("DCUtR event: {:?}", dcutr_event);
848 }
849 SwarmEvent::Behaviour(IpfrsBehaviourEvent::Mdns(mdns_event)) => match mdns_event {
850 mdns::Event::Discovered(peers) => {
851 for (peer_id, addr) in peers {
852 info!("mDNS discovered peer {} at {}", peer_id, addr);
853 let _ = event_tx
854 .send(NetworkEvent::PeerDiscovered {
855 peer_id,
856 addrs: vec![addr],
857 })
858 .await;
859 }
860 }
861 mdns::Event::Expired(peers) => {
862 for (peer_id, addr) in peers {
863 debug!("mDNS peer expired: {} at {}", peer_id, addr);
864 }
865 }
866 },
867 SwarmEvent::Behaviour(IpfrsBehaviourEvent::RelayClient(relay_event)) => {
868 debug!("Relay client event: {:?}", relay_event);
869 }
870 SwarmEvent::Behaviour(IpfrsBehaviourEvent::Ping(ping_event)) => {
871 if let Ok(rtt) = ping_event.result {
872 debug!("Ping to {:?}: RTT = {:?}", ping_event.peer, rtt);
873 }
874 }
875 _ => {}
876 }
877 }
878
879 pub async fn stop(&mut self) -> IpfrsResult<()> {
881 if let Some(tx) = self.shutdown_tx.take() {
882 let _ = tx.send(()).await;
883 }
884 Ok(())
885 }
886
/// Returns this node's peer ID.
pub fn peer_id(&self) -> PeerId {
    self.peer_id
}
891
/// Returns a copy of the *configured* listen address strings. These are the values
/// from `NetworkConfig::listen_addrs`, not the addresses the swarm actually bound.
pub fn listeners(&self) -> Vec<String> {
    self.config.listen_addrs.clone()
}
896
897 pub fn connected_peers(&self) -> Vec<PeerId> {
899 self.connected_peers
900 .iter()
901 .map(|entry| *entry.key())
902 .collect()
903 }
904
905 pub async fn connect(&mut self, addr: Multiaddr) -> IpfrsResult<()> {
907 if let Some(swarm) = &mut self.swarm {
908 swarm
909 .dial(addr.clone())
910 .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;
911 info!("Dialing peer: {}", addr);
912 }
913 Ok(())
914 }
915
916 pub async fn disconnect(&mut self, peer_id: PeerId) -> IpfrsResult<()> {
918 if let Some(swarm) = &mut self.swarm {
919 let _ = swarm.disconnect_peer_id(peer_id);
920 info!("Disconnecting from peer: {}", peer_id);
921 }
922 Ok(())
923 }
924
925 pub async fn provide(&mut self, cid: &cid::Cid) -> IpfrsResult<()> {
927 if let Some(swarm) = &mut self.swarm {
928 let key = kad::RecordKey::new(&cid.to_bytes());
929 swarm
930 .behaviour_mut()
931 .kademlia
932 .start_providing(key)
933 .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;
934 debug!("Announcing content: {}", cid);
935 }
936 Ok(())
937 }
938
939 pub async fn find_providers(&mut self, cid: &cid::Cid) -> IpfrsResult<()> {
941 if let Some(swarm) = &mut self.swarm {
942 let key = kad::RecordKey::new(&cid.to_bytes());
943 swarm.behaviour_mut().kademlia.get_providers(key);
944 debug!("Searching for providers of: {}", cid);
945 }
946 Ok(())
947 }
948
949 pub async fn find_node(&mut self, peer_id: PeerId) -> IpfrsResult<()> {
951 if let Some(swarm) = &mut self.swarm {
952 swarm.behaviour_mut().kademlia.get_closest_peers(peer_id);
953 debug!("Finding closest peers to: {}", peer_id);
954 }
955 Ok(())
956 }
957
958 pub async fn get_closest_local_peers(&mut self) -> IpfrsResult<Vec<PeerId>> {
960 if let Some(swarm) = &mut self.swarm {
961 let mut closest_peers = Vec::new();
962
963 for bucket in swarm.behaviour_mut().kademlia.kbuckets() {
965 for entry in bucket.iter() {
966 closest_peers.push(*entry.node.key.preimage());
967 }
968 }
969
970 debug!("Found {} peers in routing table", closest_peers.len());
971 Ok(closest_peers)
972 } else {
973 Ok(Vec::new())
974 }
975 }
976
977 pub async fn bootstrap_dht(&mut self) -> IpfrsResult<()> {
979 if let Some(swarm) = &mut self.swarm {
980 swarm
981 .behaviour_mut()
982 .kademlia
983 .bootstrap()
984 .map_err(|e| ipfrs_core::error::Error::Network(e.to_string()))?;
985 info!("DHT bootstrap initiated");
986 }
987 Ok(())
988 }
989
990 pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> IpfrsResult<()> {
992 if let Some(swarm) = &mut self.swarm {
993 swarm
994 .behaviour_mut()
995 .kademlia
996 .add_address(&peer_id, addr.clone());
997 debug!("Added address {} for peer {}", addr, peer_id);
998 }
999 Ok(())
1000 }
1001
1002 pub fn get_routing_table_info(&mut self) -> IpfrsResult<RoutingTableInfo> {
1004 if let Some(swarm) = &mut self.swarm {
1005 let mut total_peers = 0;
1006 let mut buckets_info = Vec::new();
1007
1008 for (index, bucket) in swarm.behaviour_mut().kademlia.kbuckets().enumerate() {
1009 let num_entries = bucket.iter().count();
1010 total_peers += num_entries;
1011 buckets_info.push(BucketInfo { index, num_entries });
1012 }
1013
1014 Ok(RoutingTableInfo {
1015 total_peers,
1016 num_buckets: buckets_info.len(),
1017 buckets: buckets_info,
1018 })
1019 } else {
1020 Ok(RoutingTableInfo {
1021 total_peers: 0,
1022 num_buckets: 0,
1023 buckets: Vec::new(),
1024 })
1025 }
1026 }
1027
1028 pub fn stats(&self) -> NetworkStats {
1030 let bandwidth = self.bandwidth_stats.read();
1031 NetworkStats {
1032 peer_id: self.peer_id.to_string(),
1033 listen_addrs: self.config.listen_addrs.clone(),
1034 connected_peers: self.connected_peers.len(),
1035 quic_enabled: self.config.enable_quic,
1036 bytes_received: bandwidth.bytes_received,
1037 bytes_sent: bandwidth.bytes_sent,
1038 bootstrap_peers: self.config.bootstrap_peers.clone(),
1039 }
1040 }
1041
/// Hands out the receiving half of the event channel. Returns `Some` on the first
/// call and `None` on every later call.
pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<NetworkEvent>> {
    self.event_rx.take()
}
1046
/// Returns a copy of the external addresses recorded from AutoNAT `Public` status
/// reports.
pub fn get_external_addresses(&self) -> Vec<Multiaddr> {
    self.external_addrs.read().clone()
}
1051
/// Returns `true` when at least one external address is currently recorded.
pub fn is_publicly_reachable(&self) -> bool {
    !self.external_addrs.read().is_empty()
}
1056
/// Returns `true` when `peer_id` is in the set of currently connected peers.
pub fn is_connected_to(&self, peer_id: &PeerId) -> bool {
    self.connected_peers.contains(peer_id)
}
1061
/// Returns the number of peers currently tracked as connected.
pub fn get_peer_count(&self) -> usize {
    self.connected_peers.len()
}
1066
1067 pub async fn connect_to_peers(&mut self, addrs: Vec<Multiaddr>) -> Vec<IpfrsResult<()>> {
1069 let mut results = Vec::with_capacity(addrs.len());
1070
1071 for addr in addrs {
1072 let result = self.connect(addr).await;
1073 results.push(result);
1074 }
1075
1076 results
1077 }
1078
1079 pub async fn disconnect_all(&mut self) -> IpfrsResult<()> {
1081 let peers: Vec<PeerId> = self.connected_peers().clone();
1082
1083 for peer in peers {
1084 let _ = self.disconnect(peer).await;
1085 }
1086
1087 Ok(())
1088 }
1089
1090 pub fn update_bandwidth(&self, bytes_sent: u64, bytes_received: u64) {
1092 let mut stats = self.bandwidth_stats.write();
1093 stats.bytes_sent += bytes_sent;
1094 stats.bytes_received += bytes_received;
1095 }
1096
/// Returns the running total of bytes sent.
pub fn get_bytes_sent(&self) -> u64 {
    self.bandwidth_stats.read().bytes_sent
}
1101
/// Returns the running total of bytes received.
pub fn get_bytes_received(&self) -> u64 {
    self.bandwidth_stats.read().bytes_received
}
1106
1107 pub fn reset_bandwidth_stats(&self) {
1109 let mut stats = self.bandwidth_stats.write();
1110 stats.bytes_sent = 0;
1111 stats.bytes_received = 0;
1112 }
1113
1114 pub fn get_network_health(&self) -> NetworkHealthSummary {
1116 let peer_count = self.get_peer_count();
1117 let is_public = self.is_publicly_reachable();
1118 let has_external_addrs = !self.external_addrs.read().is_empty();
1119
1120 let status = if peer_count >= 10 && is_public {
1122 NetworkHealthLevel::Healthy
1123 } else if peer_count >= 3 || has_external_addrs {
1124 NetworkHealthLevel::Degraded
1125 } else if peer_count > 0 {
1126 NetworkHealthLevel::Limited
1127 } else {
1128 NetworkHealthLevel::Disconnected
1129 };
1130
1131 NetworkHealthSummary {
1132 status,
1133 connected_peers: peer_count,
1134 is_publicly_reachable: is_public,
1135 external_addresses: self.get_external_addresses().len(),
1136 }
1137 }
1138
1139 pub fn is_healthy(&self) -> bool {
1141 matches!(
1142 self.get_network_health().status,
1143 NetworkHealthLevel::Healthy
1144 )
1145 }
1146}
1147
/// Serializable snapshot of node statistics, as produced by `NetworkNode::stats`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NetworkStats {
    /// The node's peer ID in string form.
    pub peer_id: String,
    /// Configured listen addresses.
    pub listen_addrs: Vec<String>,
    /// Number of peers tracked as connected.
    pub connected_peers: usize,
    /// Whether QUIC is enabled in the configuration.
    pub quic_enabled: bool,
    /// Running total of bytes received.
    pub bytes_received: u64,
    /// Running total of bytes sent.
    pub bytes_sent: u64,
    /// Configured bootstrap peer addresses.
    pub bootstrap_peers: Vec<String>,
}
1162
/// Occupancy of a single Kademlia routing-table bucket.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BucketInfo {
    /// Position of the bucket in the routing table's iteration order.
    pub index: usize,
    /// Number of entries in the bucket.
    pub num_entries: usize,
}
1171
/// Summary of the Kademlia routing table, as produced by
/// `NetworkNode::get_routing_table_info`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RoutingTableInfo {
    /// Total number of entries across all buckets.
    pub total_peers: usize,
    /// Number of buckets reported.
    pub num_buckets: usize,
    /// Per-bucket details.
    pub buckets: Vec<BucketInfo>,
}
1182
/// Connectivity summary, as produced by `NetworkNode::get_network_health`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NetworkHealthSummary {
    /// Overall health classification.
    pub status: NetworkHealthLevel,
    /// Number of peers tracked as connected.
    pub connected_peers: usize,
    /// Whether at least one external address is recorded.
    pub is_publicly_reachable: bool,
    /// Number of recorded external addresses.
    pub external_addresses: usize,
}
1195
/// Health classification assigned by `NetworkNode::get_network_health`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum NetworkHealthLevel {
    /// At least 10 connected peers and publicly reachable.
    Healthy,
    /// At least 3 connected peers, or publicly reachable.
    Degraded,
    /// At least one connected peer, but neither condition above holds.
    Limited,
    /// No connected peers and no external address.
    Disconnected,
}