use crate::context::IpfsContext;
use crate::keystore::Keystore;
use crate::p2p::{
create_create_behaviour, AddressBookConfig, IdentifyConfiguration, PubsubConfig, RelayConfig,
TSwarm,
};
use crate::repo::{DefaultStorage, GCConfig, GCTrigger, Repo};
use crate::{
context, ipns_to_dht_key, p2p, to_dht_key, ConnectionLimits, FDLimit, Ipfs, IpfsEvent,
IpfsOptions, Keypair, Multiaddr, NetworkBehaviour, RecordKey, RepoProvider, TSwarmEvent,
TSwarmEventFn,
};
use anyhow::Error;
use async_rt::AbortableJoinHandle;
use connexa::behaviour::peer_store::store::memory::MemoryStore;
use connexa::behaviour::request_response::RequestResponseConfig;
use connexa::builder::{ConnexaBuilder, FileDescLimit, IntoKeypair};
use connexa::dummy;
use connexa::prelude::identify::Event;
use connexa::prelude::swarm::SwarmEvent;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "pnet")]
use connexa::prelude::transport::pnet::PreSharedKey;
use connexa::prelude::{gossipsub, ping, swarm};
use futures::{StreamExt, TryStreamExt};
use std::collections::{BTreeSet, HashMap};
use std::convert::Infallible;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tracing::Span;
use tracing_futures::Instrument;
/// Builder for configuring and constructing an [`Ipfs`] node.
///
/// `C` is an optional custom [`NetworkBehaviour`] merged into the swarm
/// behaviour; use [`DefaultIpfsBuilder`] when no custom behaviour is needed.
#[allow(clippy::type_complexity)]
pub struct IpfsBuilder<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> {
    // Underlying connexa builder that most `with_*`/`enable_*` methods delegate to.
    init: ConnexaBuilder<p2p::Behaviour<C>, IpfsContext, IpfsEvent, MemoryStore>,
    // Node options: listening addresses, bootstrap list, protocol toggles, repo path, etc.
    options: IpfsOptions,
    // Repository backing the node; in-memory by default (see `with_keypair`).
    repo_handle: Repo<DefaultStorage>,
    // Optional user callback invoked for every swarm event.
    swarm_event: Option<TSwarmEventFn<C>>,
    // Maps a key prefix (e.g. "ipns") to a validator converting a string into a DHT key.
    record_key_validator:
        HashMap<String, Box<dyn Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send>>,
    // Garbage-collection settings; GC is disabled when `None`.
    gc_config: Option<GCConfig>,
    // Factory for the custom behaviour `C`, invoked with the node keypair at start.
    custom_behaviour: Option<Box<dyn FnOnce(&Keypair) -> std::io::Result<C>>>,
    // Duration for temporary pins.
    // NOTE(review): stored by `set_temp_pin_duration` but not visibly consumed
    // in `start` — confirm it is wired into the repo elsewhere.
    gc_repo_duration: Option<Duration>,
}
/// [`IpfsBuilder`] with no custom behaviour attached.
pub type DefaultIpfsBuilder = IpfsBuilder<dummy::Behaviour>;
/// Deprecated alias kept for backwards compatibility.
#[deprecated(note = "Use IpfsBuilder instead")]
pub type UninitializedIpfs<T> = IpfsBuilder<T>;
/// Deprecated alias kept for backwards compatibility.
#[deprecated(note = "Use DefaultIpfsBuilder instead")]
pub type UninitializedIpfsDefault = DefaultIpfsBuilder;
impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> Default for IpfsBuilder<C> {
fn default() -> Self {
Self::new()
}
}
impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send + Sync + 'static> IpfsBuilder<C> {
/// Creates a builder with a freshly generated Ed25519 keypair and an
/// in-memory repo.
pub fn new() -> Self {
    let keypair = Keypair::generate_ed25519();
    Self::with_keypair(&keypair).expect("keypair is valid")
}
/// Creates a builder using the supplied keypair as the node identity.
///
/// Defaults to an in-memory repo and default options. Returns an error if
/// the keypair cannot be used as an existing identity.
pub fn with_keypair(keypair: impl IntoKeypair) -> std::io::Result<Self> {
    Ok(Self {
        init: ConnexaBuilder::with_existing_identity(keypair)?,
        options: Default::default(),
        repo_handle: Repo::new_memory(),
        record_key_validator: Default::default(),
        swarm_event: None,
        gc_config: None,
        gc_repo_duration: None,
        custom_behaviour: None,
    })
}
/// Adds the default listeners: any-interface TCP and QUIC v1 on
/// OS-assigned ports.
pub fn set_default_listener(self) -> Self {
    self.add_listening_addrs(vec![
        "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
        "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
    ])
}
/// Adds a single listening address, ignoring duplicates.
pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self {
    if !self.options.listening_addrs.contains(&addr) {
        self.options.listening_addrs.push(addr)
    }
    self
}
/// Customizes the swarm connection limits via the provided closure.
pub fn set_connection_limits<F>(mut self, f: F) -> Self
where
    F: Fn(ConnectionLimits) -> ConnectionLimits + Send + Sync + 'static,
{
    self.init = self.init.with_connection_limits_with_config(f);
    self
}
/// Appends multiple listening addresses (no de-duplication, unlike
/// [`Self::add_listening_addr`]).
pub fn add_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
    self.options.listening_addrs.extend(addrs);
    self
}
/// Replaces the listening address list entirely.
pub fn set_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
    self.options.listening_addrs = addrs;
    self
}
/// Adds a bootstrap address, ignoring duplicates.
pub fn add_bootstrap(mut self, addr: Multiaddr) -> Self {
    if !self.options.bootstrap.contains(&addr) {
        self.options.bootstrap.push(addr)
    }
    self
}
/// Enables a default protocol set: identify, autonat, bitswap, kademlia,
/// ping and pubsub, each with default configuration.
pub fn with_default(self) -> Self {
    self.with_identify(Default::default())
        .with_autonat()
        .with_bitswap()
        .with_kademlia()
        .with_ping(Default::default())
        .with_pubsub(Default::default())
}
/// Enables the Kademlia DHT.
pub fn with_kademlia(mut self) -> Self {
    self.init = self.init.with_kademlia();
    self
}
/// Enables the bitswap block-exchange protocol.
pub fn with_bitswap(mut self) -> Self {
    self.options.protocols.bitswap = true;
    self
}
/// Enables mDNS peer discovery on the local network (native targets only).
#[cfg(not(target_arch = "wasm32"))]
pub fn with_mdns(mut self) -> Self {
    self.init = self.init.with_mdns();
    self
}
/// Enables the relay client, optionally with DCUtR hole punching.
/// DCUtR is only compiled in on native targets; on wasm the flag is a no-op.
pub fn with_relay(mut self, with_dcutr: bool) -> Self {
    self.options.protocols.relay = true;
    self.init = self.init.with_relay();
    if with_dcutr {
        #[cfg(not(target_arch = "wasm32"))]
        {
            self.init = self.init.with_dcutr();
        }
    }
    self
}
/// Enables acting as a relay server with the given configuration.
pub fn with_relay_server(mut self, config: RelayConfig) -> Self {
    self.init = self
        .init
        .with_relay_server_with_config(move |_| config.into());
    self
}
/// Enables UPnP port mapping (native targets only).
#[cfg(not(target_arch = "wasm32"))]
pub fn with_upnp(mut self) -> Self {
    self.init = self.init.with_upnp();
    self
}
/// Enables acting as a rendezvous server.
pub fn with_rendezvous_server(mut self) -> Self {
    self.init = self.init.with_rendezvous_server();
    self
}
/// Enables the rendezvous client.
pub fn with_rendezvous_client(mut self) -> Self {
    self.init = self.init.with_rendezvous_client();
    self
}
/// Enables identify, applying the supplied configuration (protocol/agent
/// version, push interval, listen-address push updates and cache size).
pub fn with_identify(mut self, config: IdentifyConfiguration) -> Self {
    self.init = self
        .init
        .with_identify_with_config(config.protocol_version, move |cfg| {
            cfg.with_agent_version(config.agent_version)
                .with_interval(config.interval)
                .with_push_listen_addr_updates(config.push_update)
                .with_cache_size(config.cache)
        });
    self
}
/// Enables libp2p stream support.
#[cfg(feature = "stream")]
pub fn with_streams(mut self) -> Self {
    self.init = self.init.with_streams();
    self
}
/// Enables gossipsub, applying the supplied pubsub configuration
/// (optional custom protocol id, max transmit size, floodsub
/// compatibility, validation mode). Messages are signed with the node
/// keypair.
pub fn with_pubsub(mut self, config: PubsubConfig) -> Self {
    self.init = self
        .init
        .with_gossipsub_with_config(move |keypair, mut builder| {
            if let Some(protocol) = config.custom_protocol_id {
                builder.protocol_id(protocol, gossipsub::Version::V1_1);
            }
            builder.max_transmit_size(config.max_transmit_size);
            if config.floodsub_compat {
                builder.support_floodsub();
            }
            builder.validation_mode(config.validate.into());
            let auth =
                connexa::prelude::gossipsub::MessageAuthenticity::Signed(keypair.clone());
            (builder, auth)
        });
    self
}
/// Enables request/response protocols with the provided configurations.
pub fn with_request_response(mut self, config: Vec<RequestResponseConfig>) -> Self {
    self.init = self.init.with_request_response(config);
    self
}
/// Enables AutoNAT (v1) for NAT status detection.
pub fn with_autonat(mut self) -> Self {
    self.init = self.init.with_autonat_v1();
    self
}
/// Enables ping with the given configuration.
pub fn with_ping(mut self, config: ping::Config) -> Self {
    self.init = self.init.with_ping_with_config(move |_| config);
    self
}
/// Supplies a factory for the custom behaviour `C`; it is invoked with the
/// node keypair when [`Self::start`] runs.
pub fn with_custom_behaviour<F>(mut self, f: F) -> Self
where
    F: FnOnce(&Keypair) -> std::io::Result<C> + 'static,
{
    self.custom_behaviour.replace(Box::new(f));
    self
}
/// Enables repo garbage collection with the given configuration.
pub fn with_gc(mut self, config: GCConfig) -> Self {
    self.gc_config = Some(config);
    self
}
/// Sets how long temporary pins are kept.
// NOTE(review): the stored value is not visibly read in `start` — confirm it
// is consumed elsewhere.
pub fn set_temp_pin_duration(mut self, duration: Duration) -> Self {
    self.gc_repo_duration = Some(duration);
    self
}
/// Sets the on-disk repo path (native targets only); `start` will then use
/// filesystem storage rooted at this path.
#[cfg(not(target_arch = "wasm32"))]
pub fn set_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
    let path = path.as_ref().to_path_buf();
    self.options.ipfs_path = Some(path);
    self
}
/// Sets the storage namespace (wasm targets only); `start` will then use
/// IndexedDB storage for this namespace.
#[cfg(target_arch = "wasm32")]
pub fn set_namespace(mut self, ns: Option<String>) -> Self {
    self.options.namespace = Some(ns);
    self
}
/// Sets the swarm idle-connection timeout, in seconds.
pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
    self.init = self.init.set_swarm_config(move |swarm| {
        swarm.with_idle_connection_timeout(Duration::from_secs(duration))
    });
    self
}
/// Customizes the swarm configuration directly via the provided closure.
pub fn set_swarm_configuration<F>(mut self, f: F) -> Self
where
    F: FnOnce(swarm::Config) -> swarm::Config + Send + Sync + 'static,
{
    self.init = self.init.set_swarm_config(f);
    self
}
/// Registers the built-in validator for the "ipns" record-key prefix.
pub fn default_record_key_validator(mut self) -> Self {
    self.record_key_validator.insert(
        "ipns".into(),
        Box::new(|key| to_dht_key(("ipns", |key| ipns_to_dht_key(key)), key)),
    );
    self
}
/// Registers a custom validator for record keys with the given prefix.
pub fn set_record_prefix_validator<F>(mut self, key: &str, callback: F) -> Self
where
    F: Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send + 'static,
{
    self.record_key_validator
        .insert(key.to_string(), Box::new(callback));
    self
}
/// Sets the address-book configuration.
pub fn set_addrbook_configuration(mut self, config: AddressBookConfig) -> Self {
    self.options.addr_config = config;
    self
}
/// Selects which repo blocks are announced as provided on the DHT at start.
pub fn set_provider(mut self, opt: RepoProvider) -> Self {
    self.options.provider = opt;
    self
}
/// Uses an existing repo handle instead of the default in-memory repo.
pub fn set_repo(mut self, repo: &Repo<DefaultStorage>) -> Self {
    self.repo_handle = Repo::clone(repo);
    self
}
/// Uses the given keystore for the node.
pub fn set_keystore(mut self, keystore: &Keystore) -> Self {
    self.options.keystore = keystore.clone();
    self
}
/// Enables the QUIC transport with default configuration.
#[cfg(feature = "quic")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_quic(mut self) -> Self {
    self.init = self.init.enable_quic();
    self
}
/// Enables the QUIC transport, customizing its configuration.
#[cfg(feature = "quic")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_quic_with_config<F>(mut self, f: F) -> Self
where
    F: FnOnce(
            connexa::prelude::transport::quic::Config,
        ) -> connexa::prelude::transport::quic::Config
        + 'static,
{
    self.init = self.init.enable_quic_with_config(f);
    self
}
/// Enables the TCP transport with default configuration.
#[cfg(feature = "tcp")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_tcp(mut self) -> Self {
    self.init = self.init.enable_tcp();
    self
}
/// Enables the TCP transport, customizing its configuration.
#[cfg(feature = "tcp")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_tcp_with_config<F>(mut self, f: F) -> Self
where
    F: FnOnce(
            connexa::prelude::transport::tcp::Config,
        ) -> connexa::prelude::transport::tcp::Config
        + 'static,
{
    self.init = self.init.enable_tcp_with_config(f);
    self
}
/// Enables a private network using the given pre-shared key.
#[cfg(feature = "pnet")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_pnet(mut self, psk: PreSharedKey) -> Self {
    self.init = self.init.enable_pnet(psk);
    self
}
/// Enables the websocket transport.
#[cfg(feature = "websocket")]
pub fn enable_websocket(mut self) -> Self {
    self.init = self.init.enable_websocket();
    self
}
/// Enables the secure (TLS) websocket transport.
#[cfg(feature = "websocket")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_secure_websocket(mut self) -> Self {
    self.init = self.init.enable_secure_websocket();
    self
}
/// Enables secure websocket using the supplied PEM-encoded key and
/// certificate chain.
#[cfg(feature = "websocket")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_secure_websocket_with_pem(mut self, keypair: String, certs: Vec<String>) -> Self {
    self.init = self.init.enable_secure_websocket_with_pem(keypair, certs);
    self
}
/// Enables secure websocket; the closure derives the certificate chain and
/// key from the node keypair and may fail.
#[cfg(feature = "websocket")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_secure_websocket_with_config<F>(mut self, f: F) -> std::io::Result<Self>
where
    F: FnOnce(&Keypair) -> std::io::Result<(Vec<String>, String)>,
{
    self.init = self.init.enable_secure_websocket_with_config(f)?;
    Ok(self)
}
/// Enables DNS resolution for multiaddrs using the default resolver.
#[cfg(feature = "dns")]
pub fn enable_dns(self) -> Self {
    self.enable_dns_with_resolver(connexa::prelude::transport::dns::DnsResolver::default())
}
/// Enables DNS resolution for multiaddrs using the supplied resolver.
#[cfg(feature = "dns")]
pub fn enable_dns_with_resolver(
    mut self,
    resolver: connexa::prelude::transport::dns::DnsResolver,
) -> Self {
    self.init = self.init.enable_dns_with_resolver(resolver);
    self
}
/// Enables the WebRTC transport.
#[cfg(feature = "webrtc")]
pub fn enable_webrtc(mut self) -> Self {
    self.init = self.init.enable_webrtc();
    self
}
/// Enables WebRTC; the closure derives the certificate from the node
/// keypair and may fail.
#[cfg(feature = "webrtc")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_webrtc_with_config<F>(mut self, f: F) -> std::io::Result<Self>
where
    F: FnOnce(&Keypair) -> std::io::Result<String>,
{
    self.init = self.init.enable_webrtc_with_config(f)?;
    Ok(self)
}
/// Enables WebRTC using the supplied PEM-encoded certificate.
#[cfg(feature = "webrtc")]
#[cfg(not(target_arch = "wasm32"))]
pub fn enable_webrtc_with_pem(self, pem: impl Into<String>) -> Self {
    let pem = pem.into();
    // The closure always returns Ok, so the expect cannot trigger.
    self.enable_webrtc_with_config(move |_| Ok(pem))
        .expect("pem is provided; should not fail")
}
/// Enables the in-process memory transport (useful for tests).
pub fn enable_memory_transport(mut self) -> Self {
    self.init = self.init.enable_memory_transport();
    self
}
/// Sets the process file-descriptor limit (maximum or a custom value).
pub fn fd_limit(mut self, limit: FDLimit) -> Self {
    let limit = match limit {
        FDLimit::Max => FileDescLimit::Max,
        FDLimit::Custom(n) => FileDescLimit::Custom(n),
    };
    self.init = self.init.set_file_descriptor_limit(limit);
    self
}
/// Sets the tracing span used as the parent of the node's spans.
pub fn set_span(mut self, span: Span) -> Self {
    self.options.span = Some(span);
    self
}
/// Registers a callback invoked for every swarm event.
pub fn swarm_events<F>(mut self, func: F) -> Self
where
    F: Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send + 'static,
{
    self.swarm_event = Some(Arc::new(func));
    self
}
pub async fn start(self) -> Result<Ipfs, Error> {
let IpfsBuilder {
mut options,
record_key_validator,
repo_handle,
gc_config,
init,
custom_behaviour,
swarm_event,
..
} = self;
let root_span = Option::take(&mut options.span)
.unwrap_or_else(|| tracing::trace_span!(parent: &Span::current(), "ipfs"));
let init_span = tracing::trace_span!(parent: &root_span, "init");
let facade_span = tracing::trace_span!("facade");
let mut repo = repo_handle;
if repo.is_online() {
anyhow::bail!("Repo is already initialized");
}
#[cfg(not(target_arch = "wasm32"))]
{
repo = match &options.ipfs_path {
Some(path) => {
if !path.is_dir() {
tokio::fs::create_dir_all(path).await?;
}
Repo::<DefaultStorage>::new_fs(path)
}
None => repo,
};
}
#[cfg(target_arch = "wasm32")]
{
repo = match options.namespace.take() {
Some(ns) => Repo::<DefaultStorage>::new_idb(ns),
None => repo,
};
}
repo.init().instrument(init_span.clone()).await?;
let repo_events = repo.initialize_channel();
let keystore = options.keystore.clone();
let blocks = match options.provider {
RepoProvider::None => vec![],
RepoProvider::All => repo.list_blocks().await.collect::<Vec<_>>().await,
RepoProvider::Pinned => {
repo.list_pins(None)
.await
.filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
.collect()
.await
}
RepoProvider::Roots => {
warn!("RepoProvider::Roots is not implemented... ignoring...");
vec![]
}
};
let _count = blocks.len();
let listening_addrs = options.listening_addrs.clone();
let gc_handle = gc_config.map(|config| {
async_rt::task::spawn_abortable({
let repo = Repo::clone(&repo);
async move {
let GCConfig { duration, trigger } = config;
let use_config_timer = duration != Duration::ZERO;
if trigger == GCTrigger::None && !use_config_timer {
tracing::warn!("GC does not have a set timer or a trigger. Disabling GC");
return;
}
let time = match use_config_timer {
true => duration,
false => Duration::from_secs(60 * 60),
};
let mut interval = futures_timer::Delay::new(time);
loop {
tokio::select! {
_ = &mut interval => {
let _g = repo.inner.gclock.write().await;
tracing::debug!("preparing gc operation");
let pinned = repo
.list_pins(None)
.await
.try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
.try_collect::<BTreeSet<_>>()
.await
.unwrap_or_default();
let pinned = Vec::from_iter(pinned);
let total_size = repo.get_total_size().await.unwrap_or_default();
let pinned_size = repo
.get_blocks_size(&pinned)
.await
.ok()
.flatten()
.unwrap_or_default();
let unpinned_blocks = total_size - pinned_size;
tracing::debug!(total_size = %total_size, ?trigger, unpinned_blocks);
let cleanup = match trigger {
GCTrigger::At { size } => {
total_size > 0 && unpinned_blocks >= size
}
GCTrigger::AtStorage => {
unpinned_blocks > 0
&& unpinned_blocks >= repo.max_storage_size()
}
GCTrigger::None => unpinned_blocks > 0,
};
tracing::debug!(will_run = %cleanup);
if cleanup {
tracing::debug!("running cleanup of unpinned blocks");
let blocks = repo.cleanup().await.unwrap();
tracing::debug!(removed_blocks = blocks.len(), "blocks removed");
tracing::debug!("cleanup finished");
}
interval.reset(time);
}
}
}
}
})
}).unwrap_or(AbortableJoinHandle::empty());
let mut context = context::IpfsContext::new(&repo);
context.repo_events.replace(repo_events);
let connexa = init
.with_custom_behaviour_with_context(
(options, repo.clone()),
|keys, (options, repo)| {
let custom_behaviour = match custom_behaviour {
Some(custom_behaviour) => Some(custom_behaviour(keys)?),
None => None,
};
Ok(create_create_behaviour(
keys,
&options,
&repo,
custom_behaviour,
))
},
)?
.set_context(context)
.set_custom_task_callback(|swarm, context, event| context.handle_event(swarm, event))
.set_swarm_event_callback(move |swarm, event, context| {
if let Some(callback) = swarm_event.as_ref() {
callback(swarm, event);
}
if let SwarmEvent::Behaviour(connexa::behaviour::BehaviourEvent::Identify(event)) =
event
{
match event {
Event::Received { info, .. } => {
let peer_id = info.public_key.to_peer_id();
if let Some(chs) = context.find_peer_identify.remove(&peer_id) {
for ch in chs {
let _ = ch.send(Ok(info.clone()));
}
}
}
Event::Sent { .. } => {}
Event::Pushed { .. } => {}
Event::Error { .. } => {}
}
}
})
.set_pollable_callback(|cx, swarm, context| {
let custom = swarm
.behaviour_mut()
.custom
.as_mut()
.expect("behaviour enabled");
while let Poll::Ready(Some(event)) = context.repo_events.poll_next_unpin(cx) {
context.handle_repo_event(custom, event);
}
Poll::Pending
})
.set_preload(|_, swarm, _| {
for addr in listening_addrs {
if let Err(e) = swarm.listen_on(addr.clone()) {
tracing::error!(%addr, %e, "failed to listen on address");
}
}
for block in blocks {
if let Some(kad) = swarm.behaviour_mut().kademlia.as_mut() {
let key = RecordKey::from(block.hash().to_bytes());
if let Err(e) = kad.start_providing(key) {
match e {
connexa::prelude::dht::store::Error::MaxProvidedKeys => break,
_ => unreachable!(),
}
}
}
}
})
.build()?;
let ipfs = Ipfs {
span: facade_span,
repo,
keystore,
connexa,
record_key_validator: Arc::new(record_key_validator),
_gc_guard: gc_handle,
};
Ok(ipfs)
}
}