1use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use futures::StreamExt;
15use libp2p::{identity::Keypair, swarm::SwarmEvent, Multiaddr, PeerId, Swarm};
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use crate::{
19 behavior::FerripfsBehavior,
20 swarm::{parse_multiaddr, parse_peer_id, SwarmBuilder, SwarmConfig},
21 ConnectedPeer, NetworkError, NetworkResult, PeerInfo, PingResult,
22};
23
/// Everything `NetworkHost::start` needs to bring up the network stack.
#[derive(Debug, Clone)]
pub struct HostConfig {
    /// Identity keypair; the local `PeerId` is derived from its public key.
    pub keypair: Keypair,
    /// Addresses to listen on, as strings that are parsed with `parse_multiaddr` at startup.
    pub listen_addrs: Vec<String>,
    /// Bootstrap peers, as strings that are parsed with
    /// `crate::swarm::parse_bootstrap_peer` at startup.
    pub bootstrap_peers: Vec<String>,
    /// Settings handed to the swarm builder.
    pub swarm_config: SwarmConfig,
}
36
37impl HostConfig {
38 pub fn from_config(config: &ferripfs_config::Config, keypair: Keypair) -> Self {
40 Self {
41 keypair,
42 listen_addrs: config.addresses.swarm.clone(),
43 bootstrap_peers: config.bootstrap.clone(),
44 swarm_config: SwarmConfig::from_config(config),
45 }
46 }
47}
48
/// Requests sent from a `NetworkHost` handle to its background event loop.
///
/// Every variant carries a `oneshot::Sender` on which the event loop delivers the reply.
#[derive(Debug)]
pub enum HostCommand {
    /// Ask for a description of the local peer (ID, public key, listen addresses, versions).
    GetPeerInfo(oneshot::Sender<PeerInfo>),
    /// Ask for the peers the swarm is currently connected to.
    GetConnectedPeers(oneshot::Sender<Vec<ConnectedPeer>>),
    /// Dial the given multiaddr; the reply carries the dial error as a string on failure.
    Connect(Multiaddr, oneshot::Sender<Result<(), String>>),
    /// Disconnect from the given peer.
    Disconnect(PeerId, oneshot::Sender<Result<(), String>>),
    /// Ask for the addresses held in the Kademlia k-buckets, keyed by peer ID string.
    GetKnownAddresses(oneshot::Sender<HashMap<String, Vec<String>>>),
    /// Register interest in the next ping event for the given peer.
    Ping(PeerId, oneshot::Sender<PingResult>),
    /// Ask for the addresses the swarm is currently listening on.
    GetListenAddresses(oneshot::Sender<Vec<String>>),
    /// Acknowledge on the sender, then stop the event loop.
    Shutdown(oneshot::Sender<()>),
    /// Start a DHT provider lookup for the given key bytes.
    FindProviders(Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
    /// Start a DHT closest-peers query for the given peer ID.
    FindPeer(PeerId, oneshot::Sender<crate::DhtQueryResult>),
    /// Announce the local node as a provider for the given key bytes.
    Provide(Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
    /// Store a value (second field) under a key (first field) in the DHT.
    PutValue(Vec<u8>, Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
    /// Look up the value stored under the given key bytes in the DHT.
    GetValue(Vec<u8>, oneshot::Sender<crate::DhtQueryResult>),
    /// Ask for routing-table statistics.
    GetDhtStats(oneshot::Sender<crate::DhtStats>),
    /// Trigger a DHT bootstrap; the reply carries the error as a string on failure.
    Bootstrap(oneshot::Sender<Result<(), String>>),
}
83
/// Handle to a running network stack.
///
/// The swarm itself lives in a spawned task; this handle talks to it by sending
/// `HostCommand`s over a channel.
pub struct NetworkHost {
    /// Peer ID derived from the configured keypair.
    peer_id: PeerId,
    /// Sending half of the command channel read by the event loop.
    cmd_tx: mpsc::Sender<HostCommand>,
    /// Listen addresses, shared with the event loop which adds and removes entries.
    /// No method in this file's `impl NetworkHost` reads the field directly (they go
    /// through the command channel), hence the `dead_code` allowance.
    #[allow(dead_code)]
    listen_addrs: Arc<RwLock<Vec<Multiaddr>>>,
}
94
95impl NetworkHost {
96 pub async fn start(config: HostConfig) -> NetworkResult<Self> {
98 let local_peer_id = PeerId::from(config.keypair.public());
99 let _local_public_key = config.keypair.public();
100
101 let listen_addrs: Vec<Multiaddr> = config
103 .listen_addrs
104 .iter()
105 .filter_map(|s| parse_multiaddr(s).ok())
106 .collect();
107
108 let mut swarm = SwarmBuilder::new(config.keypair.clone())
110 .with_config(config.swarm_config)
111 .with_listen_addrs(listen_addrs.clone())
112 .build()?;
113
114 for addr in &listen_addrs {
116 swarm
117 .listen_on(addr.clone())
118 .map_err(|e| NetworkError::Transport(e.to_string()))?;
119 }
120
121 for peer_str in &config.bootstrap_peers {
123 if let Ok((peer_id, addr)) = crate::swarm::parse_bootstrap_peer(peer_str) {
124 swarm.behaviour_mut().add_address(&peer_id, addr);
125 }
126 }
127
128 let (cmd_tx, cmd_rx) = mpsc::channel(32);
130 let actual_listen_addrs = Arc::new(RwLock::new(Vec::new()));
131 let listen_addrs_clone = actual_listen_addrs.clone();
132
133 let keypair = config.keypair.clone();
135 tokio::spawn(async move {
136 Self::run_event_loop(swarm, cmd_rx, listen_addrs_clone, keypair).await;
137 });
138
139 Ok(Self {
140 peer_id: local_peer_id,
141 cmd_tx,
142 listen_addrs: actual_listen_addrs,
143 })
144 }
145
146 async fn run_event_loop(
148 mut swarm: Swarm<FerripfsBehavior>,
149 mut cmd_rx: mpsc::Receiver<HostCommand>,
150 listen_addrs: Arc<RwLock<Vec<Multiaddr>>>,
151 keypair: Keypair,
152 ) {
153 let local_peer_id = PeerId::from(keypair.public());
154 let local_public_key = keypair.public();
155
156 let mut pending_pings: HashMap<PeerId, oneshot::Sender<PingResult>> = HashMap::new();
158
159 let mut pending_find_providers: HashMap<
161 kad::QueryId,
162 oneshot::Sender<crate::DhtQueryResult>,
163 > = HashMap::new();
164 let mut pending_find_peers: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
165 HashMap::new();
166 let mut pending_provide: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
167 HashMap::new();
168 let mut pending_put_value: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
169 HashMap::new();
170 let mut pending_get_value: HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>> =
171 HashMap::new();
172
173 loop {
174 tokio::select! {
175 event = swarm.select_next_some() => {
177 match event {
178 SwarmEvent::NewListenAddr { address, .. } => {
179 let mut addrs = listen_addrs.write().await;
180 addrs.push(address.clone());
181 tracing::info!("Listening on {}", address);
182 }
183 SwarmEvent::Behaviour(event) => {
184 Self::handle_behavior_event(
186 event,
187 &mut pending_pings,
188 &mut pending_find_providers,
189 &mut pending_find_peers,
190 &mut pending_provide,
191 &mut pending_put_value,
192 &mut pending_get_value,
193 );
194 }
195 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
196 tracing::debug!("Connected to {}", peer_id);
197 }
198 SwarmEvent::ConnectionClosed { peer_id, .. } => {
199 tracing::debug!("Disconnected from {}", peer_id);
200 }
201 SwarmEvent::IncomingConnection { .. } => {}
202 SwarmEvent::IncomingConnectionError { .. } => {}
203 SwarmEvent::OutgoingConnectionError { .. } => {}
204 SwarmEvent::ExpiredListenAddr { address, .. } => {
205 let mut addrs = listen_addrs.write().await;
206 addrs.retain(|a| a != &address);
207 }
208 SwarmEvent::ListenerClosed { .. } => {}
209 SwarmEvent::ListenerError { .. } => {}
210 SwarmEvent::Dialing { .. } => {}
211 SwarmEvent::NewExternalAddrCandidate { .. } => {}
212 SwarmEvent::ExternalAddrConfirmed { .. } => {}
213 SwarmEvent::ExternalAddrExpired { .. } => {}
214 SwarmEvent::NewExternalAddrOfPeer { .. } => {}
215 _ => {}
216 }
217 }
218 Some(cmd) = cmd_rx.recv() => {
220 match cmd {
221 HostCommand::GetPeerInfo(tx) => {
222 let addrs = listen_addrs.read().await;
223 let info = PeerInfo {
224 id: local_peer_id.to_string(),
225 public_key: bs58::encode(local_public_key.encode_protobuf()).into_string(),
226 addresses: addrs.iter().map(|a| {
227 format!("{}/p2p/{}", a, local_peer_id)
228 }).collect(),
229 agent_version: crate::AGENT_VERSION.to_string(),
230 protocol_version: crate::PROTOCOL_VERSION.to_string(),
231 protocols: vec![
232 "/ipfs/id/1.0.0".to_string(),
233 "/ipfs/ping/1.0.0".to_string(),
234 "/ipfs/kad/1.0.0".to_string(),
235 ],
236 };
237 let _ = tx.send(info);
238 }
239 HostCommand::GetConnectedPeers(tx) => {
240 let peers: Vec<ConnectedPeer> = swarm
241 .connected_peers()
242 .map(|peer_id| {
243 let _info = swarm.network_info();
244 ConnectedPeer {
245 addr: String::new(), peer: peer_id.to_string(),
247 latency: None,
248 muxer: Some("yamux".into()),
249 direction: "unknown".into(),
250 }
251 })
252 .collect();
253 let _ = tx.send(peers);
254 }
255 HostCommand::Connect(addr, tx) => {
256 match swarm.dial(addr) {
257 Ok(_) => { let _ = tx.send(Ok(())); }
258 Err(e) => { let _ = tx.send(Err(e.to_string())); }
259 }
260 }
261 HostCommand::Disconnect(peer_id, tx) => {
262 let _ = swarm.disconnect_peer_id(peer_id);
263 let _ = tx.send(Ok(()));
264 }
265 HostCommand::GetKnownAddresses(tx) => {
266 let mut addrs: HashMap<String, Vec<String>> = HashMap::new();
267 for bucket in swarm.behaviour_mut().kademlia.kbuckets() {
269 for entry in bucket.iter() {
270 let peer_id = entry.node.key.preimage().to_string();
271 let peer_addrs: Vec<String> = entry.node.value
272 .iter()
273 .map(|a| a.to_string())
274 .collect();
275 addrs.insert(peer_id, peer_addrs);
276 }
277 }
278 let _ = tx.send(addrs);
279 }
280 HostCommand::Ping(peer_id, tx) => {
281 pending_pings.insert(peer_id, tx);
282 }
284 HostCommand::GetListenAddresses(tx) => {
285 let addrs = listen_addrs.read().await;
286 let _ = tx.send(addrs.iter().map(|a| a.to_string()).collect());
287 }
288 HostCommand::Shutdown(tx) => {
289 let _ = tx.send(());
290 break;
291 }
292 HostCommand::FindProviders(key, tx) => {
293 let record_key = kad::RecordKey::new(&key);
294 let query_id = swarm.behaviour_mut().get_providers(record_key);
295 pending_find_providers.insert(query_id, tx);
296 }
297 HostCommand::FindPeer(peer_id, tx) => {
298 let query_id = swarm.behaviour_mut().get_closest_peers(peer_id);
299 pending_find_peers.insert(query_id, tx);
300 }
301 HostCommand::Provide(key, tx) => {
302 let record_key = kad::RecordKey::new(&key);
303 match swarm.behaviour_mut().start_providing(record_key) {
304 Ok(query_id) => {
305 pending_provide.insert(query_id, tx);
306 }
307 Err(e) => {
308 let _ = tx.send(crate::DhtQueryResult::failure(
309 "provide",
310 format!("failed to start providing: {}", e),
311 ));
312 }
313 }
314 }
315 HostCommand::PutValue(key, value, tx) => {
316 let record_key = kad::RecordKey::new(&key);
317 let query_id = swarm.behaviour_mut().put_value(record_key, value);
318 pending_put_value.insert(query_id, tx);
319 }
320 HostCommand::GetValue(key, tx) => {
321 let record_key = kad::RecordKey::new(&key);
322 let query_id = swarm.behaviour_mut().get_value(record_key);
323 pending_get_value.insert(query_id, tx);
324 }
325 HostCommand::GetDhtStats(tx) => {
326 let (buckets, total_peers) = swarm.behaviour_mut().get_routing_table_info();
327 let stats = crate::DhtStats {
328 name: "kademlia".to_string(),
329 buckets: buckets as u32,
330 total_peers: total_peers as u32,
331 mode: "client".to_string(), };
333 let _ = tx.send(stats);
334 }
335 HostCommand::Bootstrap(tx) => {
336 match swarm.behaviour_mut().bootstrap() {
337 Ok(_) => { let _ = tx.send(Ok(())); }
338 Err(e) => { let _ = tx.send(Err(format!("bootstrap failed: {:?}", e))); }
339 }
340 }
341 }
342 }
343 }
344 }
345 }
346
347 fn handle_behavior_event(
349 event: crate::behavior::FerripfsBehaviorEvent,
350 pending_pings: &mut HashMap<PeerId, oneshot::Sender<PingResult>>,
351 pending_find_providers: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
352 pending_find_peers: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
353 pending_provide: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
354 pending_put_value: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
355 pending_get_value: &mut HashMap<kad::QueryId, oneshot::Sender<crate::DhtQueryResult>>,
356 ) {
357 use crate::behavior::FerripfsBehaviorEvent;
358
359 match event {
360 FerripfsBehaviorEvent::Ping(ping::Event { peer, result, .. }) => {
361 if let Some(tx) = pending_pings.remove(&peer) {
362 let ping_result = match result {
363 Ok(rtt) => PingResult {
364 success: true,
365 time: rtt.as_millis() as u64,
366 text: format!("Pong received: time={}ms", rtt.as_millis()),
367 },
368 Err(e) => PingResult {
369 success: false,
370 time: 0,
371 text: format!("Ping failed: {}", e),
372 },
373 };
374 let _ = tx.send(ping_result);
375 }
376 }
377 FerripfsBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
378 for (peer_id, addr) in peers {
379 tracing::debug!("mDNS discovered {} at {}", peer_id, addr);
380 }
381 }
382 FerripfsBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
383 for (peer_id, addr) in peers {
384 tracing::debug!("mDNS peer expired {} at {}", peer_id, addr);
385 }
386 }
387 FerripfsBehaviorEvent::Identify(identify::Event::Received {
388 peer_id, info, ..
389 }) => {
390 tracing::debug!("Identified peer {} running {}", peer_id, info.agent_version);
391 }
392 FerripfsBehaviorEvent::Kademlia(kad::Event::RoutingUpdated { peer, .. }) => {
393 tracing::debug!("Kademlia routing updated for {}", peer);
394 }
395 FerripfsBehaviorEvent::Kademlia(kad::Event::OutboundQueryProgressed {
396 id,
397 result,
398 ..
399 }) => match result {
400 kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
401 providers,
402 ..
403 })) => {
404 if let Some(tx) = pending_find_providers.remove(&id) {
405 let dht_providers: Vec<crate::DhtProvider> = providers
406 .into_iter()
407 .map(|p| crate::DhtProvider {
408 id: p.to_string(),
409 addrs: vec![],
410 })
411 .collect();
412 let _ = tx.send(crate::DhtQueryResult::success("providers", dht_providers));
413 }
414 }
415 kad::QueryResult::GetProviders(Ok(
416 kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
417 )) => {
418 if let Some(tx) = pending_find_providers.remove(&id) {
419 let _ = tx.send(crate::DhtQueryResult::success("providers", vec![]));
420 }
421 }
422 kad::QueryResult::GetProviders(Err(e)) => {
423 if let Some(tx) = pending_find_providers.remove(&id) {
424 let _ = tx.send(crate::DhtQueryResult::failure(
425 "providers",
426 format!("{:?}", e),
427 ));
428 }
429 }
430 kad::QueryResult::GetClosestPeers(Ok(kad::GetClosestPeersOk { peers, .. })) => {
431 if let Some(tx) = pending_find_peers.remove(&id) {
432 let dht_peers: Vec<crate::DhtProvider> = peers
433 .into_iter()
434 .map(|p| crate::DhtProvider {
435 id: p.peer_id.to_string(),
436 addrs: p.addrs.iter().map(|a| a.to_string()).collect(),
437 })
438 .collect();
439 let _ = tx.send(crate::DhtQueryResult::success("closest_peers", dht_peers));
440 }
441 }
442 kad::QueryResult::GetClosestPeers(Err(e)) => {
443 if let Some(tx) = pending_find_peers.remove(&id) {
444 let _ = tx.send(crate::DhtQueryResult::failure(
445 "closest_peers",
446 format!("{:?}", e),
447 ));
448 }
449 }
450 kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { .. })) => {
451 if let Some(tx) = pending_provide.remove(&id) {
452 let _ = tx.send(crate::DhtQueryResult::success("provide", vec![]));
453 }
454 }
455 kad::QueryResult::StartProviding(Err(e)) => {
456 if let Some(tx) = pending_provide.remove(&id) {
457 let _ = tx.send(crate::DhtQueryResult::failure(
458 "provide",
459 format!("{:?}", e),
460 ));
461 }
462 }
463 kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { .. })) => {
464 if let Some(tx) = pending_put_value.remove(&id) {
465 let _ = tx.send(crate::DhtQueryResult::success("put_value", vec![]));
466 }
467 }
468 kad::QueryResult::PutRecord(Err(e)) => {
469 if let Some(tx) = pending_put_value.remove(&id) {
470 let _ = tx.send(crate::DhtQueryResult::failure(
471 "put_value",
472 format!("{:?}", e),
473 ));
474 }
475 }
476 kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))) => {
477 if let Some(tx) = pending_get_value.remove(&id) {
478 let value = String::from_utf8_lossy(&peer_record.record.value).to_string();
479 let _ = tx.send(crate::DhtQueryResult::success_with_extra(
480 "get_value",
481 value,
482 ));
483 }
484 }
485 kad::QueryResult::GetRecord(Ok(
486 kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. },
487 )) => {
488 if let Some(tx) = pending_get_value.remove(&id) {
489 let _ = tx.send(crate::DhtQueryResult::failure(
490 "get_value",
491 "record not found".into(),
492 ));
493 }
494 }
495 kad::QueryResult::GetRecord(Err(e)) => {
496 if let Some(tx) = pending_get_value.remove(&id) {
497 let _ = tx.send(crate::DhtQueryResult::failure(
498 "get_value",
499 format!("{:?}", e),
500 ));
501 }
502 }
503 _ => {}
504 },
505 _ => {}
506 }
507 }
508
509 pub fn peer_id(&self) -> &PeerId {
511 &self.peer_id
512 }
513
514 pub async fn peer_info(&self) -> NetworkResult<PeerInfo> {
516 let (tx, rx) = oneshot::channel();
517 self.cmd_tx
518 .send(HostCommand::GetPeerInfo(tx))
519 .await
520 .map_err(|_| NetworkError::NotRunning)?;
521 rx.await.map_err(|_| NetworkError::NotRunning)
522 }
523
524 pub async fn connected_peers(&self) -> NetworkResult<Vec<ConnectedPeer>> {
526 let (tx, rx) = oneshot::channel();
527 self.cmd_tx
528 .send(HostCommand::GetConnectedPeers(tx))
529 .await
530 .map_err(|_| NetworkError::NotRunning)?;
531 rx.await.map_err(|_| NetworkError::NotRunning)
532 }
533
534 pub async fn connect(&self, addr: &str) -> NetworkResult<()> {
536 let multiaddr = parse_multiaddr(addr)?;
537 let (tx, rx) = oneshot::channel();
538 self.cmd_tx
539 .send(HostCommand::Connect(multiaddr, tx))
540 .await
541 .map_err(|_| NetworkError::NotRunning)?;
542 rx.await
543 .map_err(|_| NetworkError::NotRunning)?
544 .map_err(NetworkError::Connection)
545 }
546
547 pub async fn disconnect(&self, peer: &str) -> NetworkResult<()> {
549 let peer_id = parse_peer_id(peer)?;
550 let (tx, rx) = oneshot::channel();
551 self.cmd_tx
552 .send(HostCommand::Disconnect(peer_id, tx))
553 .await
554 .map_err(|_| NetworkError::NotRunning)?;
555 rx.await
556 .map_err(|_| NetworkError::NotRunning)?
557 .map_err(NetworkError::Connection)
558 }
559
560 pub async fn known_addresses(&self) -> NetworkResult<HashMap<String, Vec<String>>> {
562 let (tx, rx) = oneshot::channel();
563 self.cmd_tx
564 .send(HostCommand::GetKnownAddresses(tx))
565 .await
566 .map_err(|_| NetworkError::NotRunning)?;
567 rx.await.map_err(|_| NetworkError::NotRunning)
568 }
569
570 pub async fn ping(&self, peer: &str) -> NetworkResult<PingResult> {
572 let peer_id = parse_peer_id(peer)?;
573 let (tx, rx) = oneshot::channel();
574 self.cmd_tx
575 .send(HostCommand::Ping(peer_id, tx))
576 .await
577 .map_err(|_| NetworkError::NotRunning)?;
578 match tokio::time::timeout(Duration::from_secs(10), rx).await {
580 Ok(Ok(result)) => Ok(result),
581 Ok(Err(_)) => Err(NetworkError::NotRunning),
582 Err(_) => Ok(PingResult {
583 success: false,
584 time: 0,
585 text: "Ping timeout".into(),
586 }),
587 }
588 }
589
590 pub async fn listen_addresses(&self) -> NetworkResult<Vec<String>> {
592 let (tx, rx) = oneshot::channel();
593 self.cmd_tx
594 .send(HostCommand::GetListenAddresses(tx))
595 .await
596 .map_err(|_| NetworkError::NotRunning)?;
597 rx.await.map_err(|_| NetworkError::NotRunning)
598 }
599
600 pub async fn shutdown(&self) -> NetworkResult<()> {
602 let (tx, rx) = oneshot::channel();
603 let _ = self.cmd_tx.send(HostCommand::Shutdown(tx)).await;
604 let _ = rx.await;
605 Ok(())
606 }
607
608 pub async fn find_providers(&self, key: &[u8]) -> NetworkResult<crate::DhtQueryResult> {
610 let (tx, rx) = oneshot::channel();
611 self.cmd_tx
612 .send(HostCommand::FindProviders(key.to_vec(), tx))
613 .await
614 .map_err(|_| NetworkError::NotRunning)?;
615 match tokio::time::timeout(Duration::from_secs(30), rx).await {
616 Ok(Ok(result)) => Ok(result),
617 Ok(Err(_)) => Err(NetworkError::NotRunning),
618 Err(_) => Ok(crate::DhtQueryResult::failure(
619 "providers",
620 "query timeout".into(),
621 )),
622 }
623 }
624
625 pub async fn find_peer(&self, peer: &str) -> NetworkResult<crate::DhtQueryResult> {
627 let peer_id = parse_peer_id(peer)?;
628 let (tx, rx) = oneshot::channel();
629 self.cmd_tx
630 .send(HostCommand::FindPeer(peer_id, tx))
631 .await
632 .map_err(|_| NetworkError::NotRunning)?;
633 match tokio::time::timeout(Duration::from_secs(30), rx).await {
634 Ok(Ok(result)) => Ok(result),
635 Ok(Err(_)) => Err(NetworkError::NotRunning),
636 Err(_) => Ok(crate::DhtQueryResult::failure(
637 "find_peer",
638 "query timeout".into(),
639 )),
640 }
641 }
642
643 pub async fn provide(&self, key: &[u8]) -> NetworkResult<crate::DhtQueryResult> {
645 let (tx, rx) = oneshot::channel();
646 self.cmd_tx
647 .send(HostCommand::Provide(key.to_vec(), tx))
648 .await
649 .map_err(|_| NetworkError::NotRunning)?;
650 match tokio::time::timeout(Duration::from_secs(30), rx).await {
651 Ok(Ok(result)) => Ok(result),
652 Ok(Err(_)) => Err(NetworkError::NotRunning),
653 Err(_) => Ok(crate::DhtQueryResult::failure(
654 "provide",
655 "query timeout".into(),
656 )),
657 }
658 }
659
660 pub async fn put_value(
662 &self,
663 key: &[u8],
664 value: Vec<u8>,
665 ) -> NetworkResult<crate::DhtQueryResult> {
666 let (tx, rx) = oneshot::channel();
667 self.cmd_tx
668 .send(HostCommand::PutValue(key.to_vec(), value, tx))
669 .await
670 .map_err(|_| NetworkError::NotRunning)?;
671 match tokio::time::timeout(Duration::from_secs(30), rx).await {
672 Ok(Ok(result)) => Ok(result),
673 Ok(Err(_)) => Err(NetworkError::NotRunning),
674 Err(_) => Ok(crate::DhtQueryResult::failure(
675 "put_value",
676 "query timeout".into(),
677 )),
678 }
679 }
680
681 pub async fn get_value(&self, key: &[u8]) -> NetworkResult<crate::DhtQueryResult> {
683 let (tx, rx) = oneshot::channel();
684 self.cmd_tx
685 .send(HostCommand::GetValue(key.to_vec(), tx))
686 .await
687 .map_err(|_| NetworkError::NotRunning)?;
688 match tokio::time::timeout(Duration::from_secs(30), rx).await {
689 Ok(Ok(result)) => Ok(result),
690 Ok(Err(_)) => Err(NetworkError::NotRunning),
691 Err(_) => Ok(crate::DhtQueryResult::failure(
692 "get_value",
693 "query timeout".into(),
694 )),
695 }
696 }
697
698 pub async fn dht_stats(&self) -> NetworkResult<crate::DhtStats> {
700 let (tx, rx) = oneshot::channel();
701 self.cmd_tx
702 .send(HostCommand::GetDhtStats(tx))
703 .await
704 .map_err(|_| NetworkError::NotRunning)?;
705 rx.await.map_err(|_| NetworkError::NotRunning)
706 }
707
708 pub async fn bootstrap(&self) -> NetworkResult<()> {
710 let (tx, rx) = oneshot::channel();
711 self.cmd_tx
712 .send(HostCommand::Bootstrap(tx))
713 .await
714 .map_err(|_| NetworkError::NotRunning)?;
715 rx.await
716 .map_err(|_| NetworkError::NotRunning)?
717 .map_err(NetworkError::Swarm)
718 }
719}
720
721use libp2p::{identify, kad, mdns, ping};