rust_ipfs/
task.rs

1use anyhow::{anyhow, format_err};
2use either::Either;
3use futures::{
4    channel::{
5        mpsc::{unbounded, Receiver, UnboundedSender},
6        oneshot,
7    },
8    stream::Fuse,
9    FutureExt, StreamExt,
10};
11
12use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent};
13use crate::{ConnectionEvents, PeerConnectionEvents, TSwarmEvent};
14
15use std::{
16    collections::{hash_map::Entry, HashMap, HashSet},
17    time::Duration,
18};
19
20use crate::{config::BOOTSTRAP_NODES, IpfsEvent, TSwarmEventFn};
21use futures_timer::Delay;
22use ipld_core::cid::Cid;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::{Context, Poll};
26
27use crate::{
28    p2p::TSwarm,
29    repo::{Repo, RepoEvent},
30    AddPeerOpt,
31};
32
33pub use crate::{p2p::BehaviourEvent, p2p::KadResult};
34
35pub use libp2p::{self, core::transport::ListenerId, swarm::NetworkBehaviour, Multiaddr, PeerId};
36use multibase::Base;
37
38use libp2p::core::{ConnectedPoint, Endpoint};
39#[cfg(not(target_arch = "wasm32"))]
40use libp2p::mdns::Event as MdnsEvent;
41use libp2p::multiaddr::Protocol;
42use libp2p::{
43    autonat,
44    identify::{Event as IdentifyEvent, Info as IdentifyInfo},
45    kad::{
46        AddProviderError, AddProviderOk, BootstrapError, BootstrapOk, Event as KademliaEvent,
47        GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError,
48        GetRecordOk, PutRecordError, PutRecordOk, QueryId, QueryResult::*, Record,
49    },
50    rendezvous::{Cookie, Namespace},
51    swarm::{ConnectionId, SwarmEvent},
52};
53use tokio::sync::Notify;
54
55/// Background task of `Ipfs` created when calling `UninitializedIpfs::start`.
56// The receivers are Fuse'd so that we don't have to manage state on them being exhausted.
57#[allow(clippy::type_complexity)]
58#[allow(dead_code)]
59pub(crate) struct IpfsTask<C: NetworkBehaviour<ToSwarm = void::Void>> {
60    pub(crate) swarm: TSwarm<C>,
61    pub(crate) repo_events: Fuse<Receiver<RepoEvent>>,
62    pub(crate) from_facade: Fuse<Receiver<IpfsEvent>>,
63    pub(crate) bitswap_cancellable: HashMap<Cid, Vec<Arc<Notify>>>,
64    pub(crate) listening_addresses: HashMap<ListenerId, Vec<Multiaddr>>,
65    pub(crate) provider_stream: HashMap<QueryId, UnboundedSender<PeerId>>,
66    pub(crate) record_stream: HashMap<QueryId, UnboundedSender<Record>>,
67    pub(crate) repo: Repo,
68    pub(crate) kad_subscriptions: HashMap<QueryId, Channel<KadResult>>,
69    pub(crate) dht_peer_lookup: HashMap<PeerId, Vec<Channel<libp2p::identify::Info>>>,
70    pub(crate) bootstraps: HashSet<Multiaddr>,
71    pub(crate) swarm_event: Option<TSwarmEventFn<C>>,
72    pub(crate) pubsub_event_stream: Vec<UnboundedSender<InnerPubsubEvent>>,
73    pub(crate) timer: TaskTimer,
74    pub(crate) local_external_addr: bool,
75    pub(crate) relay_listener: HashMap<PeerId, Vec<Channel<()>>>,
76    pub(crate) rzv_register_pending: HashMap<(PeerId, Namespace), Vec<Channel<()>>>,
77    pub(crate) rzv_discover_pending:
78        HashMap<(PeerId, Namespace), Vec<Channel<HashMap<PeerId, Vec<Multiaddr>>>>>,
79    pub(crate) rzv_cookie: HashMap<PeerId, Option<Cookie>>,
80
81    pub(crate) peer_connection_events:
82        HashMap<PeerId, Vec<futures::channel::mpsc::Sender<PeerConnectionEvents>>>,
83    pub(crate) connection_events: Vec<futures::channel::mpsc::Sender<ConnectionEvents>>,
84
85    pub(crate) pending_connection: HashMap<ConnectionId, Channel<ConnectionId>>,
86    pub(crate) pending_disconnection: HashMap<PeerId, Vec<Channel<()>>>,
87    pub(crate) pending_add_listener: HashMap<ListenerId, Channel<Multiaddr>>,
88    pub(crate) pending_remove_listener: HashMap<ListenerId, Channel<()>>,
89
90    pub(crate) event_capacity: usize,
91}
92
93impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
94    pub fn new(
95        swarm: TSwarm<C>,
96        repo_events: Fuse<Receiver<RepoEvent>>,
97        from_facade: Fuse<Receiver<IpfsEvent>>,
98        repo: &Repo,
99        event_capacity: usize,
100    ) -> Self {
101        IpfsTask {
102            repo_events,
103            from_facade,
104            swarm,
105            event_capacity,
106            provider_stream: HashMap::new(),
107            record_stream: HashMap::new(),
108            dht_peer_lookup: Default::default(),
109            pubsub_event_stream: Default::default(),
110            kad_subscriptions: Default::default(),
111            bitswap_cancellable: Default::default(),
112            repo: repo.clone(),
113            bootstraps: Default::default(),
114            swarm_event: Default::default(),
115            timer: Default::default(),
116            relay_listener: Default::default(),
117            local_external_addr: false,
118            rzv_register_pending: Default::default(),
119            rzv_discover_pending: Default::default(),
120            rzv_cookie: Default::default(),
121            listening_addresses: HashMap::new(),
122            peer_connection_events: HashMap::new(),
123            connection_events: Vec::new(),
124            pending_disconnection: Default::default(),
125            pending_connection: Default::default(),
126            pending_add_listener: Default::default(),
127            pending_remove_listener: Default::default(),
128        }
129    }
130}
131
132pub(crate) struct TaskTimer {
133    pub(crate) event_cleanup: Delay,
134}
135
136impl Default for TaskTimer {
137    fn default() -> Self {
138        let event_cleanup = Delay::new(Duration::from_secs(60));
139
140        Self { event_cleanup }
141    }
142}
143
144impl<C: NetworkBehaviour<ToSwarm = void::Void>> futures::Future for IpfsTask<C> {
145    type Output = ();
146
147    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148        loop {
149            match self.swarm.poll_next_unpin(cx) {
150                Poll::Ready(Some(event)) => self.handle_swarm_event(event),
151                Poll::Ready(None) => return Poll::Ready(()),
152                Poll::Pending => break,
153            }
154        }
155        loop {
156            match self.from_facade.poll_next_unpin(cx) {
157                Poll::Ready(Some(event)) => self.handle_event(event),
158                Poll::Ready(None) => return Poll::Ready(()),
159                Poll::Pending => break,
160            }
161        }
162        loop {
163            match self.repo_events.poll_next_unpin(cx) {
164                Poll::Ready(Some(event)) => self.handle_repo_event(event),
165                Poll::Ready(None) => return Poll::Ready(()),
166                Poll::Pending => break,
167            }
168        }
169
170        if self.timer.event_cleanup.poll_unpin(cx).is_ready() {
171            self.pubsub_event_stream.retain(|ch| !ch.is_closed());
172            self.connection_events.retain(|ch| !ch.is_closed());
173            self.peer_connection_events.retain(|_, ch_list| {
174                ch_list.retain(|ch| !ch.is_closed());
175                !ch_list.is_empty()
176            });
177            self.timer.event_cleanup.reset(Duration::from_secs(60));
178        }
179
180        Poll::Pending
181    }
182}
183
184impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
185    pub(crate) async fn run(&mut self) {
186        let mut event_cleanup = futures_timer::Delay::new(Duration::from_secs(60));
187
188        loop {
189            tokio::select! {
190                biased;
191                Some(swarm) = self.swarm.next() => {
192                    self.handle_swarm_event(swarm);
193                },
194                Some(repo) = self.repo_events.next() => {
195                    self.handle_repo_event(repo);
196                },
197                Some(event) = self.from_facade.next() => {
198                    if matches!(event, IpfsEvent::Exit) {
199                        break;
200                    }
201                    self.handle_event(event);
202                },
203                _ = &mut event_cleanup => {
204                    self.pubsub_event_stream.retain(|ch| !ch.is_closed());
205                    self.connection_events.retain(|ch| !ch.is_closed());
206                    self.peer_connection_events.retain(|_, ch_list| {
207                        ch_list.retain(|ch| !ch.is_closed());
208                        !ch_list.is_empty()
209                    });
210                    event_cleanup.reset(Duration::from_secs(60));
211                }
212            }
213        }
214    }
215
216    fn emit_pubsub_event(&self, event: InnerPubsubEvent) {
217        for ch in &self.pubsub_event_stream {
218            let event = event.clone();
219            let _ = ch.unbounded_send(event);
220        }
221    }
222
223    fn handle_swarm_event(&mut self, swarm_event: TSwarmEvent<C>) {
224        if let Some(handler) = self.swarm_event.as_ref() {
225            handler(&mut self.swarm, &swarm_event)
226        }
227        match swarm_event {
228            SwarmEvent::NewListenAddr {
229                listener_id,
230                address,
231            } => {
232                if self.local_external_addr
233                    && !address.is_relay()
234                    && (address.is_loopback() || address.is_private())
235                {
236                    self.swarm.add_external_address(address.clone());
237                }
238
239                if !address.is_loopback() && !address.is_private() {
240                    // We will assume that the address is global and reachable externally
241                    self.swarm.add_external_address(address.clone());
242                }
243
244                self.listening_addresses
245                    .entry(listener_id)
246                    .or_default()
247                    .push(address.clone());
248
249                if let Some(ret) = self.pending_add_listener.remove(&listener_id) {
250                    let _ = ret.send(Ok(address));
251                }
252            }
253            SwarmEvent::ConnectionEstablished {
254                peer_id,
255                connection_id,
256                endpoint,
257                ..
258            } => {
259                if let Some(ch) = self.pending_connection.remove(&connection_id) {
260                    let _ = ch.send(Ok(connection_id));
261                }
262
263                let (ep, mut addr) = match &endpoint {
264                    ConnectedPoint::Dialer { address, .. } => (Endpoint::Dialer, address.clone()),
265                    ConnectedPoint::Listener { local_addr, .. } if endpoint.is_relayed() => {
266                        (Endpoint::Listener, local_addr.clone())
267                    }
268                    ConnectedPoint::Listener { send_back_addr, .. } => {
269                        (Endpoint::Listener, send_back_addr.clone())
270                    }
271                };
272
273                if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
274                    addr.pop();
275                }
276
277                if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) {
278                    let ev = match ep {
279                        Endpoint::Dialer => PeerConnectionEvents::OutgoingConnection {
280                            connection_id,
281                            addr: addr.clone(),
282                        },
283                        Endpoint::Listener => PeerConnectionEvents::IncomingConnection {
284                            connection_id,
285                            addr: addr.clone(),
286                        },
287                    };
288
289                    for ch in ch_list {
290                        let _ = ch.try_send(ev.clone());
291                    }
292                }
293
294                for ch in &mut self.connection_events {
295                    let ev = match ep {
296                        Endpoint::Dialer => ConnectionEvents::OutgoingConnection {
297                            peer_id,
298                            connection_id,
299                            addr: addr.clone(),
300                        },
301                        Endpoint::Listener => ConnectionEvents::IncomingConnection {
302                            peer_id,
303                            connection_id,
304                            addr: addr.clone(),
305                        },
306                    };
307
308                    let _ = ch.try_send(ev);
309                }
310            }
311            SwarmEvent::OutgoingConnectionError {
312                connection_id,
313                error,
314                ..
315            } => {
316                if let Some(ch) = self.pending_connection.remove(&connection_id) {
317                    let _ = ch.send(Err(anyhow::Error::from(error)));
318                }
319            }
320            SwarmEvent::ConnectionClosed {
321                peer_id,
322                connection_id,
323                ..
324            } => {
325                if let Some(ch) = self.pending_disconnection.remove(&peer_id) {
326                    for ch in ch {
327                        let _ = ch.send(Ok(()));
328                    }
329                }
330
331                if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) {
332                    for ch in ch_list {
333                        let _ =
334                            ch.try_send(PeerConnectionEvents::ClosedConnection { connection_id });
335                    }
336                }
337
338                for ch in &mut self.connection_events {
339                    let _ = ch.try_send(ConnectionEvents::ClosedConnection {
340                        peer_id,
341                        connection_id,
342                    });
343                }
344            }
345            SwarmEvent::ExpiredListenAddr {
346                listener_id,
347                address,
348            } => {
349                if let Some(list) = self.listening_addresses.get_mut(&listener_id) {
350                    list.retain(|addr| &address != addr);
351                }
352
353                self.swarm.remove_external_address(&address);
354            }
355            SwarmEvent::ListenerClosed {
356                listener_id,
357                reason,
358                addresses,
359            } => {
360                for address in addresses {
361                    self.listening_addresses.remove(&listener_id);
362                    self.swarm.remove_external_address(&address);
363                }
364
365                if let Some(ret) = self.pending_remove_listener.remove(&listener_id) {
366                    let _ = ret.send(reason.map_err(anyhow::Error::from));
367                }
368            }
369            SwarmEvent::ListenerError { listener_id, error } => {
370                if let Some(ret) = self.pending_add_listener.remove(&listener_id) {
371                    let _ = ret.send(Err(error.into()));
372                }
373            }
374            #[cfg(not(target_arch = "wasm32"))]
375            SwarmEvent::Behaviour(BehaviourEvent::Mdns(event)) => match event {
376                MdnsEvent::Discovered(list) => {
377                    for (peer, addr) in list {
378                        self.swarm.behaviour_mut().add_peer((peer, addr));
379                    }
380                }
381                MdnsEvent::Expired(list) => {
382                    for (peer, _) in list {
383                        if let Some(mdns) = self.swarm.behaviour().mdns.as_ref() {
384                            if !mdns.discovered_nodes().any(|p| p == &peer) {
385                                trace!("mdns: Expired peer {}", peer.to_base58());
386                            }
387                        }
388                    }
389                }
390            },
391            SwarmEvent::Behaviour(BehaviourEvent::Kademlia(event)) => {
392                match event {
393                    KademliaEvent::InboundRequest { request } => {
394                        trace!("kad: inbound {:?} request handled", request);
395                    }
396                    KademliaEvent::OutboundQueryProgressed {
397                        result, id, step, ..
398                    } => {
399                        // make sure the query is exhausted
400
401                        if self
402                            .swarm
403                            .behaviour()
404                            .kademlia
405                            .as_ref()
406                            .and_then(|kad| kad.query(&id))
407                            .is_none()
408                        {
409                            match result {
410                                // these subscriptions return actual values
411                                GetClosestPeers(_) | GetProviders(_) | GetRecord(_) => {}
412                                // we want to return specific errors for the following
413                                Bootstrap(Err(_)) | StartProviding(Err(_)) | PutRecord(Err(_)) => {}
414                                // and the rest can just return a general KadResult::Complete
415                                _ => {
416                                    if let Some(ret) = self.kad_subscriptions.remove(&id) {
417                                        let _ = ret.send(Ok(KadResult::Complete));
418                                    }
419                                }
420                            }
421                        }
422
423                        match result {
424                            Bootstrap(Ok(BootstrapOk {
425                                peer,
426                                num_remaining,
427                            })) => {
428                                debug!(
429                                    "kad: bootstrapped with {}, {} peers remain",
430                                    peer, num_remaining
431                                );
432                            }
433                            Bootstrap(Err(BootstrapError::Timeout { .. })) => {
434                                warn!("kad: timed out while trying to bootstrap");
435
436                                if let Some(ret) = self.kad_subscriptions.remove(&id) {
437                                    let _ = ret.send(Err(anyhow::anyhow!(
438                                        "kad: timed out while trying to bootstrap"
439                                    )));
440                                }
441                            }
442                            GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => {
443                                if let Some(ret) = self.kad_subscriptions.remove(&id) {
444                                    let _ = ret.send(Ok(KadResult::Peers(
445                                        peers.iter().map(|info| info.peer_id).collect(),
446                                    )));
447                                }
448                                if let Ok(peer_id) = PeerId::from_bytes(&key) {
449                                    if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
450                                        if !peers.iter().any(|info| info.peer_id == peer_id) {
451                                            for ret in rets {
452                                                let _ = ret.send(Err(anyhow::anyhow!(
453                                                    "Could not locate peer"
454                                                )));
455                                            }
456                                        }
457                                    }
458                                }
459                            }
460                            GetClosestPeers(Err(GetClosestPeersError::Timeout {
461                                key,
462                                peers: _,
463                            })) => {
464                                // don't mention the key here, as this is just the id of our node
465                                warn!("kad: timed out while trying to find all closest peers");
466
467                                if let Some(ret) = self.kad_subscriptions.remove(&id) {
468                                    let _ = ret.send(Err(anyhow::anyhow!(
469                                        "timed out while trying to find all closest peers"
470                                    )));
471                                }
472                                if let Ok(peer_id) = PeerId::from_bytes(&key) {
473                                    if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
474                                        for ret in rets {
475                                            let _ = ret.send(Err(anyhow::anyhow!(
476                                                "timed out while trying to find all closest peers"
477                                            )));
478                                        }
479                                    }
480                                }
481                            }
482                            GetProviders(Ok(GetProvidersOk::FoundProviders {
483                                key: _,
484                                providers,
485                            })) => {
486                                if let Entry::Occupied(entry) = self.provider_stream.entry(id) {
487                                    let tx = entry.get();
488                                    for provider in providers {
489                                        let _ = tx.unbounded_send(provider);
490                                    }
491                                }
492                            }
493                            GetProviders(Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
494                                ..
495                            })) => {
496                                if step.last {
497                                    if let Some(tx) = self.provider_stream.remove(&id) {
498                                        tx.close_channel();
499                                    }
500                                }
501                            }
502                            GetProviders(Err(GetProvidersError::Timeout { key, .. })) => {
503                                let key = multibase::encode(Base::Base32Lower, key);
504                                warn!("kad: timed out while trying to get providers for {}", key);
505
506                                if let Some(ret) = self.kad_subscriptions.remove(&id) {
507                                    let _ = ret.send(Err(anyhow::anyhow!(
508                                        "timed out while trying to get providers for the given key"
509                                    )));
510                                }
511                            }
512                            StartProviding(Ok(AddProviderOk { key })) => {
513                                let key = multibase::encode(Base::Base32Lower, key);
514                                debug!("kad: providing {}", key);
515                            }
516                            StartProviding(Err(AddProviderError::Timeout { key })) => {
517                                let key = multibase::encode(Base::Base32Lower, key);
518                                warn!("kad: timed out while trying to provide {}", key);
519
520                                if let Some(ret) = self.kad_subscriptions.remove(&id) {
521                                    let _ = ret.send(Err(anyhow::anyhow!(
522                                        "kad: timed out while trying to provide the record"
523                                    )));
524                                }
525                            }
526                            RepublishProvider(Ok(AddProviderOk { key })) => {
527                                let key = multibase::encode(Base::Base32Lower, key);
528                                debug!("kad: republished provider {}", key);
529                            }
530                            RepublishProvider(Err(AddProviderError::Timeout { key })) => {
531                                let key = multibase::encode(Base::Base32Lower, key);
532                                warn!("kad: timed out while trying to republish provider {}", key);
533                            }
534                            GetRecord(Ok(GetRecordOk::FoundRecord(record))) => {
535                                if let Entry::Occupied(entry) = self.record_stream.entry(id) {
536                                    let _ = entry.get().unbounded_send(record.record);
537                                }
538                            }
539                            GetRecord(Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
540                                ..
541                            })) => {
542                                if step.last {
543                                    if let Some(tx) = self.record_stream.remove(&id) {
544                                        tx.close_channel();
545                                    }
546                                }
547                            }
548                            GetRecord(Err(GetRecordError::NotFound {
549                                key,
550                                closest_peers: _,
551                            })) => {
552                                let key = multibase::encode(Base::Base32Lower, key);
553                                warn!("kad: couldn't find record {}", key);
554
555                                if let Some(tx) = self.record_stream.remove(&id) {
556                                    tx.close_channel();
557                                }
558                            }
559                            GetRecord(Err(GetRecordError::QuorumFailed {
560                                key,
561                                records: _,
562                                quorum,
563                            })) => {
564                                let key = multibase::encode(Base::Base32Lower, key);
565                                warn!(
566                                    "kad: quorum failed {} when trying to get key {}",
567                                    quorum, key
568                                );
569
570                                if let Some(tx) = self.record_stream.remove(&id) {
571                                    tx.close_channel();
572                                }
573                            }
574                            GetRecord(Err(GetRecordError::Timeout { key })) => {
575                                let key = multibase::encode(Base::Base32Lower, key);
576                                warn!("kad: timed out while trying to get key {}", key);
577
578                                if let Some(tx) = self.record_stream.remove(&id) {
579                                    tx.close_channel();
580                                }
581                            }
582                            PutRecord(Ok(PutRecordOk { key }))
583                            | RepublishRecord(Ok(PutRecordOk { key })) => {
584                                let key = multibase::encode(Base::Base32Lower, key);
585                                debug!("kad: successfully put record {}", key);
586                            }
587                            PutRecord(Err(PutRecordError::QuorumFailed {
588                                key,
589                                success: _,
590                                quorum,
591                            }))
592                            | RepublishRecord(Err(PutRecordError::QuorumFailed {
593                                key,
594                                success: _,
595                                quorum,
596                            })) => {
597                                let key = multibase::encode(Base::Base32Lower, key);
598                                warn!(
599                                    "kad: quorum failed ({}) when trying to put record {}",
600                                    quorum, key
601                                );
602
603                                if let Some(ret) = self.kad_subscriptions.remove(&id) {
604                                    let _ = ret.send(Err(anyhow::anyhow!(
605                                        "kad: quorum failed when trying to put the record"
606                                    )));
607                                }
608                            }
609                            PutRecord(Err(PutRecordError::Timeout {
610                                key,
611                                success: _,
612                                quorum: _,
613                            })) => {
614                                let key = multibase::encode(Base::Base32Lower, key);
615                                warn!("kad: timed out while trying to put record {}", key);
616
617                                if let Some(ret) = self.kad_subscriptions.remove(&id) {
618                                    let _ = ret.send(Err(anyhow::anyhow!(
619                                        "kad: timed out while trying to put record {}",
620                                        key
621                                    )));
622                                }
623                            }
624                            RepublishRecord(Err(PutRecordError::Timeout {
625                                key,
626                                success: _,
627                                quorum: _,
628                            })) => {
629                                let key = multibase::encode(Base::Base32Lower, key);
630                                warn!("kad: timed out while trying to republish record {}", key);
631                            }
632                        }
633                    }
634                    KademliaEvent::RoutingUpdated {
635                        peer,
636                        is_new_peer: _,
637                        addresses,
638                        bucket_range: _,
639                        old_peer: _,
640                    } => {
641                        trace!("kad: routing updated; {}: {:?}", peer, addresses);
642                    }
643                    KademliaEvent::UnroutablePeer { peer } => {
644                        trace!("kad: peer {} is unroutable", peer);
645                    }
646                    KademliaEvent::RoutablePeer { peer, address } => {
647                        trace!("kad: peer {} ({}) is routable", peer, address);
648                    }
649                    KademliaEvent::PendingRoutablePeer { peer, address } => {
650                        trace!("kad: pending routable peer {} ({})", peer, address);
651                    }
652                    KademliaEvent::ModeChanged { new_mode } => {
653                        let _ = new_mode;
654                    }
655                }
656            }
657            SwarmEvent::Behaviour(BehaviourEvent::Pubsub(
658                libp2p::gossipsub::Event::Subscribed { peer_id, topic },
659            )) => self.emit_pubsub_event(InnerPubsubEvent::Subscribe {
660                topic: topic.to_string(),
661                peer_id,
662            }),
663            SwarmEvent::Behaviour(BehaviourEvent::Pubsub(
664                libp2p::gossipsub::Event::Unsubscribed { peer_id, topic },
665            )) => self.emit_pubsub_event(InnerPubsubEvent::Unsubscribe {
666                topic: topic.to_string(),
667                peer_id,
668            }),
669            SwarmEvent::Behaviour(BehaviourEvent::Ping(event)) => match event {
670                libp2p::ping::Event {
671                    peer,
672                    connection,
673                    result: Result::Ok(rtt),
674                } => {
675                    trace!(
676                        "ping: rtt to {} is {} ms",
677                        peer.to_base58(),
678                        rtt.as_millis()
679                    );
680                    self.swarm.behaviour_mut().peerbook.set_peer_rtt(peer, rtt);
681
682                    if let Some(m) = self.swarm.behaviour_mut().relay_manager.as_mut() {
683                        m.set_peer_rtt(peer, connection, rtt)
684                    }
685                }
686                libp2p::ping::Event { .. } => {
687                    //TODO: Determine if we should continue handling ping errors and if we should disconnect/close connection.
688                }
689            },
690            SwarmEvent::Behaviour(BehaviourEvent::RelayClient(event)) => {
691                debug!("Relay Client Event: {event:?}");
692                if let Some(m) = self.swarm.behaviour_mut().relay_manager.as_mut() {
693                    m.process_relay_event(event);
694                }
695            }
696            SwarmEvent::Behaviour(BehaviourEvent::RelayManager(event)) => {
697                debug!("Relay Manager Event: {event:?}");
698                match event {
699                    libp2p_relay_manager::Event::ReservationSuccessful { peer_id, .. } => {
700                        if let Some(chs) = self.relay_listener.remove(&peer_id) {
701                            for ch in chs {
702                                let _ = ch.send(Ok(()));
703                            }
704                        }
705                    }
706                    libp2p_relay_manager::Event::ReservationClosed { peer_id, result } => {
707                        if let Some(chs) = self.relay_listener.remove(&peer_id) {
708                            match result {
709                                Ok(()) => {
710                                    for ch in chs {
711                                        let _ = ch.send(Ok(()));
712                                    }
713                                }
714                                Err(e) => {
715                                    let e = e.to_string();
716                                    for ch in chs {
717                                        let _ = ch.send(Err(anyhow::anyhow!("{}", e.clone())));
718                                    }
719                                }
720                            }
721                        }
722                    }
723                    libp2p_relay_manager::Event::ReservationFailure {
724                        peer_id,
725                        result: err,
726                    } => {
727                        if let Some(chs) = self.relay_listener.remove(&peer_id) {
728                            let e = err.to_string();
729                            for ch in chs {
730                                let _ = ch.send(Err(anyhow::anyhow!("{}", e.clone())));
731                            }
732                        }
733                    }
734                    _ => {}
735                }
736            }
737
738            SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => match event {
739                IdentifyEvent::Received { peer_id, info, .. } => {
740                    let IdentifyInfo {
741                        listen_addrs,
742                        protocols,
743                        ..
744                    } = &info;
745
746                    if let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() {
747                        if protocols.iter().any(|p| libp2p::kad::PROTOCOL_NAME.eq(p)) {
748                            for addr in listen_addrs {
749                                kad.add_address(&peer_id, addr.clone());
750                            }
751                        }
752                    }
753
754                    if protocols
755                        .iter()
756                        .any(|p| libp2p::autonat::DEFAULT_PROTOCOL_NAME.eq(p))
757                    {
758                        if let Some(autonat) = self.swarm.behaviour_mut().autonat.as_mut() {
759                            for addr in listen_addrs {
760                                autonat.add_server(peer_id, Some(addr.clone()));
761                            }
762                        }
763                    }
764
765                    if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
766                        for ret in rets {
767                            let _ = ret.send(Ok(info.clone()));
768                        }
769                    }
770
771                    self.swarm.behaviour_mut().peerbook.inject_peer_info(info);
772                }
773                event => debug!("identify: {:?}", event),
774            },
775            SwarmEvent::Behaviour(BehaviourEvent::Autonat(autonat::Event::StatusChanged {
776                old,
777                new,
778            })) => {
779                //TODO: Use status to indicate if we should use a relay or not
780                debug!("Old Nat Status: {:?}", old);
781                debug!("New Nat Status: {:?}", new);
782            }
783            SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
784                libp2p::rendezvous::client::Event::Discovered {
785                    rendezvous_node,
786                    registrations,
787                    cookie,
788                },
789            )) => {
790                self.rzv_cookie.insert(rendezvous_node, Some(cookie));
791                let mut ns_list = HashSet::new();
792                let addrbook = &mut self.swarm.behaviour_mut().addressbook;
793                let mut ns_book: HashMap<Namespace, HashMap<PeerId, Vec<Multiaddr>>> =
794                    HashMap::new();
795                for registration in registrations {
796                    let namespace = registration.namespace.clone();
797                    let peer_id = registration.record.peer_id();
798                    let addrs = registration.record.addresses();
799
800                    // info!("Discovered {peer_id} with address {addr} in {namespace}");
801
802                    let opts = AddPeerOpt::with_peer_id(peer_id).set_addresses(addrs.to_vec());
803
804                    addrbook.add_address(opts);
805
806                    ns_book
807                        .entry(namespace.clone())
808                        .or_default()
809                        .entry(peer_id)
810                        .or_default()
811                        .extend(addrs.to_vec());
812                    ns_list.insert(namespace);
813                }
814
815                for ns in ns_list {
816                    let map = ns_book.remove(&ns).unwrap_or_default();
817                    if let Some(channels) = self.rzv_discover_pending.remove(&(rendezvous_node, ns))
818                    {
819                        for ch in channels {
820                            let _ = ch.send(Ok(map.clone()));
821                        }
822                    }
823                }
824            }
825            SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
826                libp2p::rendezvous::client::Event::DiscoverFailed {
827                    rendezvous_node,
828                    namespace,
829                    error,
830                },
831            )) => {
832                let Some(ns) = namespace else {
833                    error!("Error registering to {rendezvous_node}: {error:?}");
834                    return;
835                };
836
837                error!("Error registering namespace {ns} to {rendezvous_node}: {error:?}");
838
839                if let Some(channels) = self.rzv_discover_pending.remove(&(rendezvous_node, ns)) {
840                    for ch in channels {
841                        let _ = ch.send(Err(anyhow::anyhow!(
842                            "Error discovering peers on {rendezvous_node}: {error:?}"
843                        )));
844                    }
845                }
846            }
847            SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
848                libp2p::rendezvous::client::Event::Registered {
849                    rendezvous_node,
850                    ttl,
851                    namespace,
852                },
853            )) => {
854                info!("Registered to {rendezvous_node} under {namespace} for {ttl} secs");
855
856                if let Some(channels) = self
857                    .rzv_register_pending
858                    .remove(&(rendezvous_node, namespace.clone()))
859                {
860                    for ch in channels {
861                        let _ = ch.send(Ok(()));
862                    }
863                }
864            }
865            SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(
866                libp2p::rendezvous::client::Event::RegisterFailed {
867                    rendezvous_node,
868                    namespace,
869                    error,
870                },
871            )) => {
872                error!("Error registering namespace {namespace} to {rendezvous_node}: {error:?}");
873
874                if let Some(channels) = self
875                    .rzv_register_pending
876                    .remove(&(rendezvous_node, namespace.clone()))
877                {
878                    for ch in channels {
879                        let _ = ch.send(Err(anyhow::anyhow!("Error registering namespace {namespace} to {rendezvous_node}: {error:?}")));
880                    }
881                }
882            }
883            SwarmEvent::Behaviour(BehaviourEvent::Bitswap(event)) => match event {
884                crate::p2p::bitswap::Event::NeedBlock { cid } => {
885                    if let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() {
886                        info!("Looking for providers for {cid}");
887                        let key = cid.hash().to_bytes();
888                        kad.get_providers(key.into());
889                    }
890                }
891                crate::p2p::bitswap::Event::CancelBlock { cid } => {
892                    info!(%cid, "block request cancelled");
893                    if let Some(list) = self.bitswap_cancellable.remove(&cid) {
894                        for signal in list {
895                            signal.notify_waiters();
896                        }
897                    }
898                }
899                crate::p2p::bitswap::Event::BlockRetrieved { cid } => {
900                    info!(%cid, "block retrieved")
901                }
902            },
903            _ => debug!("Swarm event: {:?}", swarm_event),
904        }
905    }
906
907    fn handle_event(&mut self, event: IpfsEvent) {
908        match event {
909            IpfsEvent::Connect(target, ret) => {
910                let connection_id = target.connection_id();
911
912                if let Err(e) = self.swarm.dial(target) {
913                    let _ = ret.send(Err(anyhow::Error::from(e)));
914                    return;
915                }
916                self.pending_connection.insert(connection_id, ret);
917            }
918            IpfsEvent::Protocol(ret) => {
919                let info = self.swarm.behaviour().supported_protocols();
920                let _ = ret.send(info);
921            }
922            #[cfg(feature = "experimental_stream")]
923            IpfsEvent::StreamControlHandle(ret) => {
924                let Some(stream) = self.swarm.behaviour_mut().stream.as_ref() else {
925                    let _ = ret.send(Err(anyhow!("stream protocol is disabled")));
926                    return;
927                };
928
929                let _ = ret.send(Ok(stream.new_control()));
930            }
931            #[cfg(feature = "experimental_stream")]
932            IpfsEvent::NewStream(protocol, ret) => {
933                let Some(stream) = self.swarm.behaviour_mut().stream.as_ref() else {
934                    let _ = ret.send(Err(anyhow!("stream protocol is disabled")));
935                    return;
936                };
937
938                let _ = ret.send(
939                    stream
940                        .new_control()
941                        .accept(protocol)
942                        .map_err(anyhow::Error::from),
943                );
944            }
945            IpfsEvent::Addresses(ret) => {
946                let addrs = self.swarm.behaviour_mut().addrs();
947                ret.send(Ok(addrs)).ok();
948            }
949            IpfsEvent::Listeners(ret) => {
950                let listeners = self.swarm.listeners().cloned().collect::<Vec<Multiaddr>>();
951                ret.send(Ok(listeners)).ok();
952            }
953            IpfsEvent::ExternalAddresses(ret) => {
954                let external = self
955                    .swarm
956                    .external_addresses()
957                    .cloned()
958                    .collect::<Vec<Multiaddr>>();
959
960                ret.send(Ok(external)).ok();
961            }
962            IpfsEvent::IsConnected(peer_id, ret) => {
963                let connected = self.swarm.is_connected(&peer_id);
964                ret.send(Ok(connected)).ok();
965            }
966            IpfsEvent::Connected(ret) => {
967                let connections = self.swarm.connected_peers().copied();
968                ret.send(Ok(connections.collect())).ok();
969            }
970            IpfsEvent::Disconnect(peer, ret) => {
971                if self.swarm.disconnect_peer_id(peer).is_err() {
972                    let _ = ret.send(Err(anyhow::anyhow!("Peer is not connected")));
973                    return;
974                }
975
976                self.pending_disconnection
977                    .entry(peer)
978                    .or_default()
979                    .push(ret);
980            }
981            IpfsEvent::Ban(peer, ret) => {
982                self.swarm.behaviour_mut().block_list.block_peer(peer);
983                let _ = ret.send(Ok(()));
984            }
985            IpfsEvent::Unban(peer, ret) => {
986                self.swarm.behaviour_mut().block_list.unblock_peer(peer);
987                let _ = ret.send(Ok(()));
988            }
989            IpfsEvent::PubsubSubscribe(topic, ret) => {
990                let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
991                    let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
992                    return;
993                };
994
995                let _ = ret.send(Ok(pubsub.subscribe(topic).ok()));
996            }
997            IpfsEvent::PubsubUnsubscribe(topic, ret) => {
998                let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
999                    let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1000                    return;
1001                };
1002
1003                let _ = ret.send(Ok(pubsub.unsubscribe(topic)));
1004            }
1005            IpfsEvent::PubsubPublish(topic, data, ret) => {
1006                let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1007                    let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1008                    return;
1009                };
1010
1011                let _ = ret.send(Ok(pubsub.publish(topic, data)));
1012            }
1013            IpfsEvent::PubsubPeers(Some(topic), ret) => {
1014                let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1015                    let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1016                    return;
1017                };
1018
1019                let _ = ret.send(Ok(pubsub.subscribed_peers(topic)));
1020            }
1021            IpfsEvent::PubsubPeers(None, ret) => {
1022                let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1023                    let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1024                    return;
1025                };
1026
1027                let _ = ret.send(Ok(pubsub.known_peers()));
1028            }
1029            IpfsEvent::PubsubSubscribed(ret) => {
1030                let Some(pubsub) = self.swarm.behaviour_mut().pubsub.as_mut() else {
1031                    let _ = ret.send(Err(anyhow!("pubsub protocol is disabled")));
1032                    return;
1033                };
1034                let list = pubsub.topics().map(|t| t.to_string()).collect();
1035                let _ = ret.send(Ok(list));
1036            }
1037            // IpfsEvent::WantList(peer, ret) => {
1038            //     let list = if let Some(peer) = peer {
1039            //         self.swarm
1040            //             .behaviour_mut()
1041            //             .bitswap()
1042            //             .peer_wantlist(&peer)
1043            //             .unwrap_or_default()
1044            //     } else {
1045            //         self.swarm.behaviour_mut().bitswap().local_wantlist()
1046            //     };
1047            //     let _ = ret.send(list);
1048            // }
1049            // IpfsEvent::BitswapStats(ret) => {
1050            //     let stats = self.swarm.behaviour_mut().bitswap().stats();
1051            //     let peers = self.swarm.behaviour_mut().bitswap().peers();
1052            //     let wantlist = self.swarm.behaviour_mut().bitswap().local_wantlist();
1053            //     let _ = ret.send((stats, peers, wantlist).into());
1054            // }
1055            IpfsEvent::PubsubEventStream(ret) => {
1056                let (tx, rx) = unbounded();
1057                self.pubsub_event_stream.push(tx);
1058                let _ = ret.send(rx);
1059            }
1060            IpfsEvent::AddListeningAddress(addr, ret) => match self.swarm.listen_on(addr) {
1061                Ok(id) => {
1062                    self.pending_add_listener.insert(id, ret);
1063                }
1064                Err(e) => {
1065                    let _ = ret.send(Err(anyhow::anyhow!(e)));
1066                }
1067            },
1068            IpfsEvent::RemoveListeningAddress(addr, ret) => {
1069                let Some(listener_id) = self.listening_addresses.iter().find_map(|(id, list)| {
1070                    if list.contains(&addr) {
1071                        return Some(id);
1072                    }
1073
1074                    None
1075                }) else {
1076                    let _ = ret.send(Err(format_err!(
1077                        "Address was not listened to before: {}",
1078                        addr
1079                    )));
1080                    return;
1081                };
1082
1083                match self.swarm.remove_listener(*listener_id) {
1084                    true => {
1085                        self.pending_remove_listener.insert(*listener_id, ret);
1086                    }
1087                    false => {
1088                        let _ = ret.send(Err(anyhow::anyhow!(
1089                            "Failed to remove previously added listening address: {}",
1090                            addr
1091                        )));
1092                    }
1093                }
1094            }
1095            IpfsEvent::AddExternalAddress(addr, ret) => {
1096                self.swarm.add_external_address(addr);
1097                let _ = ret.send(Ok(()));
1098            }
1099            IpfsEvent::RemoveExternalAddress(addr, ret) => {
1100                self.swarm.remove_external_address(&addr);
1101                let _ = ret.send(Ok(()));
1102            }
1103            IpfsEvent::ConnectionEvents(ret) => {
1104                let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity);
1105                self.connection_events.push(tx);
1106                let _ = ret.send(Ok(rx));
1107            }
1108            IpfsEvent::PeerConnectionEvents(peer_id, ret) => {
1109                let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity);
1110                self.peer_connection_events
1111                    .entry(peer_id)
1112                    .or_default()
1113                    .push(tx);
1114                let _ = ret.send(Ok(rx));
1115            }
1116            IpfsEvent::Bootstrap(ret) => {
1117                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1118                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1119                    return;
1120                };
1121
1122                let future = match kad.bootstrap() {
1123                    Ok(id) => {
1124                        let (tx, rx) = oneshot::channel();
1125                        self.kad_subscriptions.insert(id, tx);
1126                        Ok(rx)
1127                    }
1128                    Err(e) => {
1129                        error!("kad: can't bootstrap the node: {:?}", e);
1130                        Err(anyhow!("kad: can't bootstrap the node: {:?}", e))
1131                    }
1132                };
1133                let _ = ret.send(future);
1134            }
1135            IpfsEvent::AddPeer(opt, ret) => {
1136                let result = match self.swarm.behaviour_mut().add_peer(opt) {
1137                    true => Ok(()),
1138                    false => Err(anyhow::anyhow!("unable to add peer")),
1139                };
1140
1141                let _ = ret.send(result);
1142            }
1143            IpfsEvent::RemovePeer(peer_id, addr, ret) => {
1144                let result = match addr {
1145                    Some(addr) => Ok(self
1146                        .swarm
1147                        .behaviour_mut()
1148                        .addressbook
1149                        .remove_address(&peer_id, &addr)),
1150                    None => Ok(self.swarm.behaviour_mut().addressbook.remove_peer(&peer_id)),
1151                };
1152
1153                let _ = ret.send(result);
1154            }
1155            IpfsEvent::GetClosestPeers(peer_id, ret) => {
1156                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1157                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1158                    return;
1159                };
1160
1161                let id = kad.get_closest_peers(peer_id);
1162
1163                let (tx, rx) = oneshot::channel();
1164
1165                self.kad_subscriptions.insert(id, tx);
1166                let _ = ret.send(Ok(rx));
1167            }
1168            IpfsEvent::WantList(peer, ret) => {
1169                let Some(bitswap) = self.swarm.behaviour().bitswap.as_ref() else {
1170                    let _ = ret.send(Ok(futures::future::ready(vec![]).boxed()));
1171                    return;
1172                };
1173                let list = match peer {
1174                    Some(peer_id) => bitswap.peer_wantlist(peer_id),
1175                    None => bitswap.local_wantlist(),
1176                };
1177                let _ = ret.send(Ok(futures::future::ready(list).boxed()));
1178            }
1179            IpfsEvent::GetBitswapPeers(ret) => {
1180                let _ = ret.send(Ok(futures::future::ready(vec![]).boxed()));
1181            }
1182            IpfsEvent::FindPeerIdentity(peer_id, ret) => {
1183                let locally_known = self.swarm.behaviour().peerbook.get_peer_info(peer_id);
1184
1185                let (tx, rx) = oneshot::channel();
1186
1187                match locally_known {
1188                    Some(info) => {
1189                        let _ = tx.send(Ok(info.clone()));
1190                    }
1191                    None => {
1192                        let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1193                            let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1194                            return;
1195                        };
1196
1197                        kad.get_closest_peers(peer_id);
1198
1199                        self.dht_peer_lookup.entry(peer_id).or_default().push(tx);
1200                    }
1201                }
1202
1203                let _ = ret.send(Ok(rx));
1204            }
1205            IpfsEvent::FindPeer(peer_id, local_only, ret) => {
1206                let listener_addrs = self
1207                    .swarm
1208                    .behaviour_mut()
1209                    .peerbook
1210                    .peer_connections(peer_id)
1211                    .unwrap_or_default()
1212                    .into_iter()
1213                    .map(|mut addr| {
1214                        addr.extract_peer_id();
1215                        addr
1216                    })
1217                    .collect::<Vec<_>>();
1218
1219                let locally_known_addrs = if !listener_addrs.is_empty() {
1220                    listener_addrs
1221                } else {
1222                    self.swarm
1223                        .behaviour()
1224                        .addressbook
1225                        .get_peer_addresses(&peer_id)
1226                        .unwrap_or_default()
1227                };
1228
1229                let addrs = if !locally_known_addrs.is_empty() || local_only {
1230                    Either::Left(locally_known_addrs)
1231                } else {
1232                    let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1233                        let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1234                        return;
1235                    };
1236
1237                    Either::Right({
1238                        let id = kad.get_closest_peers(peer_id);
1239
1240                        let (tx, rx) = oneshot::channel();
1241                        self.kad_subscriptions.insert(id, tx);
1242
1243                        rx
1244                    })
1245                };
1246                let _ = ret.send(Ok(addrs));
1247            }
1248            IpfsEvent::GetProviders(key, ret) => {
1249                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1250                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1251                    return;
1252                };
1253
1254                let id = kad.get_providers(key);
1255
1256                let (tx, mut rx) = futures::channel::mpsc::unbounded();
1257                let stream = async_stream::stream! {
1258                    let mut current_providers: HashSet<PeerId> = Default::default();
1259                    while let Some(provider) = rx.next().await {
1260                        if current_providers.insert(provider) {
1261                            yield provider;
1262                        }
1263                    }
1264                };
1265                self.provider_stream.insert(id, tx);
1266
1267                let _ = ret.send(Ok(Some(stream.boxed())));
1268            }
1269            IpfsEvent::Provide(key, ret) => {
1270                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1271                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1272                    return;
1273                };
1274
1275                let future = match kad.start_providing(key) {
1276                    Ok(id) => {
1277                        let (tx, rx) = oneshot::channel();
1278                        self.kad_subscriptions.insert(id, tx);
1279                        Ok(rx)
1280                    }
1281                    Err(e) => {
1282                        error!("kad: can't provide a key: {:?}", e);
1283                        Err(anyhow!("kad: can't provide the key: {:?}", e))
1284                    }
1285                };
1286                let _ = ret.send(future);
1287            }
1288            IpfsEvent::DhtMode(mode, ret) => {
1289                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1290                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1291                    return;
1292                };
1293
1294                kad.set_mode(mode.into());
1295
1296                let _ = ret.send(Ok(()));
1297            }
1298            IpfsEvent::DhtGet(key, ret) => {
1299                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1300                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1301                    return;
1302                };
1303
1304                let id = kad.get_record(key);
1305
1306                let (tx, mut rx) = futures::channel::mpsc::unbounded();
1307                let stream = async_stream::stream! {
1308                    while let Some(record) = rx.next().await {
1309                            yield record;
1310                    }
1311                };
1312                self.record_stream.insert(id, tx);
1313
1314                let _ = ret.send(Ok(stream.boxed()));
1315            }
1316            IpfsEvent::DhtPut(key, value, quorum, ret) => {
1317                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1318                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1319                    return;
1320                };
1321
1322                let record = Record {
1323                    key,
1324                    value,
1325                    publisher: None,
1326                    expires: None,
1327                };
1328
1329                let future = match kad.put_record(record, quorum) {
1330                    Ok(id) => {
1331                        let (tx, rx) = oneshot::channel();
1332                        self.kad_subscriptions.insert(id, tx);
1333                        Ok(rx)
1334                    }
1335                    Err(e) => {
1336                        error!("kad: can't put a record: {:?}", e);
1337                        Err(anyhow!("kad: can't provide the record: {:?}", e))
1338                    }
1339                };
1340
1341                let _ = ret.send(future);
1342            }
1343            IpfsEvent::GetBootstrappers(ret) => {
1344                let list = Vec::from_iter(self.bootstraps.iter().cloned());
1345                let _ = ret.send(list);
1346            }
1347            IpfsEvent::AddBootstrapper(mut addr, ret) => {
1348                if !self.swarm.behaviour().kademlia.is_enabled() {
1349                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1350                    return;
1351                };
1352
1353                let ret_addr = addr.clone();
1354
1355                if self.bootstraps.insert(addr.clone()) {
1356                    if let Some(peer_id) = addr.extract_peer_id() {
1357                        self.swarm.behaviour_mut().add_peer((peer_id, addr));
1358                        // the return value of add_address doesn't implement Debug
1359                        trace!(peer_id=%peer_id, "tried to add a bootstrapper");
1360                    }
1361                }
1362                let _ = ret.send(Ok(ret_addr));
1363            }
1364            IpfsEvent::RemoveBootstrapper(mut addr, ret) => {
1365                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1366                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1367                    return;
1368                };
1369
1370                let result = addr.clone();
1371
1372                if self.bootstraps.remove(&addr) {
1373                    if let Some(peer_id) = addr.extract_peer_id() {
1374                        let prefix: Multiaddr = addr;
1375
1376                        if let Some(e) = kad.remove_address(&peer_id, &prefix) {
1377                            info!(peer_id=%peer_id, status=?e.status, "removed bootstrapper");
1378                        } else {
1379                            warn!(peer_id=%peer_id, "attempted to remove an unknown bootstrapper");
1380                        }
1381                    }
1382
1383                    let _ = ret.send(Ok(result));
1384                }
1385            }
1386            IpfsEvent::ClearBootstrappers(ret) => {
1387                let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {
1388                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1389                    return;
1390                };
1391
1392                let removed = self.bootstraps.drain().collect::<Vec<_>>();
1393                let mut list = Vec::with_capacity(removed.len());
1394
1395                for mut addr_with_peer_id in removed {
1396                    let priginal = addr_with_peer_id.clone();
1397                    let Some(peer_id) = addr_with_peer_id.extract_peer_id() else {
1398                        continue;
1399                    };
1400                    let prefix: Multiaddr = addr_with_peer_id;
1401
1402                    if let Some(e) = kad.remove_address(&peer_id, &prefix) {
1403                        info!(peer_id=%peer_id, status=?e.status, "cleared bootstrapper");
1404                        list.push(priginal);
1405                    } else {
1406                        error!(peer_id=%peer_id, "attempted to clear an unknown bootstrapper");
1407                    }
1408                }
1409
1410                let _ = ret.send(Ok(list));
1411            }
1412            IpfsEvent::DefaultBootstrap(ret) => {
1413                if !self.swarm.behaviour().kademlia.is_enabled() {
1414                    let _ = ret.send(Err(anyhow!("kad protocol is disabled")));
1415                    return;
1416                };
1417
1418                let mut rets = Vec::new();
1419                for addr in BOOTSTRAP_NODES {
1420                    let mut addr = addr
1421                        .parse::<Multiaddr>()
1422                        .expect("see test bootstrap_nodes_are_multiaddr_with_peerid");
1423                    let original: Multiaddr = addr.clone();
1424                    if self.bootstraps.insert(addr.clone()) {
1425                        let Some(peer_id) = addr.extract_peer_id() else {
1426                            continue;
1427                        };
1428
1429                        if self.swarm.behaviour_mut().add_peer((peer_id, addr.clone())) {
1430                            trace!(peer_id=%peer_id, "tried to restore a bootstrapper");
1431                            // report with the peerid
1432                            rets.push(original);
1433                        }
1434                    }
1435                }
1436
1437                let _ = ret.send(Ok(rets));
1438            }
1439            IpfsEvent::AddRelay(peer_id, addr, tx) => {
1440                let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1441                    let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1442                    return;
1443                };
1444
1445                relay.add_address(peer_id, addr);
1446
1447                let _ = tx.send(Ok(()));
1448            }
1449            IpfsEvent::RemoveRelay(peer_id, addr, tx) => {
1450                let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1451                    let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1452                    return;
1453                };
1454
1455                relay.remove_address(peer_id, addr);
1456
1457                let _ = tx.send(Ok(()));
1458            }
1459            IpfsEvent::EnableRelay(Some(peer_id), tx) => {
1460                let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1461                    let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1462                    return;
1463                };
1464
1465                relay.select(peer_id);
1466
1467                self.relay_listener.entry(peer_id).or_default().push(tx);
1468            }
1469            IpfsEvent::EnableRelay(None, tx) => {
1470                let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1471                    let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1472                    return;
1473                };
1474
1475                let Some(peer_id) = relay.random_select() else {
1476                    let _ = tx.send(Err(anyhow::anyhow!(
1477                        "No relay was selected or was unavailable"
1478                    )));
1479                    return;
1480                };
1481
1482                self.relay_listener.entry(peer_id).or_default().push(tx);
1483            }
1484            IpfsEvent::DisableRelay(peer_id, tx) => {
1485                let Some(relay) = self.swarm.behaviour_mut().relay_manager.as_mut() else {
1486                    let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1487                    return;
1488                };
1489                relay.disable_relay(peer_id);
1490
1491                let _ = tx.send(Ok(()));
1492            }
1493            IpfsEvent::ListRelays(tx) => {
1494                let Some(relay) = self.swarm.behaviour().relay_manager.as_ref() else {
1495                    let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1496                    return;
1497                };
1498
1499                let list = relay
1500                    .list_relays()
1501                    .map(|(peer_id, addrs)| (*peer_id, addrs.clone()))
1502                    .collect();
1503
1504                let _ = tx.send(Ok(list));
1505            }
1506            IpfsEvent::ListActiveRelays(tx) => {
1507                let Some(relay) = self.swarm.behaviour().relay_manager.as_ref() else {
1508                    let _ = tx.send(Err(anyhow::anyhow!("Relay is not enabled")));
1509                    return;
1510                };
1511
1512                let list = relay.list_active_relays();
1513
1514                let _ = tx.send(Ok(list));
1515            }
1516            IpfsEvent::RegisterRendezvousNamespace(ns, peer_id, ttl, res) => {
1517                let Some(rz) = self.swarm.behaviour_mut().rendezvous_client.as_mut() else {
1518                    let _ = res.send(Err(anyhow::anyhow!("Rendezvous client is not enabled")));
1519                    return;
1520                };
1521
1522                if let Err(e) = rz.register(ns.clone(), peer_id, ttl) {
1523                    let _ = res.send(Err(anyhow::Error::from(e)));
1524                    return;
1525                }
1526                self.rzv_register_pending
1527                    .entry((peer_id, ns))
1528                    .or_default()
1529                    .push(res);
1530            }
1531            IpfsEvent::UnregisterRendezvousNamespace(ns, peer_id, res) => {
1532                let Some(rz) = self.swarm.behaviour_mut().rendezvous_client.as_mut() else {
1533                    let _ = res.send(Err(anyhow::anyhow!("Rendezvous client is not enabled")));
1534                    return;
1535                };
1536
1537                rz.unregister(ns.clone(), peer_id);
1538
1539                let _ = res.send(Ok(()));
1540            }
1541            IpfsEvent::RendezvousNamespaceDiscovery(ns, use_cookie, ttl, peer_id, res) => {
1542                let Some(rz) = self.swarm.behaviour_mut().rendezvous_client.as_mut() else {
1543                    let _ = res.send(Err(anyhow::anyhow!("Rendezvous client is not enabled")));
1544                    return;
1545                };
1546
1547                let cookie = use_cookie
1548                    .then(|| self.rzv_cookie.get(&peer_id).cloned().flatten())
1549                    .flatten();
1550
1551                rz.discover(ns.clone(), cookie, ttl, peer_id);
1552
1553                match ns {
1554                    Some(ns) => self
1555                        .rzv_discover_pending
1556                        .entry((peer_id, ns))
1557                        .or_default()
1558                        .push(res),
1559                    None => {
1560                        let _ = res.send(Ok(HashMap::new()));
1561                    }
1562                }
1563            }
1564            IpfsEvent::RequestStream(protocol, res) => {
1565                let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1566                    let _ = res.send(Err(anyhow::anyhow!(
1567                        "request-response behaviour is not enabled"
1568                    )));
1569                    return;
1570                };
1571                let rx = rr.subscribe();
1572                let _ = res.send(Ok(Box::pin(rx)));
1573            }
1574            IpfsEvent::SendRequest(protocol, peer_id, request, res) => {
1575                let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1576                    let _ = res.send(Err(anyhow::anyhow!(
1577                        "request-response behaviour is not enabled"
1578                    )));
1579                    return;
1580                };
1581
1582                let fut = rr.send_request(peer_id, request);
1583
1584                let _ = res.send(Ok(fut));
1585            }
1586            IpfsEvent::SendRequests(protocol, peers, request, res) => {
1587                let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1588                    let _ = res.send(Err(anyhow::anyhow!(
1589                        "request-response behaviour is not enabled"
1590                    )));
1591                    return;
1592                };
1593
1594                let st = rr.send_requests(peers, request);
1595
1596                let _ = res.send(Ok(st));
1597            }
1598            IpfsEvent::SendResponse(protocol, peer_id, id, response, res) => {
1599                let Some(rr) = self.swarm.behaviour_mut().request_response(protocol) else {
1600                    let _ = res.send(Err(anyhow::anyhow!(
1601                        "request-response behaviour is not enabled"
1602                    )));
1603                    return;
1604                };
1605
1606                let result = rr
1607                    .send_response(peer_id, id, response)
1608                    .map_err(anyhow::Error::from);
1609
1610                let _ = res.send(result);
1611            }
1612            IpfsEvent::Exit => {
1613                // FIXME: we could do a proper teardown
1614            }
1615        }
1616    }
1617
1618    fn handle_repo_event(&mut self, event: RepoEvent) {
1619        match event {
1620            RepoEvent::WantBlock(cids, peers, timeout, signals) => {
1621                let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else {
1622                    return;
1623                };
1624                if let Some(signals) = signals {
1625                    for (cid, signals) in signals {
1626                        if signals.is_empty() {
1627                            continue;
1628                        }
1629
1630                        let entries = self.bitswap_cancellable.entry(cid).or_default();
1631                        entries.extend(signals);
1632                    }
1633                }
1634                bs.gets(cids, &peers, timeout);
1635            }
1636            RepoEvent::UnwantBlock(cid) => {
1637                let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else {
1638                    return;
1639                };
1640                bs.cancel(cid);
1641                if let Some(list) = self.bitswap_cancellable.remove(&cid) {
1642                    for signal in list {
1643                        signal.notify_waiters();
1644                    }
1645                }
1646            }
1647            RepoEvent::NewBlock(block) => {
1648                let Some(bs) = self.swarm.behaviour_mut().bitswap.as_mut() else {
1649                    return;
1650                };
1651                bs.notify_new_blocks([*block.cid()]);
1652            }
1653            RepoEvent::RemovedBlock(_) => {}
1654        }
1655    }
1656}