1use anyhow::{anyhow, format_err};
2use either::Either;
3use futures::{
4 channel::{
5 mpsc::{unbounded, Receiver, UnboundedSender},
6 oneshot,
7 },
8 stream::Fuse,
9 FutureExt, StreamExt,
10};
11
12use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent};
13use crate::{ConnectionEvents, PeerConnectionEvents, TSwarmEvent};
14
15use std::{
16 collections::{hash_map::Entry, HashMap, HashSet},
17 time::Duration,
18};
19
20use crate::{config::BOOTSTRAP_NODES, IpfsEvent, TSwarmEventFn};
21use futures_timer::Delay;
22use ipld_core::cid::Cid;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::{Context, Poll};
26
27use crate::{
28 p2p::TSwarm,
29 repo::{Repo, RepoEvent},
30 AddPeerOpt,
31};
32
33pub use crate::{p2p::BehaviourEvent, p2p::KadResult};
34
35pub use libp2p::{self, core::transport::ListenerId, swarm::NetworkBehaviour, Multiaddr, PeerId};
36use multibase::Base;
37
38use libp2p::core::{ConnectedPoint, Endpoint};
39#[cfg(not(target_arch = "wasm32"))]
40use libp2p::mdns::Event as MdnsEvent;
41use libp2p::multiaddr::Protocol;
42use libp2p::{
43 autonat,
44 identify::{Event as IdentifyEvent, Info as IdentifyInfo},
45 kad::{
46 AddProviderError, AddProviderOk, BootstrapError, BootstrapOk, Event as KademliaEvent,
47 GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError,
48 GetRecordOk, PutRecordError, PutRecordOk, QueryId, QueryResult::*, Record,
49 },
50 rendezvous::{Cookie, Namespace},
51 swarm::{ConnectionId, SwarmEvent},
52};
53use tokio::sync::Notify;
54
// NOTE(review): presumably silences clippy on the deeply nested map/channel
// field types below — confirm it is still needed.
#[allow(clippy::type_complexity)]
// NOTE(review): some fields (e.g. `bootstraps`) are not used in every build
// configuration visible here — confirm which ones this covers.
#[allow(dead_code)]
/// Background task that owns the libp2p swarm, reacts to swarm and repository
/// events, and services [`IpfsEvent`] requests sent by the public facade.
///
/// Most maps below hold reply channels for requests that can only be answered
/// once a matching swarm event arrives.
pub(crate) struct IpfsTask<C: NetworkBehaviour<ToSwarm = void::Void>> {
    /// The libp2p swarm polled for events and driven on behalf of requests.
    pub(crate) swarm: TSwarm<C>,
    /// Stream of repository events; each one is passed to `handle_repo_event`.
    pub(crate) repo_events: Fuse<Receiver<RepoEvent>>,
    /// Stream of facade requests; each one is passed to `handle_event`.
    pub(crate) from_facade: Fuse<Receiver<IpfsEvent>>,
    /// Per-CID notifiers; every notifier registered for a CID is woken when
    /// bitswap reports that the block request was cancelled.
    pub(crate) bitswap_cancellable: HashMap<Cid, Vec<Arc<Notify>>>,
    /// Addresses the swarm reported for each active listener.
    pub(crate) listening_addresses: HashMap<ListenerId, Vec<Multiaddr>>,
    /// Senders that receive provider peer ids for Kademlia queries, keyed by
    /// query id.
    pub(crate) provider_stream: HashMap<QueryId, UnboundedSender<PeerId>>,
    /// Senders that receive records found by Kademlia queries, keyed by query
    /// id.
    pub(crate) record_stream: HashMap<QueryId, UnboundedSender<Record>>,
    /// Repository handle (cloned from the reference passed to `new`).
    pub(crate) repo: Repo,
    /// Reply channels for Kademlia queries awaiting a final result or error.
    pub(crate) kad_subscriptions: HashMap<QueryId, Channel<KadResult>>,
    /// Reply channels waiting for the identify info of a peer; answered when
    /// identify info for that peer is received, or with an error when a
    /// closest-peers lookup fails to locate it.
    pub(crate) dht_peer_lookup: HashMap<PeerId, Vec<Channel<libp2p::identify::Info>>>,
    /// Set of bootstrap addresses.
    // NOTE(review): not read or written in the visible part of this file —
    // confirm where it is maintained.
    pub(crate) bootstraps: HashSet<Multiaddr>,
    /// Optional hook invoked with the swarm and every swarm event before the
    /// built-in handling runs.
    pub(crate) swarm_event: Option<TSwarmEventFn<C>>,
    /// Subscribers that receive a copy of every emitted pubsub event.
    pub(crate) pubsub_event_stream: Vec<UnboundedSender<InnerPubsubEvent>>,
    /// Timers owned by the task (currently the subscriber cleanup timer).
    pub(crate) timer: TaskTimer,
    /// When `true`, non-relay loopback/private listen addresses are also
    /// registered as external addresses.
    pub(crate) local_external_addr: bool,
    /// Reply channels waiting on a relay reservation with the given peer;
    /// resolved by relay-manager reservation events.
    pub(crate) relay_listener: HashMap<PeerId, Vec<Channel<()>>>,
    /// Reply channels for rendezvous registrations in flight, keyed by
    /// rendezvous node and namespace.
    pub(crate) rzv_register_pending: HashMap<(PeerId, Namespace), Vec<Channel<()>>>,
    /// Reply channels for rendezvous discoveries in flight, keyed by
    /// rendezvous node and namespace; answered with the discovered peers and
    /// their addresses.
    pub(crate) rzv_discover_pending:
        HashMap<(PeerId, Namespace), Vec<Channel<HashMap<PeerId, Vec<Multiaddr>>>>>,
    /// Cookie from the most recent discovery response of each rendezvous node.
    pub(crate) rzv_cookie: HashMap<PeerId, Option<Cookie>>,

    /// Subscribers to connection events of one specific peer.
    pub(crate) peer_connection_events:
        HashMap<PeerId, Vec<futures::channel::mpsc::Sender<PeerConnectionEvents>>>,
    /// Subscribers to connection events of every peer.
    pub(crate) connection_events: Vec<futures::channel::mpsc::Sender<ConnectionEvents>>,

    /// Reply channels for dials in progress, keyed by connection id.
    pub(crate) pending_connection: HashMap<ConnectionId, Channel<ConnectionId>>,
    /// Reply channels for requested disconnects; resolved when a connection to
    /// the peer is reported closed.
    pub(crate) pending_disconnection: HashMap<PeerId, Vec<Channel<()>>>,
    /// Reply channels for listeners being added; resolved with the first
    /// address the listener reports, or with the listener's error.
    pub(crate) pending_add_listener: HashMap<ListenerId, Channel<Multiaddr>>,
    /// Reply channels for listeners being removed; resolved when the listener
    /// is reported closed.
    pub(crate) pending_remove_listener: HashMap<ListenerId, Channel<()>>,

