Skip to main content

ferripfs_network/
host.rs

1// Ported from: kubo/core/node/libp2p/host.go
2// Kubo version: v0.39.0
3//
4// Original work: Copyright (c) Protocol Labs, Inc.
5// Port: Copyright (c) 2026 ferripfs contributors
6// SPDX-License-Identifier: MIT OR Apache-2.0
7
8//! Network host initialization and management.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use futures::StreamExt;
15use libp2p::{identity::Keypair, swarm::SwarmEvent, Multiaddr, PeerId, Swarm};
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use crate::{
19    behavior::FerripfsBehavior,
20    swarm::{parse_multiaddr, parse_peer_id, SwarmBuilder, SwarmConfig},
21    ConnectedPeer, NetworkError, NetworkResult, PeerInfo, PingResult,
22};
23
24/// Configuration for creating a network host.
25#[derive(Debug, Clone)]
26pub struct HostConfig {
27    /// Keypair for node identity
28    pub keypair: Keypair,
29    /// Addresses to listen on
30    pub listen_addrs: Vec<String>,
31    /// Bootstrap peers
32    pub bootstrap_peers: Vec<String>,
33    /// Swarm configuration
34    pub swarm_config: SwarmConfig,
35}
36
37impl HostConfig {
38    /// Create host config from ferripfs config.
39    pub fn from_config(config: &ferripfs_config::Config, keypair: Keypair) -> Self {
40        Self {
41            keypair,
42            listen_addrs: config.addresses.swarm.clone(),
43            bootstrap_peers: config.bootstrap.clone(),
44            swarm_config: SwarmConfig::from_config(config),
45        }
46    }
47}
48
49/// Commands that can be sent to the network host.
50#[derive(Debug)]
51pub enum HostCommand {
52    /// Get local peer info
53    GetPeerInfo(oneshot::Sender<PeerInfo>),
54    /// Get connected peers
55    GetConnectedPeers(oneshot::Sender<Vec<ConnectedPeer>>),
56    /// Connect to a peer
57    Connect(Multiaddr, oneshot::Sender<Result<(), String>>),
58    /// Disconnect from a peer
59    Disconnect(PeerId, oneshot::Sender<Result<(), String>>),
60    /// Get known addresses for all peers
61    GetKnownAddresses(oneshot::Sender<HashMap<String, Vec<String>>>),
62    /// Ping a peer
63    Ping(PeerId, oneshot::Sender<PingResult>),
64    /// Get listen addresses
65    GetListenAddresses(oneshot::Sender<Vec<String>>),
66    /// Shutdown the host
67    Shutdown(oneshot::Sender<()>),
68    /// Find providers for a CID (DHT)
69    FindProviders(Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
70    /// Find a peer in the DHT
71    FindPeer(PeerId, oneshot::Sender<crate::DhtQueryResult>),
72    /// Provide content to the DHT
73    Provide(Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
74    /// Put a value in the DHT
75    PutValue(Vec<u8>, Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
76    /// Get a value from the DHT
77    GetValue(Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
78    /// Get DHT statistics
79    GetDhtStats(oneshot::Sender<crate::DhtStats>),
80    /// Bootstrap the DHT
81    Bootstrap(oneshot::Sender<Result<(), String>>),
82}
83
84/// Network host providing high-level network operations.
85pub struct NetworkHost {
86    /// Local peer ID
87    peer_id: PeerId,
88    /// Command sender
89    cmd_tx: mpsc::Sender<HostCommand>,
90    /// Listen addresses (updated by swarm)
91    #[allow(dead_code)]
92    listen_addrs: Arc<RwLock<Vec<Multiaddr>>>,
93}
94
95impl NetworkHost {
96    /// Create and start a new network host.
97    pub async fn start(config: HostConfig) -> NetworkResult<Self> {
98        let local_peer_id = PeerId::from(config.keypair.public());
99        let _local_public_key = config.keypair.public();
100
101        // Parse listen addresses
102        let listen_addrs: Vec<Multiaddr> = config
103            .listen_addrs
104            .iter()
105            .filter_map(|s| parse_multiaddr(s).ok())
106            .collect();
107
108        // Build swarm
109        let mut swarm = SwarmBuilder::new(config.keypair.clone())
110            .with_config(config.swarm_config)
111            .with_listen_addrs(listen_addrs.clone())
112            .build()?;
113
114        // Start listening on configured addresses
115        for addr in &listen_addrs {
116            swarm
117                .listen_on(addr.clone())
118                .map_err(|e| NetworkError::Transport(e.to_string()))?;
119        }
120
121        // Add bootstrap peers to Kademlia
122        for peer_str in &config.bootstrap_peers {
123            if let Ok((peer_id, addr)) = crate::swarm::parse_bootstrap_peer(peer_str) {
124                swarm.behaviour_mut().add_address(&peer_id, addr);
125            }
126        }
127
128        // Create command channel
129        let (cmd_tx, cmd_rx) = mpsc::channel(32);
130        let actual_listen_addrs = Arc::new(RwLock::new(Vec::new()));
131        let listen_addrs_clone = actual_listen_addrs.clone();
132
133        // Spawn swarm event loop
134        let keypair = config.keypair.clone();
135        tokio::spawn(async move {
136            Self::run_event_loop(swarm, cmd_rx, listen_addrs_clone, keypair).await;
137        });
138
139        Ok(Self {
140            peer_id: local_peer_id,
141            cmd_tx,
142            listen_addrs: actual_listen_addrs,
143        })
144    }
145
146    /// Run the swarm event loop.
147    async fn run_event_loop(
148        mut swarm: Swarm<FerripfsBehavior>,
149        mut cmd_rx: mpsc::Receiver<HostCommand>,
150        listen_addrs: Arc<RwLock<Vec<Multiaddr>>>,
151        keypair: Keypair,
152    ) {
153        let local_peer_id = PeerId::from(keypair.public());
154        let local_public_key = keypair.public();
155
156        // Track pending pings
157        let mut pending_pings: HashMap<PeerId, oneshot::Sender<PingResult>> = HashMap::new();
158
159        // Track pending DHT queries
160        let mut pending_find_providers: HashMap<
161            kad::QueryId,
162            oneshot::Sender<crate::DhtQueryResult>,
163        > = HashMap::new();
164        let mut pending_find_peers: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
165            HashMap::new();
166        let mut pending_provide: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
167            HashMap::new();
168        let mut pending_put_value: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
169            HashMap::new();
170        let mut pending_get_value: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
171            HashMap::new();
172
173        loop {
174            tokio::select! {
175                // Handle swarm events
176                event = swarm.select_next_some() => {
177                    match event {
178                        SwarmEvent::NewListenAddr { address, .. } => {
179                            let mut addrs = listen_addrs.write().await;
180                            addrs.push(address.clone());
181                            tracing::info!("Listening on {}", address);
182                        }
183                        SwarmEvent::Behaviour(event) => {
184                            // Handle behavior events
185                            Self::handle_behavior_event(
186                                event,
187                                &mut pending_pings,
188                                &mut pending_find_providers,
189                                &mut pending_find_peers,
190                                &mut pending_provide,
191                                &mut pending_put_value,
192                                &mut pending_get_value,
193                            );
194                        }
195                        SwarmEvent::ConnectionEstablished { peer_id, .. } => {
196                            tracing::debug!("Connected to {}", peer_id);
197                        }
198                        SwarmEvent::ConnectionClosed { peer_id, .. } => {
199                            tracing::debug!("Disconnected from {}", peer_id);
200                        }
201                        SwarmEvent::IncomingConnection { .. } => {}
202                        SwarmEvent::IncomingConnectionError { .. } => {}
203                        SwarmEvent::OutgoingConnectionError { .. } => {}
204                        SwarmEvent::ExpiredListenAddr { address, .. } => {
205                            let mut addrs = listen_addrs.write().await;
206                            addrs.retain(|a| a != &address);
207                        }
208                        SwarmEvent::ListenerClosed { .. } => {}
209                        SwarmEvent::ListenerError { .. } => {}
210                        SwarmEvent::Dialing { .. } => {}
211                        SwarmEvent::NewExternalAddrCandidate { .. } => {}
212                        SwarmEvent::ExternalAddrConfirmed { .. } => {}
213                        SwarmEvent::ExternalAddrExpired { .. } => {}
214                        SwarmEvent::NewExternalAddrOfPeer { .. } => {}
215                        _ => {}
216                    }
217                }
218                // Handle commands
219                Some(cmd) = cmd_rx.recv() => {
220                    match cmd {
221                        HostCommand::GetPeerInfo(tx) => {
222                            let addrs = listen_addrs.read().await;
223                            let info = PeerInfo {
224                                id: local_peer_id.to_string(),
225                                public_key: bs58::encode(local_public_key.encode_protobuf()).into_string(),
226                                addresses: addrs.iter().map(|a| {
227                                    format!("{}/p2p/{}", a, local_peer_id)
228                                }).collect(),
229                                agent_version: crate::AGENT_VERSION.to_string(),
230                                protocol_version: crate::PROTOCOL_VERSION.to_string(),
231                                protocols: vec![
232                                    "/ipfs/id/1.0.0".to_string(),
233                                    "/ipfs/ping/1.0.0".to_string(),
234                                    "/ipfs/kad/1.0.0".to_string(),
235                                ],
236                            };
237                            let _ = tx.send(info);
238                        }
239                        HostCommand::GetConnectedPeers(tx) => {
240                            let peers: Vec<ConnectedPeer> = swarm
241                                .connected_peers()
242                                .map(|peer_id| {
243                                    let _info = swarm.network_info();
244                                    ConnectedPeer {
245                                        addr: String::new(), // Would need connection info
246                                        peer: peer_id.to_string(),
247                                        latency: None,
248                                        muxer: Some("yamux".into()),
249                                        direction: "unknown".into(),
250                                    }
251                                })
252                                .collect();
253                            let _ = tx.send(peers);
254                        }
255                        HostCommand::Connect(addr, tx) => {
256                            match swarm.dial(addr) {
257                                Ok(_) => { let _ = tx.send(Ok(())); }
258                                Err(e) => { let _ = tx.send(Err(e.to_string())); }
259                            }
260                        }
261                        HostCommand::Disconnect(peer_id, tx) => {
262                            let _ = swarm.disconnect_peer_id(peer_id);
263                            let _ = tx.send(Ok(()));
264                        }
265                        HostCommand::GetKnownAddresses(tx) => {
266                            let mut addrs: HashMap<String, Vec<String>> = HashMap::new();
267                            // Get addresses from Kademlia routing table
268                            for bucket in swarm.behaviour_mut().kademlia.kbuckets() {
269                                for entry in bucket.iter() {
270                                    let peer_id = entry.node.key.preimage().to_string();
271                                    let peer_addrs: Vec<String> = entry.node.value
272                                        .iter()
273                                        .map(|a| a.to_string())
274                                        .collect();
275                                    addrs.insert(peer_id, peer_addrs);
276                                }
277                            }
278                            let _ = tx.send(addrs);
279                        }
280                        HostCommand::Ping(peer_id, tx) => {
281                            pending_pings.insert(peer_id, tx);
282                            // Ping will be handled by ping behavior automatically
283                        }
284                        HostCommand::GetListenAddresses(tx) => {
285                            let addrs = listen_addrs.read().await;
286                            let _ = tx.send(addrs.iter().map(|a| a.to_string()).collect());
287                        }
288                        HostCommand::Shutdown(tx) => {
289                            let _ = tx.send(());
290                            break;
291                        }
292                        HostCommand::FindProviders(key, tx) => {
293                            let record_key = kad::RecordKey::new(&key);
294                            let query_id = swarm.behaviour_mut().get_providers(record_key);
295                            pending_find_providers.insert(query_id, tx);
296                        }
297                        HostCommand::FindPeer(peer_id, tx) => {
298                            let query_id = swarm.behaviour_mut().get_closest_peers(peer_id);
299                            pending_find_peers.insert(query_id, tx);
300                        }
301                        HostCommand::Provide(key, tx) => {
302                            let record_key = kad::RecordKey::new(&key);
303                            match swarm.behaviour_mut().start_providing(record_key) {
304                                Ok(query_id) => {
305                                    pending_provide.insert(query_id, tx);
306                                }
307                                Err(e) => {
308                                    let _ = tx.send(crate::DhtQueryResult::failure(
309                                        "provide",
310                                        format!("failed to start providing: {}", e),
311                                    ));
312                                }
313                            }
314                        }
315                        HostCommand::PutValue(key, value, tx) => {
316                            let record_key = kad::RecordKey::new(&key);
317                            let query_id = swarm.behaviour_mut().put_value(record_key, value);
318                            pending_put_value.insert(query_id, tx);
319                        }
320                        HostCommand::GetValue(key, tx) => {
321                            let record_key = kad::RecordKey::new(&key);
322                            let query_id = swarm.behaviour_mut().get_value(record_key);
323                            pending_get_value.insert(query_id, tx);
324                        }
325                        HostCommand::GetDhtStats(tx) => {
326                            let (buckets, total_peers) = swarm.behaviour_mut().get_routing_table_info();
327                            let stats = crate::DhtStats {
328                                name: "kademlia".to_string(),
329                                buckets: buckets as u32,
330                                total_peers: total_peers as u32,
331                                mode: "client".to_string(), // Default mode
332                            };
333                            let _ = tx.send(stats);
334                        }
335                        HostCommand::Bootstrap(tx) => {
336                            match swarm.behaviour_mut().bootstrap() {
337                                Ok(_) => { let _ = tx.send(Ok(())); }
338                                Err(e) => { let _ = tx.send(Err(format!("bootstrap failed: {:?}", e))); }
339                            }
340                        }
341                    }
342                }
343            }
344        }
345    }
346
347    /// Handle behavior-specific events.
348    fn handle_behavior_event(
349        event: crate::behavior::FerripfsBehaviorEvent,
350        pending_pings: &mut HashMap<PeerId, oneshot::Sender<PingResult>>,
351        pending_find_providers: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
352        pending_find_peers: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
353        pending_provide: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
354        pending_put_value: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
355        pending_get_value: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
356    ) {
357        use crate::behavior::FerripfsBehaviorEvent;
358
359        match event {
360            FerripfsBehaviorEvent::Ping(ping::Event { peer, result, .. }) => {
361                if let Some(tx) = pending_pings.remove(&peer) {
362                    let ping_result = match result {
363                        Ok(rtt) => PingResult {
364                            success: true,
365                            time: rtt.as_millis() as u64,
366                            text: format!("Pong received: time={}ms", rtt.as_millis()),
367                        },
368                        Err(e) => PingResult {
369                            success: false,
370                            time: 0,
371                            text: format!("Ping failed: {}", e),
372                        },
373                    };
374                    let _ = tx.send(ping_result);
375                }
376            }
377            FerripfsBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
378                for (peer_id, addr) in peers {
379                    tracing::debug!("mDNS discovered {} at {}", peer_id, addr);
380                }
381            }
382            FerripfsBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
383                for (peer_id, addr) in peers {
384                    tracing::debug!("mDNS peer expired {} at {}", peer_id, addr);
385                }
386            }
387            FerripfsBehaviorEvent::Identify(identify::Event::Received {
388                peer_id, info, ..
389            }) => {
390                tracing::debug!("Identified peer {} running {}", peer_id, info.agent_version);
391            }
392            FerripfsBehaviorEvent::Kademlia(kad::Event::RoutingUpdated { peer, .. }) => {
393                tracing::debug!("Kademlia routing updated for {}", peer);
394            }
395            FerripfsBehaviorEvent::Kademlia(kad::Event::OutboundQueryProgressed {
396                id,
397                result,
398                ..
399            }) => match result {
400                kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
401                    providers,
402                    ..
403                })) => {
404                    if let Some(tx) = pending_find_providers.remove(&id) {
405                        let dht_providers: Vec<crate::DhtProvider> = providers
406                            .into_iter()
407                            .map(|p| crate::DhtProvider {
408                                id: p.to_string(),
409                                addrs: vec![],
410                            })
411                            .collect();
412                        let _ = tx.send(crate::DhtQueryResult::success("providers", dht_providers));
413                    }
414                }
415                kad::QueryResult::GetProviders(Ok(
416                    kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
417                )) => {
418                    if let Some(tx) = pending_find_providers.remove(&id) {
419                        let _ = tx.send(crate::DhtQueryResult::success("providers", vec![]));
420                    }
421                }
422                kad::QueryResult::GetProviders(Err(e)) => {
423                    if let Some(tx) = pending_find_providers.remove(&id) {
424                        let _ = tx.send(crate::DhtQueryResult::failure(
425                            "providers",
426                            format!("{:?}", e),
427                        ));
428                    }
429                }
430                kad::QueryResult::GetClosestPeers(Ok(kad::GetClosestPeersOk { peers, .. })) => {
431                    if let Some(tx) = pending_find_peers.remove(&id) {
432                        let dht_peers: Vec<crate::DhtProvider> = peers
433                            .into_iter()
434                            .map(|p| crate::DhtProvider {
435                                id: p.peer_id.to_string(),
436                                addrs: p.addrs.iter().map(|a| a.to_string()).collect(),
437                            })
438                            .collect();
439                        let _ = tx.send(crate::DhtQueryResult::success("closest_peers", dht_peers));
440                    }
441                }
442                kad::QueryResult::GetClosestPeers(Err(e)) => {
443                    if let Some(tx) = pending_find_peers.remove(&id) {
444                        let _ = tx.send(crate::DhtQueryResult::failure(
445                            "closest_peers",
446                            format!("{:?}", e),
447                        ));
448                    }
449                }
450                kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { .. })) => {
451                    if let Some(tx) = pending_provide.remove(&id) {
452                        let _ = tx.send(crate::DhtQueryResult::success("provide", vec![]));
453                    }
454                }
455                kad::QueryResult::StartProviding(Err(e)) => {
456                    if let Some(tx) = pending_provide.remove(&id) {
457                        let _ = tx.send(crate::DhtQueryResult::failure(
458                            "provide",
459                            format!("{:?}", e),
460                        ));
461                    }
462                }
463                kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { .. })) => {
464                    if let Some(tx) = pending_put_value.remove(&id) {
465                        let _ = tx.send(crate::DhtQueryResult::success("put_value", vec![]));
466                    }
467                }
468                kad::QueryResult::PutRecord(Err(e)) => {
469                    if let Some(tx) = pending_put_value.remove(&id) {
470                        let _ = tx.send(crate::DhtQueryResult::failure(
471                            "put_value",
472                            format!("{:?}", e),
473                        ));
474                    }
475                }
476                kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))) => {
477                    if let Some(tx) = pending_get_value.remove(&id) {
478                        let value = String::from_utf8_lossy(&peer_record.record.value).to_string();
479                        let _ = tx.send(crate::DhtQueryResult::success_with_extra(
480                            "get_value",
481                            value,
482                        ));
483                    }
484                }
485                kad::QueryResult::GetRecord(Ok(
486                    kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. },
487                )) => {
488                    if let Some(tx) = pending_get_value.remove(&id) {
489                        let _ = tx.send(crate::DhtQueryResult::failure(
490                            "get_value",
491                            "record not found".into(),
492                        ));
493                    }
494                }
495                kad::QueryResult::GetRecord(Err(e)) => {
496                    if let Some(tx) = pending_get_value.remove(&id) {
497                        let _ = tx.send(crate::DhtQueryResult::failure(
498                            "get_value",
499                            format!("{:?}", e),
500                        ));
501                    }
502                }
503                _ => {}
504            },
505            _ => {}
506        }
507    }
508
509    /// Get local peer ID.
510    pub fn peer_id(&self) -> &PeerId {
511        &self.peer_id
512    }
513
514    /// Get peer info.
515    pub async fn peer_info(&self) -> NetworkResult<PeerInfo> {
516        let (tx, rx) = oneshot::channel();
517        self.cmd_tx
518            .send(HostCommand::GetPeerInfo(tx))
519            .await
520            .map_err(|_| NetworkError::NotRunning)?;
521        rx.await.map_err(|_| NetworkError::NotRunning)
522    }
523
524    /// Get connected peers.
525    pub async fn connected_peers(&self) -> NetworkResult<Vec<ConnectedPeer>> {
526        let (tx, rx) = oneshot::channel();
527        self.cmd_tx
528            .send(HostCommand::GetConnectedPeers(tx))
529            .await
530            .map_err(|_| NetworkError::NotRunning)?;
531        rx.await.map_err(|_| NetworkError::NotRunning)
532    }
533
534    /// Connect to a peer.
535    pub async fn connect(&self, addr: &str) -> NetworkResult<()> {
536        let multiaddr = parse_multiaddr(addr)?;
537        let (tx, rx) = oneshot::channel();
538        self.cmd_tx
539            .send(HostCommand::Connect(multiaddr, tx))
540            .await
541            .map_err(|_| NetworkError::NotRunning)?;
542        rx.await
543            .map_err(|_| NetworkError::NotRunning)?
544            .map_err(NetworkError::Connection)
545    }
546
547    /// Disconnect from a peer.
548    pub async fn disconnect(&self, peer: &str) -> NetworkResult<()> {
549        let peer_id = parse_peer_id(peer)?;
550        let (tx, rx) = oneshot::channel();
551        self.cmd_tx
552            .send(HostCommand::Disconnect(peer_id, tx))
553            .await
554            .map_err(|_| NetworkError::NotRunning)?;
555        rx.await
556            .map_err(|_| NetworkError::NotRunning)?
557            .map_err(NetworkError::Connection)
558    }
559
560    /// Get known addresses for all peers.
561    pub async fn known_addresses(&self) -> NetworkResult<HashMap<String, Vec<String>>> {
562        let (tx, rx) = oneshot::channel();
563        self.cmd_tx
564            .send(HostCommand::GetKnownAddresses(tx))
565            .await
566            .map_err(|_| NetworkError::NotRunning)?;
567        rx.await.map_err(|_| NetworkError::NotRunning)
568    }
569
570    /// Ping a peer.
571    pub async fn ping(&self, peer: &str) -> NetworkResult<PingResult> {
572        let peer_id = parse_peer_id(peer)?;
573        let (tx, rx) = oneshot::channel();
574        self.cmd_tx
575            .send(HostCommand::Ping(peer_id, tx))
576            .await
577            .map_err(|_| NetworkError::NotRunning)?;
578        // Wait with timeout
579        match tokio::time::timeout(Duration::from_secs(10), rx).await {
580            Ok(Ok(result)) => Ok(result),
581            Ok(Err(_)) => Err(NetworkError::NotRunning),
582            Err(_) => Ok(PingResult {
583                success: false,
584                time: 0,
585                text: "Ping timeout".into(),
586            }),
587        }
588    }
589
590    /// Get listen addresses.
591    pub async fn listen_addresses(&self) -> NetworkResult<Vec<String>> {
592        let (tx, rx) = oneshot::channel();
593        self.cmd_tx
594            .send(HostCommand::GetListenAddresses(tx))
595            .await
596            .map_err(|_| NetworkError::NotRunning)?;
597        rx.await.map_err(|_| NetworkError::NotRunning)
598    }
599
600    /// Shutdown the host.
601    pub async fn shutdown(&self) -> NetworkResult<()> {
602        let (tx, rx) = oneshot::channel();
603        let _ = self.cmd_tx.send(HostCommand::Shutdown(tx)).await;
604        let _ = rx.await;
605        Ok(())
606    }
607
608    /// Find providers for content.
609    pub async fn find_providers(&self, key: &[u8]) -> NetworkResult<crate::DhtQueryResult> {
610        let (tx, rx) = oneshot::channel();
611        self.cmd_tx
612            .send(HostCommand::FindProviders(key.to_vec(), tx))
613            .await
614            .map_err(|_| NetworkError::NotRunning)?;
615        match tokio::time::timeout(Duration::from_secs(30), rx).await {
616            Ok(Ok(result)) => Ok(result),
617            Ok(Err(_)) => Err(NetworkError::NotRunning),
618            Err(_) => Ok(crate::DhtQueryResult::failure(
619                "providers",
620                "query timeout".into(),
621            )),
622        }
623    }
624
625    /// Find a peer in the DHT.
626    pub async fn find_peer(&self, peer: &str) -> NetworkResult<crate::DhtQueryResult> {
627        let peer_id = parse_peer_id(peer)?;
628        let (tx, rx) = oneshot::channel();
629        self.cmd_tx
630            .send(HostCommand::FindPeer(peer_id, tx))
631            .await
632            .map_err(|_| NetworkError::NotRunning)?;
633        match tokio::time::timeout(Duration::from_secs(30), rx).await {
634            Ok(Ok(result)) => Ok(result),
635            Ok(Err(_)) => Err(NetworkError::NotRunning),
636            Err(_) => Ok(crate::DhtQueryResult::failure(
637                "find_peer",
638                "query timeout".into(),
639            )),
640        }
641    }
642
643    /// Announce content to the DHT.
644    pub async fn provide(&self, key: &[u8]) -> NetworkResult<crate::DhtQueryResult> {
645        let (tx, rx) = oneshot::channel();
646        self.cmd_tx
647            .send(HostCommand::Provide(key.to_vec(), tx))
648            .await
649            .map_err(|_| NetworkError::NotRunning)?;
650        match tokio::time::timeout(Duration::from_secs(30), rx).await {
651            Ok(Ok(result)) => Ok(result),
652            Ok(Err(_)) => Err(NetworkError::NotRunning),
653            Err(_) => Ok(crate::DhtQueryResult::failure(
654                "provide",
655                "query timeout".into(),
656            )),
657        }
658    }
659
660    /// Put a value in the DHT.
661    pub async fn put_value(
662        &self,
663        key: &[u8],
664        value: Vec<u8>,
665    ) -> NetworkResult<crate::DhtQueryResult> {
666        let (tx, rx) = oneshot::channel();
667        self.cmd_tx
668            .send(HostCommand::PutValue(key.to_vec(), value, tx))
669            .await
670            .map_err(|_| NetworkError::NotRunning)?;
671        match tokio::time::timeout(Duration::from_secs(30), rx).await {
672            Ok(Ok(result)) => Ok(result),
673            Ok(Err(_)) => Err(NetworkError::NotRunning),
674            Err(_) => Ok(crate::DhtQueryResult::failure(
675                "put_value",
676                "query timeout".into(),
677            )),
678        }
679    }
680
681    /// Get a value from the DHT.
682    pub async fn get_value(&self, key: &[u8]) -> NetworkResult<crate::DhtQueryResult> {
683        let (tx, rx) = oneshot::channel();
684        self.cmd_tx
685            .send(HostCommand::GetValue(key.to_vec(), tx))
686            .await
687            .map_err(|_| NetworkError::NotRunning)?;
688        match tokio::time::timeout(Duration::from_secs(30), rx).await {
689            Ok(Ok(result)) => Ok(result),
690            Ok(Err(_)) => Err(NetworkError::NotRunning),
691            Err(_) => Ok(crate::DhtQueryResult::failure(
692                "get_value",
693                "query timeout".into(),
694            )),
695        }
696    }
697
698    /// Get DHT statistics.
699    pub async fn dht_stats(&self) -> NetworkResult<crate::DhtStats> {
700        let (tx, rx) = oneshot::channel();
701        self.cmd_tx
702            .send(HostCommand::GetDhtStats(tx))
703            .await
704            .map_err(|_| NetworkError::NotRunning)?;
705        rx.await.map_err(|_| NetworkError::NotRunning)
706    }
707
708    /// Bootstrap the DHT.
709    pub async fn bootstrap(&self) -> NetworkResult<()> {
710        let (tx, rx) = oneshot::channel();
711        self.cmd_tx
712            .send(HostCommand::Bootstrap(tx))
713            .await
714            .map_err(|_| NetworkError::NotRunning)?;
715        rx.await
716            .map_err(|_| NetworkError::NotRunning)?
717            .map_err(NetworkError::Swarm)
718    }
719}
720
721use libp2p::{identify, kad, mdns, ping};