1use crate::context::IpfsContext;
2use crate::keystore::Keystore;
3use crate::p2p::{
4 create_create_behaviour, AddressBookConfig, IdentifyConfiguration, PubsubConfig, RelayConfig,
5 TSwarm,
6};
7use crate::repo::{DefaultStorage, GCConfig, GCTrigger, Repo};
8use crate::{
9 context, ipns_to_dht_key, p2p, to_dht_key, ConnectionLimits, FDLimit, Ipfs, IpfsEvent,
10 IpfsOptions, Keypair, Multiaddr, NetworkBehaviour, RecordKey, RepoProvider, TSwarmEvent,
11 TSwarmEventFn,
12};
13use anyhow::Error;
14use async_rt::AbortableJoinHandle;
15use connexa::behaviour::peer_store::store::memory::MemoryStore;
16use connexa::behaviour::request_response::RequestResponseConfig;
17use connexa::builder::{ConnexaBuilder, FileDescLimit, IntoKeypair};
18use connexa::dummy;
19use connexa::prelude::identify::Event;
20use connexa::prelude::swarm::SwarmEvent;
21#[cfg(not(target_arch = "wasm32"))]
22#[cfg(feature = "pnet")]
23use connexa::prelude::transport::pnet::PreSharedKey;
24use connexa::prelude::{gossipsub, ping, swarm};
25use futures::{StreamExt, TryStreamExt};
26use std::collections::{BTreeSet, HashMap};
27use std::convert::Infallible;
28use std::sync::Arc;
29use std::task::Poll;
30use std::time::Duration;
31use tracing::Span;
32use tracing_futures::Instrument;
33
34#[allow(clippy::type_complexity)]
36pub struct IpfsBuilder<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> {
37 init: ConnexaBuilder<p2p::Behaviour<C>, IpfsContext, IpfsEvent, MemoryStore>,
38 options: IpfsOptions,
39 repo_handle: Repo<DefaultStorage>,
40 swarm_event: Option<TSwarmEventFn<C>>,
41 record_key_validator:
42 HashMap<String, Box<dyn Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send>>,
43 gc_config: Option<GCConfig>,
44 custom_behaviour: Option<Box<dyn FnOnce(&Keypair) -> std::io::Result<C>>>,
45 gc_repo_duration: Option<Duration>,
46}
47
48pub type DefaultIpfsBuilder = IpfsBuilder<dummy::Behaviour>;
49
50#[deprecated(note = "Use IpfsBuilder instead")]
51pub type UninitializedIpfs<T> = IpfsBuilder<T>;
52
53#[deprecated(note = "Use DefaultIpfsBuilder instead")]
54pub type UninitializedIpfsDefault = DefaultIpfsBuilder;
55
56impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> Default for IpfsBuilder<C> {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> IpfsBuilder<C> {
63 pub fn new() -> Self {
65 let keypair = Keypair::generate_ed25519();
66 Self::with_keypair(&keypair).expect("keypair is valid")
67 }
68
69 pub fn with_keypair(keypair: impl IntoKeypair) -> std::io::Result<Self> {
71 Ok(Self {
72 init: ConnexaBuilder::with_existing_identity(keypair)?,
73 options: Default::default(),
74 repo_handle: Repo::new_memory(),
75 record_key_validator: Default::default(),
77 swarm_event: None,
78 gc_config: None,
79 gc_repo_duration: None,
80 custom_behaviour: None,
81 })
82 }
83
84 pub fn set_default_listener(self) -> Self {
87 self.add_listening_addrs(vec![
88 "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
89 "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
90 ])
91 }
92
93 pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self {
95 if !self.options.listening_addrs.contains(&addr) {
96 self.options.listening_addrs.push(addr)
97 }
98 self
99 }
100
101 pub fn set_connection_limits<F>(mut self, f: F) -> Self
103 where
104 F: Fn(ConnectionLimits) -> ConnectionLimits + Send + Sync + 'static,
105 {
106 self.init = self.init.with_connection_limits_with_config(f);
107 self
108 }
109
110 pub fn add_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
112 self.options.listening_addrs.extend(addrs);
113 self
114 }
115
116 pub fn set_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
118 self.options.listening_addrs = addrs;
119 self
120 }
121
122 pub fn add_bootstrap(mut self, addr: Multiaddr) -> Self {
124 if !self.options.bootstrap.contains(&addr) {
125 self.options.bootstrap.push(addr)
126 }
127 self
128 }
129
130 pub fn with_default(self) -> Self {
132 self.with_identify(Default::default())
133 .with_autonat()
134 .with_bitswap()
135 .with_kademlia()
136 .with_ping(Default::default())
137 .with_pubsub(Default::default())
138 }
139
140 pub fn with_kademlia(mut self) -> Self {
142 self.init = self.init.with_kademlia();
143 self
144 }
145
146 pub fn with_bitswap(mut self) -> Self {
148 self.options.protocols.bitswap = true;
149 self
150 }
151
152 #[cfg(not(target_arch = "wasm32"))]
154 pub fn with_mdns(mut self) -> Self {
155 self.init = self.init.with_mdns();
156 self
157 }
158
159 pub fn with_relay(mut self, with_dcutr: bool) -> Self {
161 self.options.protocols.relay = true;
162 self.init = self.init.with_relay();
163 if with_dcutr {
164 #[cfg(not(target_arch = "wasm32"))]
165 {
166 self.init = self.init.with_dcutr();
167 }
168 }
169 self
170 }
171
172 pub fn with_relay_server(mut self, config: RelayConfig) -> Self {
174 self.init = self
175 .init
176 .with_relay_server_with_config(move |_| config.into());
177 self
178 }
179
180 #[cfg(not(target_arch = "wasm32"))]
182 pub fn with_upnp(mut self) -> Self {
183 self.init = self.init.with_upnp();
184 self
185 }
186
187 pub fn with_rendezvous_server(mut self) -> Self {
189 self.init = self.init.with_rendezvous_server();
190 self
191 }
192
193 pub fn with_rendezvous_client(mut self) -> Self {
195 self.init = self.init.with_rendezvous_client();
196 self
197 }
198
199 pub fn with_identify(mut self, config: IdentifyConfiguration) -> Self {
201 self.init = self
202 .init
203 .with_identify_with_config(config.protocol_version, move |cfg| {
204 cfg.with_agent_version(config.agent_version)
205 .with_interval(config.interval)
206 .with_push_listen_addr_updates(config.push_update)
207 .with_cache_size(config.cache)
208 });
209 self
210 }
211
212 #[cfg(feature = "stream")]
213 pub fn with_streams(mut self) -> Self {
214 self.init = self.init.with_streams();
215 self
216 }
217
218 pub fn with_pubsub(mut self, config: PubsubConfig) -> Self {
220 self.init = self
221 .init
222 .with_gossipsub_with_config(move |keypair, mut builder| {
223 if let Some(protocol) = config.custom_protocol_id {
224 builder.protocol_id(protocol, gossipsub::Version::V1_1);
225 }
226
227 builder.max_transmit_size(config.max_transmit_size);
228
229 if config.floodsub_compat {
230 builder.support_floodsub();
231 }
232
233 builder.validation_mode(config.validate.into());
234 let auth =
235 connexa::prelude::gossipsub::MessageAuthenticity::Signed(keypair.clone());
236 (builder, auth)
237 });
238 self
239 }
240
241 pub fn with_request_response(mut self, config: Vec<RequestResponseConfig>) -> Self {
246 self.init = self.init.with_request_response(config);
247
248 self
249 }
250
251 pub fn with_autonat(mut self) -> Self {
253 self.init = self.init.with_autonat_v1();
254 self
255 }
256
257 pub fn with_ping(mut self, config: ping::Config) -> Self {
259 self.init = self.init.with_ping_with_config(move |_| config);
260 self
261 }
262
263 pub fn with_custom_behaviour<F>(mut self, f: F) -> Self
265 where
266 F: FnOnce(&Keypair) -> std::io::Result<C> + 'static,
267 {
268 self.custom_behaviour.replace(Box::new(f));
269 self
270 }
271
272 pub fn with_gc(mut self, config: GCConfig) -> Self {
274 self.gc_config = Some(config);
275 self
276 }
277
278 pub fn set_temp_pin_duration(mut self, duration: Duration) -> Self {
281 self.gc_repo_duration = Some(duration);
282 self
283 }
284
285 #[cfg(not(target_arch = "wasm32"))]
287 pub fn set_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
288 let path = path.as_ref().to_path_buf();
289 self.options.ipfs_path = Some(path);
290 self
291 }
292
293 #[cfg(target_arch = "wasm32")]
295 pub fn set_namespace(mut self, ns: Option<String>) -> Self {
296 self.options.namespace = Some(ns);
297 self
298 }
299
300 pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
302 self.init = self.init.set_swarm_config(move |swarm| {
303 swarm.with_idle_connection_timeout(Duration::from_secs(duration))
304 });
305 self
306 }
307
308 pub fn set_swarm_configuration<F>(mut self, f: F) -> Self
310 where
311 F: FnOnce(swarm::Config) -> swarm::Config + Send + Sync + 'static,
312 {
313 self.init = self.init.set_swarm_config(f);
314 self
315 }
316
317 pub fn default_record_key_validator(mut self) -> Self {
320 self.record_key_validator.insert(
321 "ipns".into(),
322 Box::new(|key| to_dht_key(("ipns", |key| ipns_to_dht_key(key)), key)),
323 );
324 self
325 }
326
327 pub fn set_record_prefix_validator<F>(mut self, key: &str, callback: F) -> Self
328 where
329 F: Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send + 'static,
330 {
331 self.record_key_validator
332 .insert(key.to_string(), Box::new(callback));
333 self
334 }
335
336 pub fn set_addrbook_configuration(mut self, config: AddressBookConfig) -> Self {
338 self.options.addr_config = config;
339 self
340 }
341
342 pub fn set_provider(mut self, opt: RepoProvider) -> Self {
344 self.options.provider = opt;
345 self
346 }
347
348 pub fn set_repo(mut self, repo: &Repo<DefaultStorage>) -> Self {
350 self.repo_handle = Repo::clone(repo);
351 self
352 }
353
354 pub fn set_keystore(mut self, keystore: &Keystore) -> Self {
356 self.options.keystore = keystore.clone();
357 self
358 }
359
360 #[cfg(feature = "quic")]
362 #[cfg(not(target_arch = "wasm32"))]
363 pub fn enable_quic(mut self) -> Self {
364 self.init = self.init.enable_quic();
365 self
366 }
367
368 #[cfg(feature = "quic")]
370 #[cfg(not(target_arch = "wasm32"))]
371 pub fn enable_quic_with_config<F>(mut self, f: F) -> Self
372 where
373 F: FnOnce(
374 connexa::prelude::transport::quic::Config,
375 ) -> connexa::prelude::transport::quic::Config
376 + 'static,
377 {
378 self.init = self.init.enable_quic_with_config(f);
379 self
380 }
381
382 #[cfg(feature = "tcp")]
384 #[cfg(not(target_arch = "wasm32"))]
385 pub fn enable_tcp(mut self) -> Self {
386 self.init = self.init.enable_tcp();
387 self
388 }
389
390 #[cfg(feature = "tcp")]
392 #[cfg(not(target_arch = "wasm32"))]
393 pub fn enable_tcp_with_config<F>(mut self, f: F) -> Self
394 where
395 F: FnOnce(
396 connexa::prelude::transport::tcp::Config,
397 ) -> connexa::prelude::transport::tcp::Config
398 + 'static,
399 {
400 self.init = self.init.enable_tcp_with_config(f);
401 self
402 }
403
404 #[cfg(feature = "pnet")]
406 #[cfg(not(target_arch = "wasm32"))]
407 pub fn enable_pnet(mut self, psk: PreSharedKey) -> Self {
408 self.init = self.init.enable_pnet(psk);
409 self
410 }
411
412 #[cfg(feature = "websocket")]
414 pub fn enable_websocket(mut self) -> Self {
415 self.init = self.init.enable_websocket();
416 self
417 }
418
419 #[cfg(feature = "websocket")]
421 #[cfg(not(target_arch = "wasm32"))]
422 pub fn enable_secure_websocket(mut self) -> Self {
423 self.init = self.init.enable_secure_websocket();
424 self
425 }
426
427 #[cfg(feature = "websocket")]
429 #[cfg(not(target_arch = "wasm32"))]
430 pub fn enable_secure_websocket_with_pem(mut self, keypair: String, certs: Vec<String>) -> Self {
431 self.init = self.init.enable_secure_websocket_with_pem(keypair, certs);
432 self
433 }
434
435 #[cfg(feature = "websocket")]
437 #[cfg(not(target_arch = "wasm32"))]
438 pub fn enable_secure_websocket_with_config<F>(mut self, f: F) -> std::io::Result<Self>
439 where
440 F: FnOnce(&Keypair) -> std::io::Result<(Vec<String>, String)>,
441 {
442 self.init = self.init.enable_secure_websocket_with_config(f)?;
443 Ok(self)
444 }
445
446 #[cfg(feature = "dns")]
448 pub fn enable_dns(self) -> Self {
449 self.enable_dns_with_resolver(connexa::prelude::transport::dns::DnsResolver::default())
450 }
451
452 #[cfg(feature = "dns")]
454 pub fn enable_dns_with_resolver(
455 mut self,
456 resolver: connexa::prelude::transport::dns::DnsResolver,
457 ) -> Self {
458 self.init = self.init.enable_dns_with_resolver(resolver);
459 self
460 }
461
462 #[cfg(feature = "webrtc")]
464 pub fn enable_webrtc(mut self) -> Self {
465 self.init = self.init.enable_webrtc();
466 self
467 }
468
469 #[cfg(feature = "webrtc")]
471 #[cfg(not(target_arch = "wasm32"))]
472 pub fn enable_webrtc_with_config<F>(mut self, f: F) -> std::io::Result<Self>
473 where
474 F: FnOnce(&Keypair) -> std::io::Result<String>,
475 {
476 self.init = self.init.enable_webrtc_with_config(f)?;
477 Ok(self)
478 }
479
480 #[cfg(feature = "webrtc")]
482 #[cfg(not(target_arch = "wasm32"))]
483 pub fn enable_webrtc_with_pem(self, pem: impl Into<String>) -> Self {
484 let pem = pem.into();
485 self.enable_webrtc_with_config(move |_| Ok(pem))
486 .expect("pem is provided; should not fail")
487 }
488
489 pub fn enable_memory_transport(mut self) -> Self {
491 self.init = self.init.enable_memory_transport();
492 self
493 }
494
495 pub fn fd_limit(mut self, limit: FDLimit) -> Self {
497 let limit = match limit {
498 FDLimit::Max => FileDescLimit::Max,
499 FDLimit::Custom(n) => FileDescLimit::Custom(n),
500 };
501 self.init = self.init.set_file_descriptor_limit(limit);
502 self
503 }
504
505 pub fn set_span(mut self, span: Span) -> Self {
507 self.options.span = Some(span);
508 self
509 }
510
511 pub fn swarm_events<F>(mut self, func: F) -> Self
513 where
514 F: Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send + 'static,
515 {
516 self.swarm_event = Some(Arc::new(func));
517 self
518 }
519
520 pub async fn start(self) -> Result<Ipfs, Error> {
522 let IpfsBuilder {
523 mut options,
524 record_key_validator,
525 repo_handle,
526 gc_config,
527 init,
528 custom_behaviour,
529 swarm_event,
530 ..
531 } = self;
532
533 let root_span = Option::take(&mut options.span)
534 .unwrap_or_else(|| tracing::trace_span!(parent: &Span::current(), "ipfs"));
536
537 let init_span = tracing::trace_span!(parent: &root_span, "init");
539
540 let facade_span = tracing::trace_span!("facade");
542
543 let mut repo = repo_handle;
551
552 if repo.is_online() {
553 anyhow::bail!("Repo is already initialized");
554 }
555
556 #[cfg(not(target_arch = "wasm32"))]
557 {
558 repo = match &options.ipfs_path {
559 Some(path) => {
560 if !path.is_dir() {
561 tokio::fs::create_dir_all(path).await?;
562 }
563 Repo::<DefaultStorage>::new_fs(path)
564 }
565 None => repo,
566 };
567 }
568
569 #[cfg(target_arch = "wasm32")]
570 {
571 repo = match options.namespace.take() {
572 Some(ns) => Repo::<DefaultStorage>::new_idb(ns),
573 None => repo,
574 };
575 }
576
577 repo.init().instrument(init_span.clone()).await?;
578
579 let repo_events = repo.initialize_channel();
580
581 let keystore = options.keystore.clone();
582
583 let blocks = match options.provider {
587 RepoProvider::None => vec![],
588 RepoProvider::All => repo.list_blocks().await.collect::<Vec<_>>().await,
589 RepoProvider::Pinned => {
590 repo.list_pins(None)
591 .await
592 .filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
593 .collect()
594 .await
595 }
596 RepoProvider::Roots => {
597 warn!("RepoProvider::Roots is not implemented... ignoring...");
599 vec![]
600 }
601 };
602
603 let _count = blocks.len();
605
606 let listening_addrs = options.listening_addrs.clone();
607
608 let gc_handle = gc_config.map(|config| {
609 async_rt::task::spawn_abortable({
610 let repo = Repo::clone(&repo);
611 async move {
612 let GCConfig { duration, trigger } = config;
613 let use_config_timer = duration != Duration::ZERO;
614 if trigger == GCTrigger::None && !use_config_timer {
615 tracing::warn!("GC does not have a set timer or a trigger. Disabling GC");
616 return;
617 }
618
619 let time = match use_config_timer {
620 true => duration,
621 false => Duration::from_secs(60 * 60),
622 };
623
624 let mut interval = futures_timer::Delay::new(time);
625
626 loop {
627 tokio::select! {
628 _ = &mut interval => {
629 let _g = repo.inner.gclock.write().await;
630 tracing::debug!("preparing gc operation");
631 let pinned = repo
632 .list_pins(None)
633 .await
634 .try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
635 .try_collect::<BTreeSet<_>>()
636 .await
637 .unwrap_or_default();
638 let pinned = Vec::from_iter(pinned);
639 let total_size = repo.get_total_size().await.unwrap_or_default();
640 let pinned_size = repo
641 .get_blocks_size(&pinned)
642 .await
643 .ok()
644 .flatten()
645 .unwrap_or_default();
646
647 let unpinned_blocks = total_size - pinned_size;
648
649 tracing::debug!(total_size = %total_size, ?trigger, unpinned_blocks);
650
651 let cleanup = match trigger {
652 GCTrigger::At { size } => {
653 total_size > 0 && unpinned_blocks >= size
654 }
655 GCTrigger::AtStorage => {
656 unpinned_blocks > 0
657 && unpinned_blocks >= repo.max_storage_size()
658 }
659 GCTrigger::None => unpinned_blocks > 0,
660 };
661
662 tracing::debug!(will_run = %cleanup);
663
664 if cleanup {
665 tracing::debug!("running cleanup of unpinned blocks");
666 let blocks = repo.cleanup().await.unwrap();
667 tracing::debug!(removed_blocks = blocks.len(), "blocks removed");
668 tracing::debug!("cleanup finished");
669 }
670
671 interval.reset(time);
672 }
673 }
674 }
675 }
676 })
677 }).unwrap_or(AbortableJoinHandle::empty());
678
679 let mut context = context::IpfsContext::new(&repo);
680 context.repo_events.replace(repo_events);
681
682 let connexa = init
683 .with_custom_behaviour_with_context(
684 (options, repo.clone()),
685 |keys, (options, repo)| {
686 let custom_behaviour = match custom_behaviour {
687 Some(custom_behaviour) => Some(custom_behaviour(keys)?),
688 None => None,
689 };
690 Ok(create_create_behaviour(
691 keys,
692 &options,
693 &repo,
694 custom_behaviour,
695 ))
696 },
697 )?
698 .set_context(context)
699 .set_custom_task_callback(|swarm, context, event| context.handle_event(swarm, event))
700 .set_swarm_event_callback(move |swarm, event, context| {
701 if let Some(callback) = swarm_event.as_ref() {
702 callback(swarm, event);
703 }
704 if let SwarmEvent::Behaviour(connexa::behaviour::BehaviourEvent::Identify(event)) =
705 event
706 {
707 match event {
708 Event::Received { info, .. } => {
709 let peer_id = info.public_key.to_peer_id();
710 if let Some(chs) = context.find_peer_identify.remove(&peer_id) {
711 for ch in chs {
712 let _ = ch.send(Ok(info.clone()));
713 }
714 }
715 }
716 Event::Sent { .. } => {}
717 Event::Pushed { .. } => {}
718 Event::Error { .. } => {}
719 }
720 }
721 })
722 .set_pollable_callback(|cx, swarm, context| {
723 let custom = swarm
724 .behaviour_mut()
725 .custom
726 .as_mut()
727 .expect("behaviour enabled");
728 while let Poll::Ready(Some(event)) = context.repo_events.poll_next_unpin(cx) {
729 context.handle_repo_event(custom, event);
730 }
731 Poll::Pending
732 })
733 .set_preload(|_, swarm, _| {
734 for addr in listening_addrs {
735 if let Err(e) = swarm.listen_on(addr.clone()) {
736 tracing::error!(%addr, %e, "failed to listen on address");
737 }
738 }
739
740 for block in blocks {
741 if let Some(kad) = swarm.behaviour_mut().kademlia.as_mut() {
742 let key = RecordKey::from(block.hash().to_bytes());
743 if let Err(e) = kad.start_providing(key) {
744 match e {
745 connexa::prelude::dht::store::Error::MaxProvidedKeys => break,
746 _ => unreachable!(),
747 }
748 }
749 }
750 }
751 })
752 .build()?;
753
754 let ipfs = Ipfs {
755 span: facade_span,
756 repo,
757 keystore,
758 connexa,
759 record_key_validator: Arc::new(record_key_validator),
760 _gc_guard: gc_handle,
761 };
762
763 Ok(ipfs)
764 }
765}