Skip to main content

rust_ipfs/
builder.rs

1use crate::context::IpfsContext;
2use crate::keystore::Keystore;
3use crate::p2p::{
4    create_create_behaviour, AddressBookConfig, IdentifyConfiguration, PubsubConfig, RelayConfig,
5    TSwarm,
6};
7use crate::repo::{DefaultStorage, GCConfig, GCTrigger, Repo};
8use crate::{
9    context, ipns_to_dht_key, p2p, to_dht_key, ConnectionLimits, FDLimit, Ipfs, IpfsEvent,
10    IpfsOptions, Keypair, Multiaddr, NetworkBehaviour, RecordKey, RepoProvider, TSwarmEvent,
11    TSwarmEventFn,
12};
13use anyhow::Error;
14use async_rt::AbortableJoinHandle;
15use connexa::behaviour::peer_store::store::memory::MemoryStore;
16use connexa::behaviour::request_response::RequestResponseConfig;
17use connexa::builder::{ConnexaBuilder, FileDescLimit, IntoKeypair};
18use connexa::dummy;
19use connexa::prelude::identify::Event;
20use connexa::prelude::swarm::SwarmEvent;
21#[cfg(not(target_arch = "wasm32"))]
22#[cfg(feature = "pnet")]
23use connexa::prelude::transport::pnet::PreSharedKey;
24use connexa::prelude::{gossipsub, ping, swarm};
25use futures::{StreamExt, TryStreamExt};
26use std::collections::{BTreeSet, HashMap};
27use std::convert::Infallible;
28use std::sync::Arc;
29use std::task::Poll;
30use std::time::Duration;
31use tracing::Span;
32use tracing_futures::Instrument;
33
34/// Configured Ipfs which can only be started.
35#[allow(clippy::type_complexity)]
36pub struct IpfsBuilder<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> {
37    init: ConnexaBuilder<p2p::Behaviour<C>, IpfsContext, IpfsEvent, MemoryStore>,
38    options: IpfsOptions,
39    repo_handle: Repo<DefaultStorage>,
40    swarm_event: Option<TSwarmEventFn<C>>,
41    record_key_validator:
42        HashMap<String, Box<dyn Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send>>,
43    gc_config: Option<GCConfig>,
44    custom_behaviour: Option<Box<dyn FnOnce(&Keypair) -> std::io::Result<C>>>,
45    gc_repo_duration: Option<Duration>,
46}
47
48pub type DefaultIpfsBuilder = IpfsBuilder<dummy::Behaviour>;
49
50#[deprecated(note = "Use IpfsBuilder instead")]
51pub type UninitializedIpfs<T> = IpfsBuilder<T>;
52
53#[deprecated(note = "Use DefaultIpfsBuilder instead")]
54pub type UninitializedIpfsDefault = DefaultIpfsBuilder;
55
56impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> Default for IpfsBuilder<C> {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> IpfsBuilder<C> {
63    /// New uninitualized instance
64    pub fn new() -> Self {
65        let keypair = Keypair::generate_ed25519();
66        Self::with_keypair(&keypair).expect("keypair is valid")
67    }
68
69    /// New instance with an existing keypair
70    pub fn with_keypair(keypair: impl IntoKeypair) -> std::io::Result<Self> {
71        Ok(Self {
72            init: ConnexaBuilder::with_existing_identity(keypair)?,
73            options: Default::default(),
74            repo_handle: Repo::new_memory(),
75            // record_validators: Default::default(),
76            record_key_validator: Default::default(),
77            swarm_event: None,
78            gc_config: None,
79            gc_repo_duration: None,
80            custom_behaviour: None,
81        })
82    }
83
84    /// Set default listening unspecified ipv4 and ipv6 addresses for tcp and quic
85    /// Note that this still requires for the transports to be enabled to be usable
86    pub fn set_default_listener(self) -> Self {
87        self.add_listening_addrs(vec![
88            "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
89            "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
90        ])
91    }
92
93    /// Adds a listening address
94    pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self {
95        if !self.options.listening_addrs.contains(&addr) {
96            self.options.listening_addrs.push(addr)
97        }
98        self
99    }
100
101    /// Set a connection limit
102    pub fn set_connection_limits<F>(mut self, f: F) -> Self
103    where
104        F: Fn(ConnectionLimits) -> ConnectionLimits + Send + Sync + 'static,
105    {
106        self.init = self.init.with_connection_limits_with_config(f);
107        self
108    }
109
110    /// Adds a listening addresses
111    pub fn add_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
112        self.options.listening_addrs.extend(addrs);
113        self
114    }
115
116    /// Set a list of listening addresses
117    pub fn set_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
118        self.options.listening_addrs = addrs;
119        self
120    }
121
122    /// Adds a bootstrap node
123    pub fn add_bootstrap(mut self, addr: Multiaddr) -> Self {
124        if !self.options.bootstrap.contains(&addr) {
125            self.options.bootstrap.push(addr)
126        }
127        self
128    }
129
130    /// Load default behaviour for basic functionality
131    pub fn with_default(self) -> Self {
132        self.with_identify(Default::default())
133            .with_autonat()
134            .with_bitswap()
135            .with_kademlia()
136            .with_ping(Default::default())
137            .with_pubsub(Default::default())
138    }
139
140    /// Enables kademlia
141    pub fn with_kademlia(mut self) -> Self {
142        self.init = self.init.with_kademlia();
143        self
144    }
145
146    /// Enables bitswap
147    pub fn with_bitswap(mut self) -> Self {
148        self.options.protocols.bitswap = true;
149        self
150    }
151
152    /// Enable mdns
153    #[cfg(not(target_arch = "wasm32"))]
154    pub fn with_mdns(mut self) -> Self {
155        self.init = self.init.with_mdns();
156        self
157    }
158
159    /// Enable relay client
160    pub fn with_relay(mut self, with_dcutr: bool) -> Self {
161        self.options.protocols.relay = true;
162        self.init = self.init.with_relay();
163        if with_dcutr {
164            #[cfg(not(target_arch = "wasm32"))]
165            {
166                self.init = self.init.with_dcutr();
167            }
168        }
169        self
170    }
171
172    /// Enable relay server
173    pub fn with_relay_server(mut self, config: RelayConfig) -> Self {
174        self.init = self
175            .init
176            .with_relay_server_with_config(move |_| config.into());
177        self
178    }
179
180    /// Enable port mapping (AKA UPnP)
181    #[cfg(not(target_arch = "wasm32"))]
182    pub fn with_upnp(mut self) -> Self {
183        self.init = self.init.with_upnp();
184        self
185    }
186
187    /// Enables rendezvous server
188    pub fn with_rendezvous_server(mut self) -> Self {
189        self.init = self.init.with_rendezvous_server();
190        self
191    }
192
193    /// Enables rendezvous client
194    pub fn with_rendezvous_client(mut self) -> Self {
195        self.init = self.init.with_rendezvous_client();
196        self
197    }
198
199    /// Enables identify
200    pub fn with_identify(mut self, config: IdentifyConfiguration) -> Self {
201        self.init = self
202            .init
203            .with_identify_with_config(config.protocol_version, move |cfg| {
204                cfg.with_agent_version(config.agent_version)
205                    .with_interval(config.interval)
206                    .with_push_listen_addr_updates(config.push_update)
207                    .with_cache_size(config.cache)
208            });
209        self
210    }
211
212    #[cfg(feature = "stream")]
213    pub fn with_streams(mut self) -> Self {
214        self.init = self.init.with_streams();
215        self
216    }
217
218    /// Enables pubsub
219    pub fn with_pubsub(mut self, config: PubsubConfig) -> Self {
220        self.init = self
221            .init
222            .with_gossipsub_with_config(move |keypair, mut builder| {
223                if let Some(protocol) = config.custom_protocol_id {
224                    builder.protocol_id(protocol, gossipsub::Version::V1_1);
225                }
226
227                builder.max_transmit_size(config.max_transmit_size);
228
229                if config.floodsub_compat {
230                    builder.support_floodsub();
231                }
232
233                builder.validation_mode(config.validate.into());
234                let auth =
235                    connexa::prelude::gossipsub::MessageAuthenticity::Signed(keypair.clone());
236                (builder, auth)
237            });
238        self
239    }
240
241    /// Enables request response.
242    /// Note: At this time, this option will only support up to 10 request-response behaviours.
243    ///       with any additional being ignored. Additionally, any duplicated protocols that are
244    ///       provided will be ignored.
245    pub fn with_request_response(mut self, config: Vec<RequestResponseConfig>) -> Self {
246        self.init = self.init.with_request_response(config);
247
248        self
249    }
250
251    /// Enables autonat
252    pub fn with_autonat(mut self) -> Self {
253        self.init = self.init.with_autonat_v1();
254        self
255    }
256
257    /// Enables ping
258    pub fn with_ping(mut self, config: ping::Config) -> Self {
259        self.init = self.init.with_ping_with_config(move |_| config);
260        self
261    }
262
263    /// Set a custom behaviour
264    pub fn with_custom_behaviour<F>(mut self, f: F) -> Self
265    where
266        F: FnOnce(&Keypair) -> std::io::Result<C> + 'static,
267    {
268        self.custom_behaviour.replace(Box::new(f));
269        self
270    }
271
272    /// Enables automatic garbage collection
273    pub fn with_gc(mut self, config: GCConfig) -> Self {
274        self.gc_config = Some(config);
275        self
276    }
277
278    /// Set a duration for which blocks are not removed due to the garbage collector
279    /// Defaults: 2 mins
280    pub fn set_temp_pin_duration(mut self, duration: Duration) -> Self {
281        self.gc_repo_duration = Some(duration);
282        self
283    }
284
285    /// Sets a path
286    #[cfg(not(target_arch = "wasm32"))]
287    pub fn set_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
288        let path = path.as_ref().to_path_buf();
289        self.options.ipfs_path = Some(path);
290        self
291    }
292
293    /// Sets a namespace
294    #[cfg(target_arch = "wasm32")]
295    pub fn set_namespace(mut self, ns: Option<String>) -> Self {
296        self.options.namespace = Some(ns);
297        self
298    }
299
300    /// Set timeout for idle connections
301    pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
302        self.init = self.init.set_swarm_config(move |swarm| {
303            swarm.with_idle_connection_timeout(Duration::from_secs(duration))
304        });
305        self
306    }
307
308    /// Set swarm configuration
309    pub fn set_swarm_configuration<F>(mut self, f: F) -> Self
310    where
311        F: FnOnce(swarm::Config) -> swarm::Config + Send + Sync + 'static,
312    {
313        self.init = self.init.set_swarm_config(f);
314        self
315    }
316
317    /// Set default record validator for IPFS
318    /// Note: This will override any keys set for `ipns` prefix
319    pub fn default_record_key_validator(mut self) -> Self {
320        self.record_key_validator.insert(
321            "ipns".into(),
322            Box::new(|key| to_dht_key(("ipns", |key| ipns_to_dht_key(key)), key)),
323        );
324        self
325    }
326
327    pub fn set_record_prefix_validator<F>(mut self, key: &str, callback: F) -> Self
328    where
329        F: Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send + 'static,
330    {
331        self.record_key_validator
332            .insert(key.to_string(), Box::new(callback));
333        self
334    }
335
336    /// Set address book configuration
337    pub fn set_addrbook_configuration(mut self, config: AddressBookConfig) -> Self {
338        self.options.addr_config = config;
339        self
340    }
341
342    /// Set RepoProvider option to provide blocks automatically
343    pub fn set_provider(mut self, opt: RepoProvider) -> Self {
344        self.options.provider = opt;
345        self
346    }
347
348    /// Set block and data repo
349    pub fn set_repo(mut self, repo: &Repo<DefaultStorage>) -> Self {
350        self.repo_handle = Repo::clone(repo);
351        self
352    }
353
354    /// Set a keystore
355    pub fn set_keystore(mut self, keystore: &Keystore) -> Self {
356        self.options.keystore = keystore.clone();
357        self
358    }
359
360    /// Enables quic transport
361    #[cfg(feature = "quic")]
362    #[cfg(not(target_arch = "wasm32"))]
363    pub fn enable_quic(mut self) -> Self {
364        self.init = self.init.enable_quic();
365        self
366    }
367
368    /// Enables quic transport with custom configuration
369    #[cfg(feature = "quic")]
370    #[cfg(not(target_arch = "wasm32"))]
371    pub fn enable_quic_with_config<F>(mut self, f: F) -> Self
372    where
373        F: FnOnce(
374                connexa::prelude::transport::quic::Config,
375            ) -> connexa::prelude::transport::quic::Config
376            + 'static,
377    {
378        self.init = self.init.enable_quic_with_config(f);
379        self
380    }
381
382    /// Enables tcp transport
383    #[cfg(feature = "tcp")]
384    #[cfg(not(target_arch = "wasm32"))]
385    pub fn enable_tcp(mut self) -> Self {
386        self.init = self.init.enable_tcp();
387        self
388    }
389
390    /// Enables tcp transport with custom configuration
391    #[cfg(feature = "tcp")]
392    #[cfg(not(target_arch = "wasm32"))]
393    pub fn enable_tcp_with_config<F>(mut self, f: F) -> Self
394    where
395        F: FnOnce(
396                connexa::prelude::transport::tcp::Config,
397            ) -> connexa::prelude::transport::tcp::Config
398            + 'static,
399    {
400        self.init = self.init.enable_tcp_with_config(f);
401        self
402    }
403
404    // /// Enables pnet transport
405    #[cfg(feature = "pnet")]
406    #[cfg(not(target_arch = "wasm32"))]
407    pub fn enable_pnet(mut self, psk: PreSharedKey) -> Self {
408        self.init = self.init.enable_pnet(psk);
409        self
410    }
411
412    /// Enables websocket transport
413    #[cfg(feature = "websocket")]
414    pub fn enable_websocket(mut self) -> Self {
415        self.init = self.init.enable_websocket();
416        self
417    }
418
419    /// Enables secure websocket transport
420    #[cfg(feature = "websocket")]
421    #[cfg(not(target_arch = "wasm32"))]
422    pub fn enable_secure_websocket(mut self) -> Self {
423        self.init = self.init.enable_secure_websocket();
424        self
425    }
426
427    /// Enables secure websocket transport
428    #[cfg(feature = "websocket")]
429    #[cfg(not(target_arch = "wasm32"))]
430    pub fn enable_secure_websocket_with_pem(mut self, keypair: String, certs: Vec<String>) -> Self {
431        self.init = self.init.enable_secure_websocket_with_pem(keypair, certs);
432        self
433    }
434
435    /// Enables secure websocket transport
436    #[cfg(feature = "websocket")]
437    #[cfg(not(target_arch = "wasm32"))]
438    pub fn enable_secure_websocket_with_config<F>(mut self, f: F) -> std::io::Result<Self>
439    where
440        F: FnOnce(&Keypair) -> std::io::Result<(Vec<String>, String)>,
441    {
442        self.init = self.init.enable_secure_websocket_with_config(f)?;
443        Ok(self)
444    }
445
446    /// Enables DNS
447    #[cfg(feature = "dns")]
448    pub fn enable_dns(self) -> Self {
449        self.enable_dns_with_resolver(connexa::prelude::transport::dns::DnsResolver::default())
450    }
451
452    /// Enables DNS with a specific resolver
453    #[cfg(feature = "dns")]
454    pub fn enable_dns_with_resolver(
455        mut self,
456        resolver: connexa::prelude::transport::dns::DnsResolver,
457    ) -> Self {
458        self.init = self.init.enable_dns_with_resolver(resolver);
459        self
460    }
461
462    /// Enables WebRTC transport
463    #[cfg(feature = "webrtc")]
464    pub fn enable_webrtc(mut self) -> Self {
465        self.init = self.init.enable_webrtc();
466        self
467    }
468
469    /// Enables WebRTC transport, allowing one to generate a certificate using the provided keypair in the closure.
470    #[cfg(feature = "webrtc")]
471    #[cfg(not(target_arch = "wasm32"))]
472    pub fn enable_webrtc_with_config<F>(mut self, f: F) -> std::io::Result<Self>
473    where
474        F: FnOnce(&Keypair) -> std::io::Result<String>,
475    {
476        self.init = self.init.enable_webrtc_with_config(f)?;
477        Ok(self)
478    }
479
480    /// Enable WebRTC transport with a provided pre-generated pem.
481    #[cfg(feature = "webrtc")]
482    #[cfg(not(target_arch = "wasm32"))]
483    pub fn enable_webrtc_with_pem(self, pem: impl Into<String>) -> Self {
484        let pem = pem.into();
485        self.enable_webrtc_with_config(move |_| Ok(pem))
486            .expect("pem is provided; should not fail")
487    }
488
489    /// Enables memory transport
490    pub fn enable_memory_transport(mut self) -> Self {
491        self.init = self.init.enable_memory_transport();
492        self
493    }
494
495    /// Set file desc limit
496    pub fn fd_limit(mut self, limit: FDLimit) -> Self {
497        let limit = match limit {
498            FDLimit::Max => FileDescLimit::Max,
499            FDLimit::Custom(n) => FileDescLimit::Custom(n),
500        };
501        self.init = self.init.set_file_descriptor_limit(limit);
502        self
503    }
504
505    /// Set tracing span
506    pub fn set_span(mut self, span: Span) -> Self {
507        self.options.span = Some(span);
508        self
509    }
510
511    /// Handle libp2p swarm events
512    pub fn swarm_events<F>(mut self, func: F) -> Self
513    where
514        F: Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send + 'static,
515    {
516        self.swarm_event = Some(Arc::new(func));
517        self
518    }
519
520    /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync.
521    pub async fn start(self) -> Result<Ipfs, Error> {
522        let IpfsBuilder {
523            mut options,
524            record_key_validator,
525            repo_handle,
526            gc_config,
527            init,
528            custom_behaviour,
529            swarm_event,
530            ..
531        } = self;
532
533        let root_span = Option::take(&mut options.span)
534            // not sure what would be the best practice with tracing and spans
535            .unwrap_or_else(|| tracing::trace_span!(parent: &Span::current(), "ipfs"));
536
537        // the "current" span which is not entered but the awaited futures are instrumented with it
538        let init_span = tracing::trace_span!(parent: &root_span, "init");
539
540        // stored in the Ipfs, instrumenting every method call
541        let facade_span = tracing::trace_span!("facade");
542
543        // stored in the executor given to libp2p, used to spawn at least the connections,
544        // instrumenting each of those.
545        // let exec_span = tracing::trace_span!(parent: &root_span, "exec");
546        //
547        // // instruments the IpfsFuture, the background task.
548        // let swarm_span = tracing::trace_span!(parent: &root_span, "swarm");
549
550        let mut repo = repo_handle;
551
552        if repo.is_online() {
553            anyhow::bail!("Repo is already initialized");
554        }
555
556        #[cfg(not(target_arch = "wasm32"))]
557        {
558            repo = match &options.ipfs_path {
559                Some(path) => {
560                    if !path.is_dir() {
561                        tokio::fs::create_dir_all(path).await?;
562                    }
563                    Repo::<DefaultStorage>::new_fs(path)
564                }
565                None => repo,
566            };
567        }
568
569        #[cfg(target_arch = "wasm32")]
570        {
571            repo = match options.namespace.take() {
572                Some(ns) => Repo::<DefaultStorage>::new_idb(ns),
573                None => repo,
574            };
575        }
576
577        repo.init().instrument(init_span.clone()).await?;
578
579        let repo_events = repo.initialize_channel();
580
581        let keystore = options.keystore.clone();
582
583        //Note: If `All` or `Pinned` are used, we would have to auto adjust the amount of
584        //      provider records by adding the amount of blocks to the config.
585        //TODO: Add persistent layer for kad store
586        let blocks = match options.provider {
587            RepoProvider::None => vec![],
588            RepoProvider::All => repo.list_blocks().await.collect::<Vec<_>>().await,
589            RepoProvider::Pinned => {
590                repo.list_pins(None)
591                    .await
592                    .filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
593                    .collect()
594                    .await
595            }
596            RepoProvider::Roots => {
597                //TODO: Scan blockstore for root unixfs blocks
598                warn!("RepoProvider::Roots is not implemented... ignoring...");
599                vec![]
600            }
601        };
602
603        // TODO: use to calculate store records limits when it is implemented in connexa
604        let _count = blocks.len();
605
606        let listening_addrs = options.listening_addrs.clone();
607
608        let gc_handle = gc_config.map(|config| {
609            async_rt::task::spawn_abortable({
610                let repo = Repo::clone(&repo);
611                async move {
612                    let GCConfig { duration, trigger } = config;
613                    let use_config_timer = duration != Duration::ZERO;
614                    if trigger == GCTrigger::None && !use_config_timer {
615                        tracing::warn!("GC does not have a set timer or a trigger. Disabling GC");
616                        return;
617                    }
618
619                    let time = match use_config_timer {
620                        true => duration,
621                        false => Duration::from_secs(60 * 60),
622                    };
623
624                    let mut interval = futures_timer::Delay::new(time);
625
626                    loop {
627                        tokio::select! {
628                            _ = &mut interval => {
629                                let _g = repo.inner.gclock.write().await;
630                                tracing::debug!("preparing gc operation");
631                                let pinned = repo
632                                    .list_pins(None)
633                                    .await
634                                    .try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
635                                    .try_collect::<BTreeSet<_>>()
636                                    .await
637                                    .unwrap_or_default();
638                                let pinned = Vec::from_iter(pinned);
639                                let total_size = repo.get_total_size().await.unwrap_or_default();
640                                let pinned_size = repo
641                                    .get_blocks_size(&pinned)
642                                    .await
643                                    .ok()
644                                    .flatten()
645                                    .unwrap_or_default();
646
647                                let unpinned_blocks = total_size - pinned_size;
648
649                                tracing::debug!(total_size = %total_size, ?trigger, unpinned_blocks);
650
651                                let cleanup = match trigger {
652                                    GCTrigger::At { size } => {
653                                        total_size > 0 && unpinned_blocks >= size
654                                    }
655                                    GCTrigger::AtStorage => {
656                                        unpinned_blocks > 0
657                                            && unpinned_blocks >= repo.max_storage_size()
658                                    }
659                                    GCTrigger::None => unpinned_blocks > 0,
660                                };
661
662                                tracing::debug!(will_run = %cleanup);
663
664                                if cleanup {
665                                    tracing::debug!("running cleanup of unpinned blocks");
666                                    let blocks = repo.cleanup().await.unwrap();
667                                    tracing::debug!(removed_blocks = blocks.len(), "blocks removed");
668                                    tracing::debug!("cleanup finished");
669                                }
670
671                                interval.reset(time);
672                            }
673                        }
674                    }
675                }
676            })
677        }).unwrap_or(AbortableJoinHandle::empty());
678
679        let mut context = context::IpfsContext::new(&repo);
680        context.repo_events.replace(repo_events);
681
682        let connexa = init
683            .with_custom_behaviour_with_context(
684                (options, repo.clone()),
685                |keys, (options, repo)| {
686                    let custom_behaviour = match custom_behaviour {
687                        Some(custom_behaviour) => Some(custom_behaviour(keys)?),
688                        None => None,
689                    };
690                    Ok(create_create_behaviour(
691                        keys,
692                        &options,
693                        &repo,
694                        custom_behaviour,
695                    ))
696                },
697            )?
698            .set_context(context)
699            .set_custom_task_callback(|swarm, context, event| context.handle_event(swarm, event))
700            .set_swarm_event_callback(move |swarm, event, context| {
701                if let Some(callback) = swarm_event.as_ref() {
702                    callback(swarm, event);
703                }
704                if let SwarmEvent::Behaviour(connexa::behaviour::BehaviourEvent::Identify(event)) =
705                    event
706                {
707                    match event {
708                        Event::Received { info, .. } => {
709                            let peer_id = info.public_key.to_peer_id();
710                            if let Some(chs) = context.find_peer_identify.remove(&peer_id) {
711                                for ch in chs {
712                                    let _ = ch.send(Ok(info.clone()));
713                                }
714                            }
715                        }
716                        Event::Sent { .. } => {}
717                        Event::Pushed { .. } => {}
718                        Event::Error { .. } => {}
719                    }
720                }
721            })
722            .set_pollable_callback(|cx, swarm, context| {
723                let custom = swarm
724                    .behaviour_mut()
725                    .custom
726                    .as_mut()
727                    .expect("behaviour enabled");
728                while let Poll::Ready(Some(event)) = context.repo_events.poll_next_unpin(cx) {
729                    context.handle_repo_event(custom, event);
730                }
731                Poll::Pending
732            })
733            .set_preload(|_, swarm, _| {
734                for addr in listening_addrs {
735                    if let Err(e) = swarm.listen_on(addr.clone()) {
736                        tracing::error!(%addr, %e, "failed to listen on address");
737                    }
738                }
739
740                for block in blocks {
741                    if let Some(kad) = swarm.behaviour_mut().kademlia.as_mut() {
742                        let key = RecordKey::from(block.hash().to_bytes());
743                        if let Err(e) = kad.start_providing(key) {
744                            match e {
745                                connexa::prelude::dht::store::Error::MaxProvidedKeys => break,
746                                _ => unreachable!(),
747                            }
748                        }
749                    }
750                }
751            })
752            .build()?;
753
754        let ipfs = Ipfs {
755            span: facade_span,
756            repo,
757            keystore,
758            connexa,
759            record_key_validator: Arc::new(record_key_validator),
760            _gc_guard: gc_handle,
761        };
762
763        Ok(ipfs)
764    }
765}