    /// Buffer capacity of the bounded channels handed out to connection-event
    /// subscribers.
    pub(crate) event_capacity: usize,
}
92
93impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
94 pub fn new(
95 swarm: TSwarm<C>,
96 repo_events: Fuse<Receiver<RepoEvent>>,
97 from_facade: Fuse<Receiver<IpfsEvent>>,
98 repo: &Repo,
99 event_capacity: usize,
100 ) -> Self {
101 IpfsTask {
102 repo_events,
103 from_facade,
104 swarm,
105 event_capacity,
106 provider_stream: HashMap::new(),
107 record_stream: HashMap::new(),
108 dht_peer_lookup: Default::default(),
109 pubsub_event_stream: Default::default(),
110 kad_subscriptions: Default::default(),
111 bitswap_cancellable: Default::default(),
112 repo: repo.clone(),
113 bootstraps: Default::default(),
114 swarm_event: Default::default(),
115 timer: Default::default(),
116 relay_listener: Default::default(),
117 local_external_addr: false,
118 rzv_register_pending: Default::default(),
119 rzv_discover_pending: Default::default(),
120 rzv_cookie: Default::default(),
121 listening_addresses: HashMap::new(),
122 peer_connection_events: HashMap::new(),
123 connection_events: Vec::new(),
124 pending_disconnection: Default::default(),
125 pending_connection: Default::default(),
126 pending_add_listener: Default::default(),
127 pending_remove_listener: Default::default(),
128 }
129 }
130}
131
/// Timers owned by [`IpfsTask`].
pub(crate) struct TaskTimer {
    /// Fires when closed event subscribers should be pruned; the default
    /// interval is 60 seconds.
    pub(crate) event_cleanup: Delay,
}
135
136impl Default for TaskTimer {
137 fn default() -> Self {
138 let event_cleanup = Delay::new(Duration::from_secs(60));
139
140 Self { event_cleanup }
141 }
142}
143
144impl<C: NetworkBehaviour<ToSwarm = void::Void>> futures::Future for IpfsTask<C> {
145 type Output = ();
146
147 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148 loop {
149 match self.swarm.poll_next_unpin(cx) {
150 Poll::Ready(Some(event)) => self.handle_swarm_event(event),
151 Poll::Ready(None) => return Poll::Ready(()),
152 Poll::Pending => break,
153 }
154 }
155 loop {
156 match self.from_facade.poll_next_unpin(cx) {
157 Poll::Ready(Some(event)) => self.handle_event(event),
158 Poll::Ready(None) => return Poll::Ready(()),
159 Poll::Pending => break,
160 }
161 }
162 loop {
163 match self.repo_events.poll_next_unpin(cx) {
164 Poll::Ready(Some(event)) => self.handle_repo_event(event),
165 Poll::Ready(None) => return Poll::Ready(()),
166 Poll::Pending => break,
167 }
168 }
169
170 if self.timer.event_cleanup.poll_unpin(cx).is_ready() {
171 self.pubsub_event_stream.retain(|ch| !ch.is_closed());
172 self.connection_events.retain(|ch| !ch.is_closed());
173 self.peer_connection_events.retain(|_, ch_list| {
174 ch_list.retain(|ch| !ch.is_closed());
175 !ch_list.is_empty()
176 });
177 self.timer.event_cleanup.reset(Duration::from_secs(60));
178 }
179
180 Poll::Pending
181 }
182}
183
184impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
185 pub(crate) async fn run(&mut self) {
186 let mut event_cleanup = futures_timer::Delay::new(Duration::from_secs(60));
187
188 loop {
189 tokio::select! {
190 biased;
191 Some(swarm) = self.swarm.next() => {
192 self.handle_swarm_event(swarm);
193 },
194 Some(repo) = self.repo_events.next() => {
195 self.handle_repo_event(repo);
196 },
197 Some(event) = self.from_facade.next() => {
198 if matches!(event, IpfsEvent::Exit) {
199 break;
200 }
201 self.handle_event(event);
202 },
203 _ = &mut event_cleanup => {
204 self.pubsub_event_stream.retain(|ch| !ch.is_closed());
205 self.connection_events.retain(|ch| !ch.is_closed());
206 self.peer_connection_events.retain(|_, ch_list| {
207 ch_list.retain(|ch| !ch.is_closed());
208 !ch_list.is_empty()
209 });
210 event_cleanup.reset(Duration::from_secs(60));
211 }
212 }
213 }
214 }
215
216 fn emit_pubsub_event(&self, event: InnerPubsubEvent) {
217 for ch in &self.pubsub_event_stream {
218 let event = event.clone();
219 let _ = ch.unbounded_send(event);
220 }
221 }
222
223 fn handle_swarm_event(&mut self, swarm_event: TSwarmEvent<C>) {
224 if let Some(handler) = self.swarm_event.as_ref() {
225 handler(&mut self.swarm, &swarm_event)
226 }
227 match swarm_event {
228 SwarmEvent::NewListenAddr {
229 listener_id,
230 address,
231 } => {
232 if self.local_external_addr
233 && !address.is_relay()
234 && (address.is_loopback() || address.is_private())
235 {
236 self.swarm.add_external_address(address.clone());
237 }
238
239 if !address.is_loopback() && !address.is_private() {
240 self.swarm.add_external_address(address.clone());
242 }
243
244 self.listening_addresses
245 .entry(listener_id)
246 .or_default()
247 .push(address.clone());
248
249 if let Some(ret) = self.pending_add_listener.remove(&listener_id) {
250 let _ = ret.send(Ok(address));
251 }
252 }
253 SwarmEvent::ConnectionEstablished {
254 peer_id,
255 connection_id,
256 endpoint,
257 ..
258 } => {
259 if let Some(ch) = self.pending_connection.remove(&connection_id) {
260 let _ = ch.send(Ok(connection_id));
261 }
262
263 let (ep, mut addr) = match &endpoint {
264 ConnectedPoint::Dialer { address, .. } => (Endpoint::Dialer, address.clone()),
265 ConnectedPoint::Listener { local_addr, .. } if endpoint.is_relayed() => {
266 (Endpoint::Listener, local_addr.clone())
267 }
268 ConnectedPoint::Listener { send_back_addr, .. } => {
269 (Endpoint::Listener, send_back_addr.clone())
270 }
271 };
272
273 if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
274 addr.pop();
275 }
276
277 if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) {
278 let ev = match ep {
279 Endpoint::Dialer => PeerConnectionEvents::OutgoingConnection {
280 connection_id,
281 addr: addr.clone(),
282 },
283 Endpoint::Listener => PeerConnectionEvents::IncomingConnection {
284 connection_id,
285 addr: addr.clone(),
286 },
287 };
288
289 for ch in ch_list {
290 let _ = ch.try_send(ev.clone());
291 }
292 }
293
294 for ch in &mut self.connection_events {
295 let ev = match ep {
296 Endpoint::Dialer => ConnectionEvents::OutgoingConnection {
297 peer_id,
298 connection_id,
299 addr: addr.clone(),
300 },
301 Endpoint::Listener => ConnectionEvents::IncomingConnection {
302 peer_id,
303 connection_id,
304 addr: addr.clone(),
305 },
306 };
307
308 let _ = ch.try_send(ev);
309 }
310 }
311 SwarmEvent::OutgoingConnectionError {
312 connection_id,
313 error,
314 ..
315 } => {
316 if let Some(ch) = self.pending_connection.remove(&connection_id) {
317 let _ = ch.send(Err(anyhow::Error::from(error)));
318 }
319 }
320 SwarmEvent::ConnectionClosed {
321 peer_id,
322 connection_id,
323 ..
324 } => {
325 if let Some(ch) = self.pending_disconnection.remove(&peer_id) {
326 for ch in ch {
327 let _ = ch.send(Ok(()));
328 }
329 }
330
331 if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) {
332 for ch in ch_list {
333 let _ =
334 ch.try_send(PeerConnectionEvents::ClosedConnection { connection_id });
335 }
336 }
337
338 for ch in &mut self.connection_events {
339 let _ = ch.try_send(ConnectionEvents::ClosedConnection {
340 peer_id,
341 connection_id,
342 });
343 }
344 }
345 SwarmEvent::ExpiredListenAddr {
346 listener_id,
347 address,
348 } => {
349 if let Some(list) = self.listening_addresses.get_mut(&listener_id) {
350 list.retain(|addr| &address != addr);
351 }
352
353 self.swarm.remove_external_address(&address);
354 }
355 SwarmEvent::ListenerClosed {
356 listener_id,
357 reason,
358 addresses,
359 } => {
360 for address in addresses {
361 self.listening_addresses.remove(&listener_id);
362 self.swarm.remove_external_address(&address);
363 }
364
365 if let Some(ret) = self.pending_remove_listener.remove(&listener_id) {
366 let _ = ret.send(reason.map_err(anyhow::Error::from));
367 }
368 }
369 SwarmEvent::ListenerError { listener_id, error } => {
370 if let Some(ret) = self.pending_add_listener.remove(&listener_id) {
371 let _ = ret.send(Err(error.into()));
372 }
373 }
374 #[cfg(not(target_arch = "wasm32"))]
375 SwarmEvent::Behaviour(BehaviourEvent::Mdns(event)) => match event {
376 MdnsEvent::Discovered(list) => {
377 for (peer, addr) in list {
378 self.swarm.behaviour_mut().add_peer((peer, addr));
379 }
380 }
381 MdnsEvent::Expired(list) => {
382 for (peer, _) in list {
383 if let Some(mdns) = self.swarm.behaviour().mdns.as_ref() {
384 if !mdns.discovered_nodes().any(|p| p == &peer) {
385 trace!("mdns: Expired peer {}", peer.to_base58());
386 }
387 }
388 }
389 }
390 },
391 SwarmEvent::Behaviour(BehaviourEvent::Kademlia(event)) => {
392 match event {
393 KademliaEvent::InboundRequest { request } => {
394 trace!("kad: inbound {:?} request handled", request);
395 }
396 KademliaEvent::OutboundQueryProgressed {
397 result, id, step, ..
398 } => {
399 if self
402 .swarm
403 .behaviour()
404 .kademlia
405 .as_ref()
406 .and_then(|kad| kad.query(&id))
407 .is_none()
408 {
409 match result {
410 GetClosestPeers(_) | GetProviders(_) | GetRecord(_) => {}
412 Bootstrap(Err(_)) | StartProviding(Err(_)) | PutRecord(Err(_)) => {}
414 _ => {
416 if let Some(ret) = self.kad_subscriptions.remove(&id) {
417 let _ = ret.send(Ok(KadResult::Complete));
418 }
419 }
420 }
421 }
422
423 match result {
424 Bootstrap(Ok(BootstrapOk {
425 peer,
426 num_remaining,
427 })) => {
428 debug!(
429 "kad: bootstrapped with {}, {} peers remain",
430 peer, num_remaining
431 );
432 }
433 Bootstrap(Err(BootstrapError::Timeout { .. })) => {
434 warn!("kad: timed out while trying to bootstrap");
435
436 if let Some(ret) = self.kad_subscriptions.remove(&id) {
437 let _ = ret.send(Err(anyhow::anyhow!(
438 "kad: timed out while trying to bootstrap"
439 )));
440 }
441 }
442 GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => {
443 if let Some(ret) = self.kad_subscriptions.remove(&id) {
444 let _ = ret.send(Ok(KadResult::Peers(
445 peers.iter().map(|info| info.peer_id).collect(),
446 )));
447 }
448 if let Ok(peer_id) = PeerId::from_bytes(&key) {
449 if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
450 if !peers.iter().any(|info| info.peer_id == peer_id) {
451 for ret in rets {
452 let _ = ret.send(Err(anyhow::anyhow!(
453 "Could not locate peer"
454 )));
455 }
456 }
457 }
458 }
459 }
460 GetClosestPeers(Err(GetClosestPeersError::Timeout {
461 key,
462 peers: _,
463 })) => {
464 warn!("kad: timed out while trying to find all closest peers");
466
467 if let Some(ret) = self.kad_subscriptions.remove(&id) {
468 let _ = ret.send(Err(anyhow::anyhow!(
469 "timed out while trying to find all closest peers"
470 )));
471 }
472 if let Ok(peer_id) = PeerId::from_bytes(&key) {
473 if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
474 for ret in rets {
475 let _ = ret.send(Err(anyhow::anyhow!(
476 "timed out while trying to find all closest peers"
477 )));
478 }
479 }
480 }
481 }
482 GetProviders(Ok(GetProvidersOk::FoundProviders {
483 key: _,
484 providers,
485 })) => {
486 if let Entry::Occupied(entry) = self.provider_stream.entry(id) {
487 let tx = entry.get();
488 for provider in providers {
489 let _ = tx.unbounded_send(provider);
490 }
491 }
492 }
493 GetProviders(Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
494 ..
495 })) => {
496 if step.last {
497 if let Some(tx) = self.provider_stream.remove(&id) {
498 tx.close_channel();
499 }
500 }
501 }
502 GetProviders(Err(GetProvidersError::Timeout { key, .. })) => {
503 let key = multibase::encode(Base::Base32Lower, key);
504 warn!("kad: timed out while trying to get providers for {}", key);
505
506 if let Some(ret) = self.kad_subscriptions.remove(&id) {
507 let _ = ret.send(Err(anyhow::anyhow!(
508 "timed out while trying to get providers for the given key"
509 )));
510 }
511 }
512 StartProviding(Ok(AddProviderOk { key })) => {
513 let key = multibase::encode(Base::Base32Lower, key);
514 debug!("kad: providing {}", key);
515 }
516 StartProviding(Err(AddProviderError::Timeout { key })) => {
517 let key = multibase::encode(Base::Base32Lower, key);
518 warn!("kad: timed out while trying to provide {}", key);
519
520 if let Some(ret) = self.kad_subscriptions.remove(&id) {
521 let _ = ret.send(Err(anyhow::anyhow!(
522 "kad: timed out while trying to provide the record"
523 )));
524 }
525 }
526 RepublishProvider(Ok(AddProviderOk { key })) => {
527 let key = multibase::encode(Base::Base32Lower, key);
528 debug!("kad: republished provider {}", key);
529 }
530 RepublishProvider(Err(AddProviderError::Timeout { key })) => {
531 let key = multibase::encode(Base::Base32Lower, key);
532 warn!("kad: timed out while trying to republish provider {}", key);
533 }
534 GetRecord(Ok(GetRecordOk::FoundRecord(record))) => {
535 if let Entry::Occupied(entry) = self.record_stream.entry(id) {
536 let _ = entry.get().unbounded_send(record.record);
537 }
538 }
539 GetRecord(Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
540 ..
541 })) => {
542 if step.last {
543 if let Some(tx) = self.record_stream.remove(&id) {
544 tx.close_channel();
545 }
546 }
547 }
548 GetRecord(Err(GetRecordError::NotFound {
549 key,
550 closest_peers: _,
551 })) => {
552 let key = multibase::encode(Base::Base32Lower, key);
553 warn!("kad: couldn't find record {}", key);
554
555 if let Some(tx) = self.record_stream.remove(&id) {
556 tx.close_channel();
557 }
558 }
559 GetRecord(Err(GetRecordError::QuorumFailed {
560 key,
561 records: _,
562 quorum,
563 })) => {
564 let key = multibase::encode(Base::Base32Lower, key);
565 warn!(
566 "kad: quorum failed {} when trying to get key {}",
567 quorum, key
568 );
569
570 if let Some(tx) = self.record_stream.remove(&id) {
571 tx.close_channel();
572 }
573 }
574 GetRecord(Err(GetRecordError::Timeout { key })) => {
575 let key = multibase::encode(Base::Base32Lower, key);
576 warn!("kad: timed out while trying to get key {}", key);
577
578 if let Some(tx) = self.record_stream.remove(&id) {
579 tx.close_channel();
580 }
581 }
582 PutRecord(Ok(PutRecordOk { key }))
583 | RepublishRecord(Ok(PutRecordOk { key })) => {
584 let key = multibase::encode(Base::Base32Lower, key);
585 debug!("kad: successfully put record {}", key);
586 }
587 PutRecord(Err(PutRecordError::QuorumFailed {
588 key,
589 success: _,
590 quorum,
591 }))
592 | RepublishRecord(Err(PutRecordError::QuorumFailed {
593 key,
594 success: _,
595 quorum,
596 })) => {
597 let key = multibase::encode(Base::Base32Lower, key);
598 warn!(
599 "kad: quorum failed ({}) when trying to put record {}",
600 quorum, key
601 );
602
603 if let Some(ret) = self.kad_subscriptions.remove(&id) {
604 let _ = ret.send(Err(anyhow::anyhow!(
605 "kad: quorum failed when trying to put the record"
606 )));
607 }
608 }
609 PutRecord(Err(PutRecordError::Timeout {
610 key,
611 success: _,
612 quorum: _,
613 })) => {
614 let key = multibase::encode(Base::Base32Lower, key);
615 warn!("kad: timed out while trying to put record {}", key);
616
617 if let Some(ret) = self.kad_subscriptions.remove(&id) {
618 let _ = ret.send(Err(anyhow::anyhow!(
619 "kad: timed out while trying to put record {}",
620 key
621 )));
622 }
623 }
624 RepublishRecord(Err(PutRecordError::Timeout {
625 key,
626 success: _,
627 quorum: _,
628 })) => {
629 let key = multibase::encode(Base::Base32Lower, key);
630 warn!("kad: timed out while trying to republish record {}", key);
631 }
632 }
633 }
634 KademliaEvent::RoutingUpdated {
635 peer,
636 is_new_peer: _,
637 addresses,
638 bucket_range: _,
639 old_peer: _,
640 } => {
641 trace!("kad: routing updated; {}: {:?}", peer, addresses);
642 }
643 KademliaEvent::UnroutablePeer { peer } => {
644 trace!("kad: peer {} is unroutable", peer);
645 }
646 KademliaEvent::RoutablePeer { peer, address } => {
647 trace!("kad: peer {} ({}) is routable", peer, address);
648 }
649 KademliaEvent::PendingRoutablePeer { peer, address } => {
650 trace!("kad: pending routable peer {} ({})", peer, address);
651 }
652 KademliaEvent::ModeChanged { new_mode } => {
653 let _ = new_mode;
654 }
655 }
656 }
657 SwarmEvent::Behaviour(BehaviourEvent::Pubsub(
658 libp2p::gossipsub::Event::Subscribed { peer_id, topic },
659 )) => self.emit_pubsub_event(InnerPubsubEvent::Subscribe {
660 topic: topic.to_string(),
661 peer_id,
662 }),
663 SwarmEvent::Behaviour(BehaviourEvent::Pubsub(
664 libp2p::gossipsub::Event::Unsubscribed { peer_id, topic },
665 )) => self.emit_pubsub_event(InnerPubsubEvent::Unsubscribe {
666 topic: topic.to_string(),
667 peer_id,
668 }),
669 SwarmEvent::Behaviour(BehaviourEvent::Ping(event)) => match event {
670 libp2p::ping::Event {
671 peer,
672 connection,
673 result: Result::Ok(rtt),
674 } => {
675 trace!(
676 "ping: rtt to {} is {} ms",
677 peer.to_base58(),
678 rtt.as_millis()
679 );
680 self.swarm.behaviour_mut().peerbook.set_peer_rtt(peer, rtt);
681
682 if let Some(m) = self.swarm.behaviour_mut().relay_manager.as_mut() {
683 m.set_peer_rtt(peer, connection, rtt)
684 }
685 }
686 libp2p::ping::Event { .. } => {
687 }
689 },
690 SwarmEvent::Behaviour(BehaviourEvent::RelayClient(event)) => {
691 debug!("Relay Client Event: {event:?}");
692 if let Some(m) = self.swarm.behaviour_mut().relay_manager.as_mut() {
693 m.process_relay_event(event);
694 }
695 }
696 SwarmEvent::Behaviour(BehaviourEvent::RelayManager(event)) => {
697 debug!("Relay Manager Event: {event:?}");
698 match event {
699 libp2p_relay_manager::Event::ReservationSuccessful { peer_id, .. } => {
700 if let Some(chs) = self.relay_listener.remove(&peer_id) {
701 for ch in chs {
702 let _ = ch.send(Ok(()));
703 }
704 }
705 }
706 libp2p_relay_manager::Event::ReservationClosed { peer_id, result } => {
707 if let Some(chs) = self.relay_listener.remove(&peer_id) {
708 match result {
709 Ok(()) => {
710 for ch in chs {
711 let _ = ch.send(Ok(()));
712 }
713 }
714 Err(e) => {
715 let e = e.to_string();
716 for ch in chs {
717 let _ = ch.send(Err(anyhow::anyhow!("{}", e.clone())));
718 }
719 }
720 }
721 }
722 }
723 libp2p_relay_manager::Event::ReservationFailure {
724 peer_id,
725 result: err,
726 } => {
727 if let Some(chs) = self.relay_listener.remove(&peer_id) {
728 let e = err.to_string();
729 for ch in chs {
730 let _ = ch.send(Err(anyhow::anyhow!("{}", e.clone())));
731 }
732 }
733 }
734 _ => {}
735 }
736 }
737
738 SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => match event {
739 IdentifyEvent::Received { peer_id, info, .. } => {
740 let IdentifyInfo {
741 listen_addrs,
742 protocols,
743 ..
744 } = &info;
745
746 if let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() {
747 if protocols.iter().any(|p| libp2p::kad::PROTOCOL_NAME.eq(p)) {
748 for addr in listen_addrs {
749 kad.add_address(&peer_id, addr.clone());
750 }
751 }
752 }
753
754 if protocols
755 .iter()
756 .any(|p| libp2p::autonat::DEFAULT_PROTOCOL_NAME.eq(p))
757 {
758 if let Some(autonat) = self.swarm.behaviour_mut().autonat.as_mut() {
759 for addr in listen_addrs {
760 autonat.add_server(peer_id, Some(addr.clone()));
761 }
762 }
763 }
764
765 if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
766 for ret in rets {
767 let _ = ret.send(Ok(info.clone()));
768 }
769 }
770
771 self.swarm.behaviour_mut().peerbook.inject_peer_info(info);
772 }
773 event => debug!("identify: {:?}", event),
774 },
775 SwarmEvent::Behaviour(BehaviourEvent::Autonat(autonat::Event::StatusChanged {
776 old,
777 new,
778 })) => {
779 debug!("Old Nat Status: {:?}", old);
781 debug!("New Nat Status: {:?}", new);
782 }
783 SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
784 libp2p::rendezvous::client::Event::Discovered {
785 rendezvous_node,
786 registrations,
787 cookie,
788 },
789 )) => {
790 self.rzv_cookie.insert(rendezvous_node, Some(cookie));
791 let mut ns_list = HashSet::new();
792 let addrbook = &mut self.swarm.behaviour_mut().addressbook;
793 let mut ns_book: HashMap<Namespace, HashMap<PeerId, Vec<Multiaddr>>> =
794 HashMap::new();
795 for registration in registrations {
796 let namespace = registration.namespace.clone();
797 let peer_id = registration.record.peer_id();
798 let addrs = registration.record.addresses();
799
800 let opts = AddPeerOpt::with_peer_id(peer_id).set_addresses(addrs.to_vec());
803
804 addrbook.add_address(opts);
805
806 ns_book
807 .entry(namespace.clone())
808 .or_default()
809 .entry(peer_id)
810 .or_default()
811 .extend(addrs.to_vec());
812 ns_list.insert(namespace);
813 }
814
815 for ns in ns_list {
816 let map = ns_book.remove(&ns).unwrap_or_default();
817 if let Some(channels) = self.rzv_discover_pending.remove(&(rendezvous_node, ns))
818 {
819 for ch in channels {
820 let _ = ch.send(Ok(map.clone()));
821 }
822 }
823 }
824 }
825 SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
826 libp2p::rendezvous::client::Event::DiscoverFailed {
827 rendezvous_node,
828 namespace,
829 error,
830 },
831 )) => {
832 let Some(ns) = namespace else {
833 error!("Error registering to {rendezvous_node}: {error:?}");
834 return;
835 };
836
837 error!("Error registering namespace {ns} to {rendezvous_node}: {error:?}");
838
839 if let Some(channels) = self.rzv_discover_pending.remove(&(rendezvous_node, ns)) {
840 for ch in channels {
841 let _ = ch.send(Err(anyhow::anyhow!(
842 "Error discovering peers on {rendezvous_node}: {error:?}"
843 )));
844 }
845 }
846 }
847 SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
848 libp2p::rendezvous::client::Event::Registered {
849 rendezvous_node,
850 ttl,
851 namespace,
852 },
853 )) => {
854 info!("Registered to {rendezvous_node} under {namespace} for {ttl} secs");
855
856 if let Some(channels) = self
857 .rzv_register_pending
858 .remove(&(rendezvous_node, namespace.clone()))
859 {
860 for ch in channels {
861 let _ = ch.send(Ok(()));
862 }
863 }
864 }
865 SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
866 libp2p::rendezvous::client::Event::RegisterFailed {
867 rendezvous_node,
868 namespace,
869 error,
870 },
871 )) => {
872 error!("Error registering namespace {namespace} to {rendezvous_node}: {error:?}");
873
874 if let Some(channels) = self
875 .rzv_register_pending
876 .remove(&(rendezvous_node, namespace.clone()))
877 {
878 for ch in channels {
879 let _ = ch.send(Err(anyhow::anyhow!("Error registering namespace {namespace} to {rendezvous_node}: {error:?}")));
880 }
881 }
882 }
883 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(event)) => match event {
884 crate::p2p::bitswap::Event::NeedBlock { cid } => {
885 if let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() {
886 info!("Looking for providers for {cid}");
887 let key = cid.hash().to_bytes();
888 kad.get_providers(key.into());
889 }
890 }
891 crate::p2p::bitswap::Event::CancelBlock { cid } => {
892 info!(%cid, "block request cancelled");
893 if let Some(list) = self.bitswap_cancellable.remove(&cid) {
894 for signal in list {
895 signal.notify_waiters();
896 }
897 }
898 }
899 crate::p2p::bitswap::Event::BlockRetrieved { cid } => {
900 info!(%cid, "block retrieved")
901 }
902 },
903 _ => debug!("Swarm event: {:?}", swarm_event),
904 }
905 }
906
907 fn handle_event(&mut self, event: IpfsEvent) {
908 match event {
909 IpfsEvent::Connect(target, ret) => {
910 let connection_id = target.connection_id();
911
912 if let Err(e) = self.swarm.dial(target) {
913 let _ = ret.send(Err(anyhow::Error::from(e)));
914 return;
915 }
916 self.pending_connection.insert(connection_id, ret);
917 }
918 IpfsEvent::Protocol(ret) => {
919 let info = self.swarm.behaviour().supported_protocols();
920 let _ = ret.send(info);
921 }
922 #[cfg(feature = "experimental_stream")]
923 IpfsEvent::StreamControlHandle(ret) => {
924 let Some(stream) = self.swarm.behaviour_mut().stream.as_ref() else {
925 let _ = ret.send(Err(anyhow!("stream protocol is disabled")));
926 return;
927 };
928
929 let _ = ret.send(Ok(stream.new_control()));
930 }
931 #[cfg(feature = "experimental_stream")]
932 IpfsEvent::NewStream(protocol, ret) => {
933 let Some(stream) = self.swarm.behaviour_mut().stream.as_ref() else {
934 let _ = ret.send(Err(anyhow!("stream protocol is disabled")));
935 return;
936 };
937
938 let _ = ret.send(
939 stream
940 .new_control()
941 .accept(protocol)
942 .map_err(anyhow::Error::from),
943 );
944 }
945 IpfsEvent::Addresses(ret) => {
946 let addrs = self.swarm.behaviour_mut().addrs();
947 ret.send(Ok(addrs)).ok();
948 }
949 IpfsEvent::Listeners(ret) => {
950 let listeners = self.swarm.listeners().cloned().collect::<Vec<Multiaddr>>();
951 ret.send(Ok(listeners)).ok();
952 }
953 IpfsEvent::ExternalAddresses(ret) => {
954 let external = self
955 .swarm
956 .external_addresses()
957 .cloned()
958 .collect::<Vec<Multiaddr>>();
959
960 ret.send(Ok(external)).ok();
961 }
962 IpfsEvent::IsConnected(peer_id, ret) => {
963 let connected = self.swarm.is_connected(&peer_id);
964 ret.send(Ok(connected)).ok();
965 }
966 IpfsEvent::Connected(ret) => {
967 let connections = self.swarm.connected_peers().copied();
968 ret.send(Ok(connections.collect())).ok();
969 }
970 IpfsEvent::Disconnect(peer, ret) => {
971 if self.swarm.disconnect_peer_id(peer).is_err() {
972 let _ = ret.send(Err(anyhow::anyhow!("Peer is not connected")));
973 return;
974 }
975
976 self.pending_disconnection
977 .entry(peer)
978 .or_default()
979 .push(ret);
980 }
981 IpfsEvent::Ban(peer, ret) => {
982 self.swarm.behaviour_mut().block_list.block_peer(peer);
983 let _ = ret.send(Ok(()));
984 }
985 IpfsEvent::Unban(peer, ret) => {
986 self.swarm.behaviour_mut().block_list.unblock_peer(peer);
987 let _ = ret.send(Ok(()));
988 }
989 IpfsEvent::PubsubSubscribe(topic, ret) => {
990 let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
991 let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
992 return;
993 };
994
995 let _ = ret.send(Ok(pubsub.subscribe(topic).ok()));
996 }
997 IpfsEvent::PubsubUnsubscribe(topic, ret) => {
998 let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
999 let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1000 return;
1001 };
1002
1003 let _ = ret.send(Ok(pubsub.unsubscribe(topic)));
1004 }
1005 IpfsEvent::PubsubPublish(topic, data, ret) => {
1006 let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1007 let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1008 return;
1009 };
1010
1011 let _ = ret.send(Ok(pubsub.publish(topic, data)));
1012 }
1013 IpfsEvent::PubsubPeers(Some(topic), ret) => {
1014 let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1015 let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1016 return;
1017 };
1018
1019 let _ = ret.send(Ok(pubsub.subscribed_peers(topic)));
1020 }
1021 IpfsEvent::PubsubPeers(None, ret) => {
1022 let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1023 let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1024 return;
1025 };
1026
1027 let _ = ret.send(Ok(pubsub.known_peers()));
1028 }
1029 IpfsEvent::PubsubSubscribed(ret) => {
1030 let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1031 let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1032 return;
1033 };
1034 let list = pubsub.topics().map(|t| t.to_string()).collect();
1035 let _ = ret.send(Ok(list));
1036 }
1037 IpfsEvent::PubsubEventStream(ret) => {
1056 let (tx, rx) = unbounded();
1057 self.pubsub_event_stream.push(tx);
1058 let _ = ret.send(rx);
1059 }
1060 IpfsEvent::AddListeningAddress(addr, ret) => match self.swarm.listen_on(addr) {
1061 Ok(id) => {
1062 self.pending_add_listener.insert(id, ret);
1063 }
1064 Err(e) => {
1065 let _ = ret.send(Err(anyhow::anyhow!(e)));
1066 }
1067 },
1068 IpfsEvent::RemoveListeningAddress(addr, ret) => {
1069 let Some(listener_id) = self.listening_addresses.iter().find_map(|(id, list)| {
1070 if list.contains(&addr) {
1071 return Some(id);
1072 }
1073
1074 None
1075 }) else {
1076 let _ = ret.send(Err(format_err!(
1077 "Address was not listened to before: {}",
1078 addr
1079 )));
1080 return;
1081 };
1082
1083 match self.swarm.remove_listener(*listener_id) {
1084 true => {
1085 self.pending_remove_listener.insert(*listener_id, ret);
1086 }
1087 false => {
1088 let _ = ret.send(Err(anyhow::anyhow!(
1089 "Failed to remove previously added listening address: {}",
1090 addr
1091 )));
1092 }
1093 }
1094 }
1095 IpfsEvent::AddExternalAddress(addr, ret) => {
1096 self.swarm.add_external_address(addr);
1097 let _ = ret.send(Ok(()));
1098 }
1099 IpfsEvent::RemoveExternalAddress(addr, ret) => {
1100 self.swarm.remove_external_address(&addr);
1101 let _ = ret.send(Ok(()));
1102 }
1103 IpfsEvent::ConnectionEvents(ret) => {
1104 let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity);
1105 self.connection_events.push(tx);
1106 let _ = ret.send(Ok(rx));
1107 }
1108 IpfsEvent::PeerConnectionEvents(peer_id, ret) => {
1109 let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity);
1110 self.peer_connection_events
1111 .entry(peer_id)
1112 .or_default()
1113 .push(tx);
1114 let _ = ret.send(Ok(rx));
1115 }
1116 IpfsEvent::Bootstrap(ret) => {
1117 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1118 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1119 return;
1120 };
1121
1122 let future = match kad.bootstrap() {
1123 Ok(id) => {
1124 let (tx, rx) = oneshot::channel();
1125 self.kad_subscriptions.insert(id, tx);
1126 Ok(rx)
1127 }
1128 Err(e) => {
1129 error!("kad: can't bootstrap the node: {:?}", e);
1130 Err(anyhow!("kad: can't bootstrap the node: {:?}", e))
1131 }
1132 };
1133 let _ = ret.send(future);
1134 }
1135 IpfsEvent::AddPeer(opt, ret) => {
1136 let result = match self.swarm.behaviour_mut().add_peer(opt) {
1137 true => Ok(()),
1138 false => Err(anyhow::anyhow!("unable to add peer")),
1139 };
1140
1141 let _ = ret.send(result);
1142 }
1143 IpfsEvent::RemovePeer(peer_id, addr, ret) => {
1144 let result = match addr {
1145 Some(addr) => Ok(self
1146 .swarm
1147 .behaviour_mut()
1148 .addressbook
1149 .remove_address(&peer_id, &addr)),
1150 None => Ok(self.swarm.behaviour_mut().addressbook.remove_peer(&peer_id)),
1151 };
1152
1153 let _ = ret.send(result);
1154 }
1155 IpfsEvent::GetClosestPeers(peer_id, ret) => {
1156 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1157 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1158 return;
1159 };
1160
1161 let id = kad.get_closest_peers(peer_id);
1162
1163 let (tx, rx) = oneshot::channel();
1164
1165 self.kad_subscriptions.insert(id, tx);
1166 let _ = ret.send(Ok(rx));
1167 }
1168 IpfsEvent::WantList(peer, ret) => {
1169 let Some(bitswap) = self.swarm.behaviour().bitswap.as_ref() else {
1170 let _ = ret.send(Ok(futures::future::ready(vec![]).boxed()));
1171 return;
1172 };
1173 let list = match peer {
1174 Some(peer_id) => bitswap.peer_wantlist(peer_id),
1175 None => bitswap.local_wantlist(),
1176 };
1177 let _ = ret.send(Ok(futures::future::ready(list).boxed()));
1178 }
1179 IpfsEvent::GetBitswapPeers(ret) => {
1180 let _ = ret.send(Ok(futures::future::ready(vec![]).boxed()));
1181 }
1182 IpfsEvent::FindPeerIdentity(peer_id, ret) => {
1183 let locally_known = self.swarm.behaviour().peerbook.get_peer_info(peer_id);
1184
1185 let (tx, rx) = oneshot::channel();
1186
1187 match locally_known {
1188 Some(info) => {
1189 let _ = tx.send(Ok(info.clone()));
1190 }
1191 None => {
1192 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1193 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1194 return;
1195 };
1196
1197 kad.get_closest_peers(peer_id);
1198
1199 self.dht_peer_lookup.entry(peer_id).or_default().push(tx);
1200 }
1201 }
1202
1203 let _ = ret.send(Ok(rx));
1204 }
1205 IpfsEvent::FindPeer(peer_id, local_only, ret) => {
1206 let listener_addrs = self
1207 .swarm
1208 .behaviour_mut()
1209 .peerbook
1210 .peer_connections(peer_id)
1211 .unwrap_or_default()
1212 .into_iter()
1213 .map(|mut addr| {
1214 addr.extract_peer_id();
1215 addr
1216 })
1217 .collect::<Vec<_>>();
1218
1219 let locally_known_addrs = if !listener_addrs.is_empty() {
1220 listener_addrs
1221 } else {
1222 self.swarm
1223 .behaviour()
1224 .addressbook
1225 .get_peer_addresses(&peer_id)
1226 .unwrap_or_default()
1227 };
1228
1229 let addrs = if !locally_known_addrs.is_empty() || local_only {
1230 Either::Left(locally_known_addrs)
1231 } else {
1232 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1233 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1234 return;
1235 };
1236
1237 Either::Right({
1238 let id = kad.get_closest_peers(peer_id);
1239
1240 let (tx, rx) = oneshot::channel();
1241 self.kad_subscriptions.insert(id, tx);
1242
1243 rx
1244 })
1245 };
1246 let _ = ret.send(Ok(addrs));
1247 }
1248 IpfsEvent::GetProviders(key, ret) => {
1249 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1250 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1251 return;
1252 };
1253
1254 let id = kad.get_providers(key);
1255
1256 let (tx, mut rx) = futures::channel::mpsc::unbounded();
1257 let stream = async_stream::stream! {
1258 let mut current_providers: HashSet<PeerId> = Default::default();
1259 while let Some(provider) = rx.next().await {
1260 if current_providers.insert(provider) {
1261 yield provider;
1262 }
1263 }
1264 };
1265 self.provider_stream.insert(id, tx);
1266
1267 let _ = ret.send(Ok(Some(stream.boxed())));
1268 }
1269 IpfsEvent::Provide(key, ret) => {
1270 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1271 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1272 return;
1273 };
1274
1275 let future = match kad.start_providing(key) {
1276 Ok(id) => {
1277 let (tx, rx) = oneshot::channel();
1278 self.kad_subscriptions.insert(id, tx);
1279 Ok(rx)
1280 }
1281 Err(e) => {
1282 error!("kad: can't provide a key: {:?}", e);
1283 Err(anyhow!("kad: can't provide the key: {:?}", e))
1284 }
1285 };
1286 let _ = ret.send(future);
1287 }
1288 IpfsEvent::DhtMode(mode, ret) => {
1289 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1290 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1291 return;
1292 };
1293
1294 kad.set_mode(mode.into());
1295
1296 let _ = ret.send(Ok(()));
1297 }
1298 IpfsEvent::DhtGet(key, ret) => {
1299 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1300 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1301 return;
1302 };
1303
1304 let id = kad.get_record(key);
1305
1306 let (tx, mut rx) = futures::channel::mpsc::unbounded();
1307 let stream = async_stream::stream! {
1308 while let Some(record) = rx.next().await {
1309 yield record;
1310 }
1311 };
1312 self.record_stream.insert(id, tx);
1313
1314 let _ = ret.send(Ok(stream.boxed()));
1315 }
1316 IpfsEvent::DhtPut(key, value, quorum, ret) => {
1317 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1318 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1319 return;
1320 };
1321
1322 let record = Record {
1323 key,
1324 value,
1325 publisher: None,
1326 expires: None,
1327 };
1328
1329 let future = match kad.put_record(record, quorum) {
1330 Ok(id) => {
1331 let (tx, rx) = oneshot::channel();
1332 self.kad_subscriptions.insert(id, tx);
1333 Ok(rx)
1334 }
1335 Err(e) => {
1336 error!("kad: can't put a record: {:?}", e);
1337 Err(anyhow!("kad: can't provide the record: {:?}", e))
1338 }
1339 };
1340
1341 let _ = ret.send(future);
1342 }
1343 IpfsEvent::GetBootstrappers(ret) => {
1344 let list = Vec::from_iter(self.bootstraps.iter().cloned());
1345 let _ = ret.send(list);
1346 }
1347 IpfsEvent::AddBootstrapper(mut addr, ret) => {
1348 if !self.swarm.behaviour().kademlia.is_enabled() {
1349 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1350 return;
1351 };
1352
1353 let ret_addr = addr.clone();
1354
1355 if self.bootstraps.insert(addr.clone()) {
1356 if let Some(peer_id) = addr.extract_peer_id() {
1357 self.swarm.behaviour_mut().add_peer((peer_id, addr));
1358 trace!(peer_id=%peer_id, "tried to add a bootstrapper");
1360 }
1361 }
1362 let _ = ret.send(Ok(ret_addr));
1363 }
1364 IpfsEvent::RemoveBootstrapper(mut addr, ret) => {
1365 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1366 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1367 return;
1368 };
1369
1370 let result = addr.clone();
1371
1372 if self.bootstraps.remove(&addr) {
1373 if let Some(peer_id) = addr.extract_peer_id() {
1374 let prefix: Multiaddr = addr;
1375
1376 if let Some(e) = kad.remove_address(&peer_id, &prefix) {
1377 info!(peer_id=%peer_id, status=?e.status, "removed bootstrapper");
1378 } else {
1379 warn!(peer_id=%peer_id, "attempted to remove an unknown bootstrapper");
1380 }
1381 }
1382
1383 let _ = ret.send(Ok(result));
1384 }
1385 }
1386 IpfsEvent::ClearBootstrappers(ret) => {
1387 let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1388 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1389 return;
1390 };
1391
1392 let removed = self.bootstraps.drain().collect::<Vec<_>>();
1393 let mut list = Vec::with_capacity(removed.len());
1394
1395 for mut addr_with_peer_id in removed {
1396 let priginal = addr_with_peer_id.clone();
1397 let Some(peer_id) = addr_with_peer_id.extract_peer_id() else {
1398 continue;
1399 };
1400 let prefix: Multiaddr = addr_with_peer_id;
1401
1402 if let Some(e) = kad.remove_address(&peer_id, &prefix) {
1403 info!(peer_id=%peer_id, status=?e.status, "cleared bootstrapper");
1404 list.push(priginal);
1405 } else {
1406 error!(peer_id=%peer_id, "attempted to clear an unknown bootstrapper");
1407 }
1408 }
1409
1410 let _ = ret.send(Ok(list));
1411 }
1412 IpfsEvent::DefaultBootstrap(ret) => {
1413 if !self.swarm.behaviour().kademlia.is_enabled() {
1414 let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1415 return;
1416 };
1417
1418 let mut rets = Vec::new();
1419 for addr in BOOTSTRAP_NODES {
1420 let mut addr = addr
1421 .parse::<Multiaddr>()
1422 .expect("see test bootstrap_nodes_are_multiaddr_with_peerid");
1423 let original: Multiaddr = addr.clone();
1424 if self.bootstraps.insert(addr.clone()) {
1425 let Some(peer_id) = addr.extract_peer_id() else {
1426 continue;
1427 };
1428
1429 if self.swarm.behaviour_mut().add_peer((peer_id, addr.clone())) {
1430 trace!(peer_id=%peer_id, "tried to restore a bootstrapper");
1431 rets.push(original);
1433 }
1434 }
1435 }
1436
1437 let _ = ret.send(Ok(rets));
1438 }
1439 IpfsEvent::AddRelay(peer_id, addr, tx) => {
1440 let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1441 let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1442 return;
1443 };
1444
1445 relay.add_address(peer_id, addr);
1446
1447 let _ = tx.send(Ok(()));
1448 }
1449 IpfsEvent::RemoveRelay(peer_id, addr, tx) => {
1450 let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1451 let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1452 return;
1453 };
1454
1455 relay.remove_address(peer_id, addr);
1456
1457 let _ = tx.send(Ok(()));
1458 }
1459 IpfsEvent::EnableRelay(Some(peer_id), tx) => {
1460 let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1461 let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1462 return;
1463 };
1464
1465 relay.select(peer_id);
1466
1467 self.relay_listener.entry(peer_id).or_default().push(tx);
1468 }
1469 IpfsEvent::EnableRelay(None, tx) => {
1470 let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1471 let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1472 return;
1473 };
1474
1475 let Some(peer_id) = relay.random_select() else {
1476 let _ = tx.send(Err(anyhow::anyhow!(
1477 "No relay was selected or was unavailable"
1478 )));
1479 return;
1480 };
1481
1482 self.relay_listener.entry(peer_id).or_default().push(tx);
1483 }
1484 IpfsEvent::DisableRelay(peer_id, tx) => {
1485 let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1486 let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1487 return;
1488 };
1489 relay.disable_relay(peer_id);
1490
1491 let _ = tx.send(Ok(()));
1492 }
1493 IpfsEvent::ListRelays(tx) => {
1494 let Some(relay) = self.swarm.behaviour().relay_manager.as_ref() else {
1495 let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1496 return;
1497 };
1498
1499 let list = relay
1500 .list_relays()
1501 .map(|(peer_id, addrs)| (*peer_id, addrs.clone()))
1502 .collect();
1503
1504 let _ = tx.send(Ok(list));
1505 }
1506 IpfsEvent::ListActiveRelays(tx) => {
1507 let Some(relay) = self.swarm.behaviour().relay_manager.as_ref() else {
1508 let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1509 return;
1510 };
1511
1512 let list = relay.list_active_relays();
1513
1514 let _ = tx.send(Ok(list));
1515 }
1516 IpfsEvent::RegisterRendezvousNamespace(ns, peer_id, ttl, res) => {
1517 let Some(rz) = self.swarm.behaviour_mut().rendezvous_client.as_mut() else {
1518 let _ = res.send(Err(anyhow::anyhow!("Rendezvous client is not enabled")));
1519 return;
1520 };
1521
1522 if let Err(e) = rz.register(ns.clone(), peer_id, ttl) {
1523 let _ = res.send(Err(anyhow::Error::from(e)));
1524 return;
1525 }
1526 self.rzv_register_pending
1527 .entry((peer_id, ns))
1528 .or_default()
1529 .push(res);
1530 }
1531 IpfsEvent::UnregisterRendezvousNamespace(ns, peer_id, res) => {
1532 let Some(rz) = self.swarm.behaviour_mut().rendezvous_client.as_mut() else {
1533 let _ = res.send(Err(anyhow::anyhow!("Rendezvous client is not enabled")));
1534 return;
1535 };
1536
1537 rz.unregister(ns.clone(), peer_id);
1538
1539 let _ = res.send(Ok(()));
1540 }
1541 IpfsEvent::RendezvousNamespaceDiscovery(ns, use_cookie, ttl, peer_id, res) => {
1542 let Some(rz) = self.swarm.behaviour_mut().rendezvous_client.as_mut() else {
1543 let _ = res.send(Err(anyhow::anyhow!("Rendezvous client is not enabled")));
1544 return;
1545 };
1546
1547 let cookie = use_cookie
1548 .then(|| self.rzv_cookie.get(&peer_id).cloned().flatten())
1549 .flatten();
1550
1551 rz.discover(ns.clone(), cookie, ttl, peer_id);
1552
1553 match ns {
1554 Some(ns) => self
1555 .rzv_discover_pending
1556 .entry((peer_id, ns))
1557 .or_default()
1558 .push(res),
1559 None => {
1560 let _ = res.send(Ok(HashMap::new()));
1561 }
1562 }
1563 }
1564 IpfsEvent::RequestStream(protocol, res) => {
1565 let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1566 let _ = res.send(Err(anyhow::anyhow!(
1567 "request-response behaviour is not enabled"
1568 )));
1569 return;
1570 };
1571 let rx = rr.subscribe();
1572 let _ = res.send(Ok(Box::pin(rx)));
1573 }
1574 IpfsEvent::SendRequest(protocol, peer_id, request, res) => {
1575 let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1576 let _ = res.send(Err(anyhow::anyhow!(
1577 "request-response behaviour is not enabled"
1578 )));
1579 return;
1580 };
1581
1582 let fut = rr.send_request(peer_id, request);
1583
1584 let _ = res.send(Ok(fut));
1585 }
1586 IpfsEvent::SendRequests(protocol, peers, request, res) => {
1587 let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1588 let _ = res.send(Err(anyhow::anyhow!(
1589 "request-response behaviour is not enabled"
1590 )));
1591 return;
1592 };
1593
1594 let st = rr.send_requests(peers, request);
1595
1596 let _ = res.send(Ok(st));
1597 }
1598 IpfsEvent::SendResponse(protocol, peer_id, id, response, res) => {
1599 let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1600 let _ = res.send(Err(anyhow::anyhow!(
1601 "request-response behaviour is not enabled"
1602 )));
1603 return;
1604 };
1605
1606 let result = rr
1607 .send_response(peer_id, id, response)
1608 .map_err(anyhow::Error::from);
1609
1610 let _ = res.send(result);
1611 }
1612 IpfsEvent::Exit => {
1613 }
1615 }
1616 }
1617
1618 fn handle_repo_event(&mut self, event: RepoEvent) {
1619 match event {
1620 RepoEvent::WantBlock(cids, peers, timeout, signals) => {
1621 let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else {
1622 return;
1623 };
1624 if let Some(signals) = signals {
1625 for (cid, signals) in signals {
1626 if signals.is_empty() {
1627 continue;
1628 }
1629
1630 let entries = self.bitswap_cancellable.entry(cid).or_default();
1631 entries.extend(signals);
1632 }
1633 }
1634 bs.gets(cids, &peers, timeout);
1635 }
1636 RepoEvent::UnwantBlock(cid) => {
1637 let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else {
1638 return;
1639 };
1640 bs.cancel(cid);
1641 if let Some(list) = self.bitswap_cancellable.remove(&cid) {
1642 for signal in list {
1643 signal.notify_waiters();
1644 }
1645 }
1646 }
1647 RepoEvent::NewBlock(block) => {
1648 let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else {
1649 return;
1650 };
1651 bs.notify_new_blocks([*block.cid()]);
1652 }
1653 RepoEvent::RemovedBlock(_) => {}
1654 }
1655 }
1656}