pub mod config;
pub mod dag;
pub mod error;
pub mod ipns;
mod keystore;
pub mod p2p;
pub mod path;
pub mod refs;
pub mod repo;
pub(crate) mod rt;
mod task;
pub mod unixfs;
#[macro_use]
extern crate tracing;
use anyhow::{anyhow, format_err};
use bytes::Bytes;
use dag::{DagGet, DagPut};
use either::Either;
use futures::{
channel::{
mpsc::{channel, Sender, UnboundedReceiver},
oneshot::{self, channel as oneshot_channel, Sender as OneshotSender},
},
future::BoxFuture,
sink::SinkExt,
stream::{BoxStream, Stream},
FutureExt, StreamExt, TryStreamExt,
};
use keystore::Keystore;
#[cfg(feature = "beetle_bitswap")]
use p2p::BitswapConfig;
use p2p::{
IdentifyConfiguration, KadConfig, KadStoreConfig, PeerInfo, PubsubConfig, RelayConfig,
SwarmConfig, TransportConfig,
};
use repo::{
BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin,
};
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::Span;
use tracing_futures::Instrument;
use unixfs::UnixfsGet;
use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs};
use std::{
collections::{BTreeSet, HashMap, HashSet},
fmt,
ops::{Deref, DerefMut},
path::Path,
sync::atomic::AtomicU64,
sync::Arc,
time::Duration,
};
use self::{
dag::IpldDag,
ipns::Ipns,
p2p::{create_swarm, TSwarm},
repo::Repo,
};
pub use self::p2p::gossipsub::SubscriptionStream;
pub use self::{
error::Error,
p2p::BehaviourEvent,
p2p::KadResult,
path::IpfsPath,
repo::{PinKind, PinMode},
};
pub type Block = libipld::Block<libipld::DefaultParams>;
use libipld::{Cid, Ipld};
pub use libp2p::{
self,
core::transport::ListenerId,
gossipsub::{MessageId, PublishError},
identity::Keypair,
identity::PublicKey,
kad::{Quorum, RecordKey as Key},
multiaddr::multiaddr,
multiaddr::Protocol,
swarm::NetworkBehaviour,
Multiaddr, PeerId,
};
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
kad::{store::MemoryStoreConfig, Mode, Record},
ping::Config as PingConfig,
rendezvous::Namespace,
swarm::dial_opts::DialOpts,
StreamProtocol,
};
pub(crate) static BITSWAP_ID: AtomicU64 = AtomicU64::new(1);
#[allow(dead_code)]
#[deprecated(note = "Use `StoreageType` instead")]
type StoragePath = StorageType;
#[derive(Default, Debug)]
pub enum StorageType {
#[cfg(not(target_arch = "wasm32"))]
Disk(std::path::PathBuf),
#[default]
Memory,
#[cfg(target_arch = "wasm32")]
IndexedDb { namespace: Option<String> },
Custom {
blockstore: Option<Box<dyn BlockStore>>,
datastore: Option<Box<dyn DataStore>>,
lock: Option<Box<dyn Lock>>,
},
}
impl PartialEq for StorageType {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
#[cfg(not(target_arch = "wasm32"))]
(StorageType::Disk(left_path), StorageType::Disk(right_path)) => {
left_path.eq(right_path)
}
#[cfg(target_arch = "wasm32")]
(
StorageType::IndexedDb { namespace: left },
StorageType::IndexedDb { namespace: right },
) => left.eq(right),
(StorageType::Memory, StorageType::Memory) => true,
(StorageType::Custom { .. }, StorageType::Custom { .. }) => {
true
}
_ => false,
}
}
}
impl Eq for StorageType {}
pub struct IpfsOptions {
pub ipfs_path: StorageType,
pub bootstrap: Vec<Multiaddr>,
#[cfg(feature = "beetle_bitswap")]
pub bitswap_config: BitswapConfig,
pub relay_server_config: RelayConfig,
pub listening_addrs: Vec<Multiaddr>,
pub transport_configuration: crate::p2p::TransportConfig,
pub swarm_configuration: crate::p2p::SwarmConfig,
pub identify_configuration: crate::p2p::IdentifyConfiguration,
pub pubsub_config: crate::p2p::PubsubConfig,
pub kad_configuration: Either<KadConfig, libp2p::kad::Config>,
pub kad_store_config: KadStoreConfig,
pub ping_configuration: PingConfig,
pub addr_config: AddressBookConfig,
pub keystore: Keystore,
pub connection_idle: Duration,
pub provider: RepoProvider,
pub span: Option<Span>,
pub(crate) protocols: Libp2pProtocol,
}
#[derive(Default, Clone, Copy)]
pub(crate) struct Libp2pProtocol {
pub(crate) pubsub: bool,
pub(crate) kad: bool,
pub(crate) bitswap: bool,
pub(crate) relay_client: bool,
pub(crate) relay_server: bool,
pub(crate) dcutr: bool,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mdns: bool,
pub(crate) identify: bool,
pub(crate) autonat: bool,
pub(crate) rendezvous_client: bool,
pub(crate) rendezvous_server: bool,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) upnp: bool,
pub(crate) ping: bool,
#[cfg(feature = "experimental_stream")]
pub(crate) streams: bool,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RepoProvider {
#[default]
None,
All,
Pinned,
Roots,
}
impl Default for IpfsOptions {
fn default() -> Self {
Self {
ipfs_path: StorageType::Memory,
bootstrap: Default::default(),
#[cfg(feature = "beetle_bitswap")]
bitswap_config: Default::default(),
relay_server_config: Default::default(),
kad_configuration: Either::Left(Default::default()),
kad_store_config: Default::default(),
ping_configuration: Default::default(),
identify_configuration: Default::default(),
addr_config: Default::default(),
provider: Default::default(),
keystore: Keystore::in_memory(),
connection_idle: Duration::from_secs(30),
listening_addrs: vec![],
transport_configuration: TransportConfig::default(),
pubsub_config: PubsubConfig::default(),
swarm_configuration: SwarmConfig::default(),
span: None,
protocols: Default::default(),
}
}
}
impl fmt::Debug for IpfsOptions {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("IpfsOptions")
.field("ipfs_path", &self.ipfs_path)
.field("bootstrap", &self.bootstrap)
.field("listening_addrs", &self.listening_addrs)
.field("span", &self.span)
.finish()
}
}
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct Ipfs {
span: Span,
repo: Repo,
key: Keypair,
keystore: Keystore,
identify_conf: IdentifyConfiguration,
to_task: Sender<IpfsEvent>,
record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
_guard: Arc<DropGuard>,
}
impl std::fmt::Debug for Ipfs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Ipfs").finish()
}
}
type Channel<T> = OneshotSender<Result<T, Error>>;
type ReceiverChannel<T> = oneshot::Receiver<Result<T, Error>>;
#[derive(Debug)]
#[allow(clippy::type_complexity)]
enum IpfsEvent {
Connect(DialOpts, Channel<()>),
Protocol(OneshotSender<Vec<String>>),
Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
Listeners(Channel<Vec<Multiaddr>>),
ExternalAddresses(Channel<Vec<Multiaddr>>),
Connected(Channel<Vec<PeerId>>),
IsConnected(PeerId, Channel<bool>),
Disconnect(PeerId, Channel<()>),
Ban(PeerId, Channel<()>),
Unban(PeerId, Channel<()>),
PubsubSubscribe(String, Channel<Option<SubscriptionStream>>),
PubsubUnsubscribe(String, Channel<Result<bool, Error>>),
PubsubPublish(String, Bytes, Channel<Result<MessageId, PublishError>>),
PubsubPeers(Option<String>, Channel<Vec<PeerId>>),
GetBitswapPeers(Channel<BoxFuture<'static, Vec<PeerId>>>),
WantList(Option<PeerId>, Channel<BoxFuture<'static, Vec<Cid>>>),
PubsubSubscribed(Channel<Vec<String>>),
AddListeningAddress(Multiaddr, Channel<Multiaddr>),
RemoveListeningAddress(Multiaddr, Channel<()>),
AddExternalAddress(Multiaddr, Channel<()>),
RemoveExternalAddress(Multiaddr, Channel<()>),
Bootstrap(Channel<ReceiverChannel<KadResult>>),
AddPeer(PeerId, Multiaddr, Channel<()>),
RemovePeer(PeerId, Option<Multiaddr>, Channel<bool>),
GetClosestPeers(PeerId, Channel<ReceiverChannel<KadResult>>),
FindPeerIdentity(PeerId, Channel<ReceiverChannel<libp2p::identify::Info>>),
FindPeer(
PeerId,
bool,
Channel<Either<Vec<Multiaddr>, ReceiverChannel<KadResult>>>,
),
GetProviders(Cid, Channel<Option<BoxStream<'static, PeerId>>>),
Provide(Cid, Channel<ReceiverChannel<KadResult>>),
DhtMode(DhtMode, Channel<()>),
DhtGet(Key, Channel<BoxStream<'static, Record>>),
DhtPut(Key, Vec<u8>, Quorum, Channel<ReceiverChannel<KadResult>>),
GetBootstrappers(OneshotSender<Vec<Multiaddr>>),
AddBootstrapper(Multiaddr, Channel<Multiaddr>),
RemoveBootstrapper(Multiaddr, Channel<Multiaddr>),
ClearBootstrappers(Channel<Vec<Multiaddr>>),
DefaultBootstrap(Channel<Vec<Multiaddr>>),
AddRelay(PeerId, Multiaddr, Channel<()>),
RemoveRelay(PeerId, Multiaddr, Channel<()>),
EnableRelay(Option<PeerId>, Channel<()>),
DisableRelay(PeerId, Channel<()>),
ListRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
ListActiveRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
PubsubEventStream(OneshotSender<UnboundedReceiver<InnerPubsubEvent>>),
RegisterRendezvousNamespace(Namespace, PeerId, Option<u64>, Channel<()>),
UnregisterRendezvousNamespace(Namespace, PeerId, Channel<()>),
RendezvousNamespaceDiscovery(
Option<Namespace>,
bool,
Option<u64>,
PeerId,
Channel<HashMap<PeerId, Vec<Multiaddr>>>,
),
#[cfg(feature = "experimental_stream")]
StreamControlHandle(Channel<libp2p_stream::Control>),
#[cfg(feature = "experimental_stream")]
NewStream(StreamProtocol, Channel<libp2p_stream::IncomingStreams>),
Exit,
}
#[derive(Debug, Copy, Clone)]
pub enum DhtMode {
Auto,
Client,
Server,
}
impl From<DhtMode> for Option<Mode> {
fn from(mode: DhtMode) -> Self {
match mode {
DhtMode::Auto => None,
DhtMode::Client => Some(Mode::Client),
DhtMode::Server => Some(Mode::Server),
}
}
}
#[derive(Debug, Clone)]
pub enum PubsubEvent {
Subscribe { peer_id: PeerId },
Unsubscribe { peer_id: PeerId },
}
#[derive(Debug, Clone)]
pub(crate) enum InnerPubsubEvent {
Subscribe { topic: String, peer_id: PeerId },
Unsubscribe { topic: String, peer_id: PeerId },
}
impl From<InnerPubsubEvent> for PubsubEvent {
fn from(event: InnerPubsubEvent) -> Self {
match event {
InnerPubsubEvent::Subscribe { peer_id, .. } => PubsubEvent::Subscribe { peer_id },
InnerPubsubEvent::Unsubscribe { peer_id, .. } => PubsubEvent::Unsubscribe { peer_id },
}
}
}
type TSwarmEvent<C> = <TSwarm<C> as Stream>::Item;
type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
type TTransportFn = Box<
dyn Fn(
&Keypair,
Option<libp2p::relay::client::Transport>,
) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>>
+ Sync
+ Send
+ 'static,
>;
#[derive(Debug, Copy, Clone)]
pub enum FDLimit {
Max,
Custom(u64),
}
#[allow(clippy::type_complexity)]
pub struct UninitializedIpfs<C: NetworkBehaviour<ToSwarm = void::Void> + Send> {
keys: Option<Keypair>,
options: IpfsOptions,
fdlimit: Option<FDLimit>,
repo_handle: Option<Repo>,
local_external_addr: bool,
swarm_event: Option<TSwarmEventFn<C>>,
record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
custom_behaviour: Option<C>,
custom_transport: Option<TTransportFn>,
gc_config: Option<GCConfig>,
gc_repo_duration: Option<Duration>,
}
pub type UninitializedIpfsNoop = UninitializedIpfs<libp2p::swarm::dummy::Behaviour>;
impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> Default for UninitializedIpfs<C> {
fn default() -> Self {
Self::new()
}
}
impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
pub fn new() -> Self {
UninitializedIpfs {
keys: None,
options: Default::default(),
fdlimit: None,
repo_handle: None,
record_key_validator: Default::default(),
local_external_addr: false,
swarm_event: None,
custom_behaviour: None,
custom_transport: None,
gc_config: None,
gc_repo_duration: None,
}
}
#[deprecated(
note = "UninitializedIpfs::empty will be removed in the future. Use UninitializedIpfs::new()"
)]
pub fn empty() -> Self {
Self::new()
}
#[deprecated(
note = "UninitializedIpfs::with_opt will be removed in the future. Use UninitializedIpfs::new()"
)]
pub fn with_opt(options: IpfsOptions) -> Self {
let mut opt = Self::new();
opt.options = options;
opt
}
pub fn set_default_listener(self) -> Self {
self.add_listening_addrs(vec![
"/ip4/0.0.0.0/tcp/0".parse().unwrap(),
"/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
])
}
pub fn set_storage_type(mut self, storage_type: StorageType) -> Self {
self.options.ipfs_path = storage_type;
self
}
pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self {
if !self.options.listening_addrs.contains(&addr) {
self.options.listening_addrs.push(addr)
}
self
}
pub fn add_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
self.options.listening_addrs.extend(addrs);
self
}
pub fn set_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
self.options.listening_addrs = addrs;
self
}
pub fn add_bootstrap(mut self, addr: Multiaddr) -> Self {
if !self.options.bootstrap.contains(&addr) {
self.options.bootstrap.push(addr)
}
self
}
#[cfg(feature = "beetle_bitswap")]
pub fn with_default(self) -> Self {
self.with_identify(Default::default())
.with_autonat()
.with_bitswap(Default::default())
.with_kademlia(Either::Left(Default::default()), Default::default())
.with_ping(Default::default())
.with_pubsub(Default::default())
}
#[cfg(feature = "libp2p_bitswap")]
pub fn with_default(self) -> Self {
self.with_identify(Default::default())
.with_autonat()
.with_bitswap()
.with_kademlia(Either::Left(Default::default()), Default::default())
.with_ping(Default::default())
.with_pubsub(Default::default())
}
#[cfg(not(any(feature = "libp2p_bitswap", feature = "beetle_bitswap")))]
pub fn with_default(self) -> Self {
self.with_identify(Default::default())
.with_autonat()
.with_bitswap()
.with_kademlia(Either::Left(Default::default()), Default::default())
.with_ping(Default::default())
.with_pubsub(Default::default())
}
pub fn with_kademlia(
mut self,
config: impl Into<Either<KadConfig, libp2p::kad::Config>>,
store: KadStoreConfig,
) -> Self {
let config = config.into();
self.options.protocols.kad = true;
self.options.kad_configuration = config;
self.options.kad_store_config = store;
self
}
#[cfg(feature = "beetle_bitswap")]
pub fn with_bitswap(mut self, config: BitswapConfig) -> Self {
self.options.protocols.bitswap = true;
self.options.bitswap_config = config;
self
}
#[cfg(feature = "libp2p_bitswap")]
pub fn with_bitswap(mut self) -> Self {
self.options.protocols.bitswap = true;
self
}
#[cfg(not(any(feature = "libp2p_bitswap", feature = "beetle_bitswap")))]
pub fn with_bitswap(mut self) -> Self {
self.options.protocols.bitswap = true;
self
}
#[cfg(not(target_arch = "wasm32"))]
pub fn with_mdns(mut self) -> Self {
self.options.protocols.mdns = true;
self
}
pub fn with_relay(mut self, with_dcutr: bool) -> Self {
self.options.protocols.relay_client = true;
self.options.protocols.dcutr = with_dcutr;
self
}
pub fn with_relay_server(mut self, config: RelayConfig) -> Self {
self.options.protocols.relay_server = true;
self.options.relay_server_config = config;
self
}
#[cfg(not(target_arch = "wasm32"))]
pub fn with_upnp(mut self) -> Self {
self.options.protocols.upnp = true;
self
}
pub fn with_rendezvous_server(mut self) -> Self {
self.options.protocols.rendezvous_server = true;
self
}
pub fn with_rendezvous_client(mut self) -> Self {
self.options.protocols.rendezvous_client = true;
self
}
pub fn with_identify(mut self, config: crate::p2p::IdentifyConfiguration) -> Self {
self.options.protocols.identify = true;
self.options.identify_configuration = config;
self
}
#[cfg(feature = "experimental_stream")]
pub fn with_streams(mut self) -> Self {
self.options.protocols.streams = true;
self
}
pub fn with_pubsub(mut self, config: PubsubConfig) -> Self {
self.options.protocols.pubsub = true;
self.options.pubsub_config = config;
self
}
pub fn with_autonat(mut self) -> Self {
self.options.protocols.autonat = true;
self
}
pub fn with_ping(mut self, config: PingConfig) -> Self {
self.options.protocols.ping = true;
self.options.ping_configuration = config;
self
}
pub fn with_custom_behaviour(mut self, behaviour: C) -> Self {
self.custom_behaviour = Some(behaviour);
self
}
pub fn with_gc(mut self, config: GCConfig) -> Self {
self.gc_config = Some(config);
self
}
pub fn set_temp_pin_duration(mut self, duration: Duration) -> Self {
self.gc_repo_duration = Some(duration);
self
}
#[cfg(not(target_arch = "wasm32"))]
pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> Self {
let path = path.as_ref().to_path_buf();
self.options.ipfs_path = StorageType::Disk(path);
self
}
pub fn set_transport_configuration(mut self, config: crate::p2p::TransportConfig) -> Self {
self.options.transport_configuration = config;
self
}
pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
self.options.connection_idle = Duration::from_secs(duration);
self
}
pub fn set_swarm_configuration(mut self, config: crate::p2p::SwarmConfig) -> Self {
self.options.swarm_configuration = config;
self
}
pub fn default_record_key_validator(mut self) -> Self {
self.record_key_validator.insert(
"ipns".into(),
Arc::new(|key| to_dht_key(("ipns", |key| ipns_to_dht_key(key)), key)),
);
self
}
#[allow(clippy::type_complexity)]
pub fn set_record_prefix_validator(
mut self,
key: &str,
callback: Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>,
) -> Self {
self.record_key_validator.insert(key.to_string(), callback);
self
}
pub fn set_addrbook_configuration(mut self, config: AddressBookConfig) -> Self {
self.options.addr_config = config;
self
}
pub fn set_provider(mut self, opt: RepoProvider) -> Self {
self.options.provider = opt;
self
}
pub fn set_keypair(mut self, keypair: &Keypair) -> Self {
self.keys = Some(keypair.clone());
self
}
pub fn set_repo(mut self, repo: &Repo) -> Self {
self.repo_handle = Some(repo.clone());
self
}
pub fn set_keystore(mut self, keystore: &Keystore) -> Self {
self.options.keystore = keystore.clone();
self
}
pub fn listen_as_external_addr(mut self) -> Self {
self.local_external_addr = true;
self
}
pub fn with_custom_transport(mut self, transport: TTransportFn) -> Self {
self.custom_transport = Some(transport);
self
}
pub fn fd_limit(mut self, limit: FDLimit) -> Self {
self.fdlimit = Some(limit);
self
}
pub fn set_span(mut self, span: Span) -> Self {
self.options.span = Some(span);
self
}
pub fn swarm_events<F>(mut self, func: F) -> Self
where
F: Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send + 'static,
{
self.swarm_event = Some(Arc::new(func));
self
}
pub async fn start(self) -> Result<Ipfs, Error> {
let UninitializedIpfs {
keys,
fdlimit,
mut options,
swarm_event,
custom_behaviour,
custom_transport,
record_key_validator,
local_external_addr,
repo_handle,
gc_config,
..
} = self;
let keys = keys.unwrap_or(Keypair::generate_ed25519());
let root_span = Option::take(&mut options.span)
.unwrap_or_else(|| tracing::trace_span!(parent: &Span::current(), "ipfs"));
let init_span = tracing::trace_span!(parent: &root_span, "init");
let facade_span = tracing::trace_span!("facade");
let exec_span = tracing::trace_span!(parent: &root_span, "exec");
let swarm_span = tracing::trace_span!(parent: &root_span, "swarm");
let repo = match repo_handle {
Some(repo) => {
if repo.is_online() {
anyhow::bail!("Repo is already initialized");
}
repo
}
None => {
#[cfg(not(target_arch = "wasm32"))]
if let StorageType::Disk(path) = &options.ipfs_path {
if !path.is_dir() {
tokio::fs::create_dir_all(path).await?;
}
}
Repo::new(&mut options.ipfs_path)
}
};
repo.init().instrument(init_span.clone()).await?;
let repo_events = repo.initialize_channel();
if let Some(limit) = fdlimit {
#[cfg(unix)]
{
let (_, hard) = rlimit::Resource::NOFILE.get()?;
let limit = match limit {
FDLimit::Max => hard,
FDLimit::Custom(limit) => limit,
};
let target = std::cmp::min(hard, limit);
rlimit::Resource::NOFILE.set(target, hard)?;
let (soft, _) = rlimit::Resource::NOFILE.get()?;
if soft < 2048 {
error!("Limit is too low: {soft}");
}
}
#[cfg(not(unix))]
{
warn!("Cannot set {limit:?}. Can only set a fd limit on unix systems. Ignoring...")
}
}
let token = CancellationToken::new();
let _guard = Arc::new(token.clone().drop_guard());
let (to_task, receiver) = channel::<IpfsEvent>(1);
let id_conf = options.identify_configuration.clone();
let keystore = options.keystore.clone();
let ipfs = Ipfs {
span: facade_span,
repo,
identify_conf: id_conf,
key: keys.clone(),
keystore,
to_task,
record_key_validator,
_guard,
};
let blocks = match options.provider {
RepoProvider::None => vec![],
RepoProvider::All => ipfs.repo.list_blocks().await.collect::<Vec<_>>().await,
RepoProvider::Pinned => {
ipfs.repo
.list_pins(None)
.await
.filter_map(|result| async move { result.map(|(cid, _)| cid).ok() })
.collect()
.await
}
RepoProvider::Roots => {
warn!("RepoProvider::Roots is not implemented... ignoring...");
vec![]
}
};
let count = blocks.len();
let store_config = &mut options.kad_store_config;
match store_config.memory.as_mut() {
Some(memory_config) => {
memory_config.max_provided_keys += count;
}
None => {
store_config.memory = Some(MemoryStoreConfig {
max_provided_keys: (50 * 1024) + count,
..Default::default()
})
}
}
let swarm = create_swarm(
&keys,
&options,
&ipfs.repo,
exec_span,
(custom_behaviour, custom_transport),
)
.instrument(tracing::trace_span!(parent: &init_span, "swarm"))
.await?;
let IpfsOptions {
listening_addrs, ..
} = options;
if let Some(config) = gc_config {
rt::spawn({
let repo = ipfs.repo.clone();
let token = token.clone();
async move {
let GCConfig { duration, trigger } = config;
let use_config_timer = duration != Duration::ZERO;
if trigger == GCTrigger::None && !use_config_timer {
tracing::warn!("GC does not have a set timer or a trigger. Disabling GC");
return;
}
let time = match use_config_timer {
true => duration,
false => Duration::from_secs(60 * 60),
};
let mut interval = futures_timer::Delay::new(time);
loop {
tokio::select! {
_ = token.cancelled() => {
tracing::debug!("gc task cancelled");
break
},
_ = &mut interval => {
let _g = repo.inner.gclock.write().await;
tracing::debug!("preparing gc operation");
let pinned = repo
.list_pins(None)
.await
.try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
.try_collect::<BTreeSet<_>>()
.await
.unwrap_or_default();
let pinned = Vec::from_iter(pinned);
let total_size = repo.get_total_size().await.unwrap_or_default();
let pinned_size = repo
.get_blocks_size(&pinned)
.await
.ok()
.flatten()
.unwrap_or_default();
let unpinned_blocks = total_size - pinned_size;
tracing::debug!(total_size = %total_size, ?trigger, unpinned_blocks);
let cleanup = match trigger {
GCTrigger::At { size } => {
total_size > 0 && unpinned_blocks >= size
}
GCTrigger::AtStorage => {
unpinned_blocks > 0
&& unpinned_blocks >= repo.max_storage_size()
}
GCTrigger::None => unpinned_blocks > 0,
};
tracing::debug!(will_run = %cleanup);
if cleanup {
tracing::debug!("running cleanup of unpinned blocks");
let blocks = repo.cleanup().await.unwrap();
tracing::debug!(removed_blocks = blocks.len(), "blocks removed");
tracing::debug!("cleanup finished");
}
interval.reset(time);
}
}
}
}
});
}
let mut fut = task::IpfsTask::new(swarm, repo_events.fuse(), receiver.fuse(), &ipfs.repo);
fut.swarm_event = swarm_event;
fut.local_external_addr = local_external_addr;
for addr in listening_addrs.into_iter() {
match fut.swarm.listen_on(addr) {
Ok(id) => {
let (tx, _rx) = oneshot_channel();
fut.pending_add_listener.insert(id, tx);
}
_ => continue,
};
}
for block in blocks {
if let Some(kad) = fut.swarm.behaviour_mut().kademlia.as_mut() {
let key = Key::from(block.hash().to_bytes());
match kad.start_providing(key) {
Ok(id) => {
let (tx, _rx) = oneshot_channel();
fut.kad_subscriptions.insert(id, tx);
}
Err(e) => match e {
libp2p::kad::store::Error::MaxProvidedKeys => break,
_ => unreachable!(),
},
};
}
}
rt::spawn({
async move {
let as_fut = false;
let fut = if as_fut {
fut.boxed()
} else {
fut.run().boxed()
};
tokio::select! {
_ = fut => {}
_ = token.cancelled() => {},
};
}
.instrument(swarm_span)
});
Ok(ipfs)
}
}
impl Ipfs {
pub fn dag(&self) -> IpldDag {
IpldDag::new(self.clone())
}
pub fn repo(&self) -> &Repo {
&self.repo
}
pub fn unixfs(&self) -> IpfsUnixfs {
IpfsUnixfs::new(self.clone())
}
pub fn ipns(&self) -> Ipns {
Ipns::new(self.clone())
}
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
self.repo.put_block(block).span(self.span.clone()).await
}
pub async fn get_block(&self, cid: &Cid) -> Result<Block, Error> {
self.repo
.get_block(cid, &[], false)
.instrument(self.span.clone())
.await
}
pub async fn remove_block(&self, cid: Cid, recursive: bool) -> Result<Vec<Cid>, Error> {
self.repo
.remove_block(&cid, recursive)
.instrument(self.span.clone())
.await
}
pub async fn gc(&self) -> Result<Vec<Cid>, Error> {
let _g = self.repo.inner.gclock.write().await;
self.repo.cleanup().instrument(self.span.clone()).await
}
pub fn insert_pin(&self, cid: &Cid) -> RepoInsertPin {
self.repo().pin(cid).span(self.span.clone())
}
pub fn remove_pin(&self, cid: &Cid) -> RepoRemovePin {
self.repo().remove_pin(cid).span(self.span.clone())
}
pub async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
let span = debug_span!(parent: &self.span, "is_pinned", cid = %cid);
self.repo.is_pinned(cid).instrument(span).await
}
pub async fn list_pins(
&self,
filter: Option<PinMode>,
) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
let span = debug_span!(parent: &self.span, "list_pins", ?filter);
self.repo.list_pins(filter).instrument(span).await
}
pub async fn query_pins(
&self,
cids: Vec<Cid>,
requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
let span = debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
self.repo
.query_pins(cids, requirement)
.instrument(span)
.await
}
pub fn put_dag(&self, ipld: Ipld) -> DagPut {
self.dag().put_dag(ipld).span(self.span.clone())
}
pub fn get_dag<I: Into<IpfsPath>>(&self, path: I) -> DagGet {
self.dag().get_dag(path).span(self.span.clone())
}
pub fn cat_unixfs(&self, starting_point: impl Into<unixfs::StartingPoint>) -> UnixfsCat {
self.unixfs().cat(starting_point).span(self.span.clone())
}
pub fn add_unixfs(&self, opt: impl Into<AddOpt>) -> UnixfsAdd {
self.unixfs().add(opt).span(self.span.clone())
}
pub fn get_unixfs<P: AsRef<Path>>(&self, path: IpfsPath, dest: P) -> UnixfsGet {
self.unixfs().get(path, dest).span(self.span.clone())
}
pub fn ls_unixfs(&self, path: IpfsPath) -> UnixfsLs {
self.unixfs().ls(path).span(self.span.clone())
}
pub async fn resolve_ipns(&self, path: &IpfsPath, recursive: bool) -> Result<IpfsPath, Error> {
async move {
let ipns = self.ipns();
let mut resolved = ipns.resolve(path).await;
if recursive {
let mut seen = HashSet::with_capacity(1);
while let Ok(ref res) = resolved {
if !seen.insert(res.clone()) {
break;
}
resolved = ipns.resolve(res).await;
}
}
resolved
}
.instrument(self.span.clone())
.await
}
pub async fn publish_ipns(&self, path: &IpfsPath) -> Result<IpfsPath, Error> {
async move {
let ipns = self.ipns();
ipns.publish(None, path, None).await
}
.instrument(self.span.clone())
.await
}
pub async fn connect(&self, target: impl Into<DialOpts>) -> Result<(), Error> {
async move {
let target = target.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Connect(target, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Addresses(tx)).await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn is_connected(&self, peer_id: PeerId) -> Result<bool, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::IsConnected(peer_id, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn connected(&self) -> Result<Vec<PeerId>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Connected(tx)).await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn disconnect(&self, target: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Disconnect(target, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn ban_peer(&self, target: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Ban(target, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn unban_peer(&self, target: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Unban(target, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn identity(&self, peer_id: Option<PeerId>) -> Result<PeerInfo, Error> {
async move {
match peer_id {
Some(peer_id) => {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::FindPeerIdentity(peer_id, tx))
.await?;
rx.await??.await?.map(PeerInfo::from)
}
None => {
let mut addresses = HashSet::new();
let (local_result, external_result) =
futures::join!(self.listening_addresses(), self.external_addresses());
let external: HashSet<Multiaddr> =
HashSet::from_iter(external_result.unwrap_or_default());
let local: HashSet<Multiaddr> =
HashSet::from_iter(local_result.unwrap_or_default());
addresses.extend(external.iter().cloned());
addresses.extend(local.iter().cloned());
let mut addresses = Vec::from_iter(addresses);
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Protocol(tx)).await?;
let protocols = rx
.await?
.iter()
.filter_map(|s| StreamProtocol::try_from_owned(s.clone()).ok())
.collect();
let public_key = self.key.public();
let peer_id = public_key.to_peer_id();
for addr in &mut addresses {
if !matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
addr.push(Protocol::P2p(peer_id))
}
}
let info = PeerInfo {
peer_id,
public_key,
protocol_version: self.identify_conf.protocol_version.clone(),
agent_version: self.identify_conf.agent_version.clone(),
listen_addrs: addresses,
protocols,
observed_addr: None,
};
Ok(info)
}
}
}
.instrument(self.span.clone())
.await
}
pub async fn pubsub_subscribe(
&self,
topic: impl Into<String>,
) -> Result<SubscriptionStream, Error> {
async move {
let topic = topic.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubSubscribe(topic.clone(), tx))
.await?;
rx.await??
.ok_or_else(|| format_err!("already subscribed to {:?}", topic))
}
.instrument(self.span.clone())
.await
}
pub async fn pubsub_events(
&self,
topic: impl Into<String>,
) -> Result<BoxStream<'static, PubsubEvent>, Error> {
async move {
let topic = topic.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubEventStream(tx))
.await?;
let mut receiver = rx
.await?;
let defined_topic = topic.to_string();
let stream = async_stream::stream! {
while let Some(event) = receiver.next().await {
match &event {
InnerPubsubEvent::Subscribe { topic, .. } | InnerPubsubEvent::Unsubscribe { topic, .. } if topic.eq(&defined_topic) => yield event.into(),
_ => {}
}
}
};
Ok(stream.boxed())
}
.instrument(self.span.clone())
.await
}
pub async fn pubsub_publish(
&self,
topic: impl Into<String>,
data: impl Into<Bytes>,
) -> Result<MessageId, Error> {
async move {
let topic = topic.into();
let data = data.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubPublish(topic, data, tx))
.await?;
rx.await??.map_err(anyhow::Error::from)
}
.instrument(self.span.clone())
.await
}
pub async fn pubsub_unsubscribe(&self, topic: impl Into<String>) -> Result<bool, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubUnsubscribe(topic.into(), tx))
.await?;
rx.await??
}
.instrument(self.span.clone())
.await
}
pub async fn pubsub_peers(
&self,
topic: impl Into<Option<String>>,
) -> Result<Vec<PeerId>, Error> {
async move {
let topic = topic.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubPeers(topic, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn pubsub_subscribed(&self) -> Result<Vec<String>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubSubscribed(tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn bitswap_wantlist(
&self,
peer: impl Into<Option<PeerId>>,
) -> Result<Vec<Cid>, Error> {
async move {
let peer = peer.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::WantList(peer, tx))
.await?;
Ok(rx.await??.await)
}
.instrument(self.span.clone())
.await
}
#[cfg(feature = "experimental_stream")]
pub async fn stream_control(&self) -> Result<libp2p_stream::Control, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::StreamControlHandle(tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
#[cfg(feature = "experimental_stream")]
pub async fn new_stream(
&self,
protocol: impl Into<StreamProtocolRef>,
) -> Result<libp2p_stream::IncomingStreams, Error> {
let protocol: StreamProtocol = protocol.into().try_into()?;
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::NewStream(protocol, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
#[cfg(feature = "experimental_stream")]
pub async fn open_stream(
&self,
peer_id: PeerId,
protocol: impl Into<StreamProtocolRef>,
) -> Result<libp2p::Stream, Error> {
let protocol: StreamProtocol = protocol.into().try_into()?;
async move {
let mut control = self.stream_control().await?;
let stream = control
.open_stream(peer_id, protocol)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(stream)
}
.instrument(self.span.clone())
.await
}
pub async fn refs_local(&self) -> Vec<Cid> {
self.repo
.list_blocks()
.instrument(self.span.clone())
.await
.collect::<Vec<_>>()
.await
}
pub async fn listening_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Listeners(tx)).await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn external_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::ExternalAddresses(tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::AddListeningAddress(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn remove_listening_address(&self, addr: Multiaddr) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::RemoveListeningAddress(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn add_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::AddExternalAddress(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn remove_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::RemoveExternalAddress(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::FindPeer(peer_id, false, tx))
.await?;
match rx.await?? {
Either::Left(addrs) if !addrs.is_empty() => Ok(addrs),
Either::Left(_) => unreachable!(),
Either::Right(future) => {
future.await??;
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::FindPeer(peer_id, true, tx))
.await?;
match rx.await?? {
Either::Left(addrs) if !addrs.is_empty() => Ok(addrs),
_ => Err(anyhow!("couldn't find peer {}", peer_id)),
}
}
}
}
.instrument(self.span.clone())
.await
}
pub async fn get_providers(&self, cid: Cid) -> Result<BoxStream<'static, PeerId>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetProviders(cid, tx))
.await?;
rx.await??.ok_or_else(|| anyhow!("Provider already exist"))
}
.instrument(self.span.clone())
.await
}
pub async fn provide(&self, cid: Cid) -> Result<(), Error> {
if !self.repo.contains(&cid).await? {
return Err(anyhow!(
"Error: block {} not found locally, cannot provide",
cid
));
}
let kad_result = async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Provide(cid, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await?
.await;
match kad_result? {
Ok(KadResult::Complete) => Ok(()),
Ok(_) => unreachable!(),
Err(e) => Err(anyhow!(e)),
}
}
pub fn fetch(&self, cid: &Cid) -> RepoFetch {
self.repo.fetch(cid).span(self.span.clone())
}
pub async fn get_closest_peers(&self, peer_id: PeerId) -> Result<Vec<PeerId>, Error> {
let kad_result = async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetClosestPeers(peer_id, tx))
.await?;
Ok(rx.await??).map_err(|e: String| anyhow!(e))
}
.instrument(self.span.clone())
.await?
.await;
match kad_result? {
Ok(KadResult::Peers(closest)) => Ok(closest),
Ok(_) => unreachable!(),
Err(e) => Err(anyhow!(e)),
}
}
pub async fn dht_mode(&self, mode: DhtMode) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::DhtMode(mode, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn dht_get<T: AsRef<[u8]>>(
&self,
key: T,
) -> Result<BoxStream<'static, Record>, Error> {
async move {
let key = key.as_ref();
let key_str = String::from_utf8_lossy(key);
let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
if let Some(key_fn) = self.record_key_validator.get(prefix) {
key_fn(&key_str)?
} else {
Key::from(key.to_vec())
}
} else {
Key::from(key.to_vec())
};
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::DhtGet(key, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn dht_put(
&self,
key: impl AsRef<[u8]>,
value: impl Into<Vec<u8>>,
quorum: Quorum,
) -> Result<(), Error> {
let kad_result = async move {
let key = key.as_ref();
let key_str = String::from_utf8_lossy(key);
let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
if let Some(key_fn) = self.record_key_validator.get(prefix) {
key_fn(&key_str)?
} else {
Key::from(key.to_vec())
}
} else {
Key::from(key.to_vec())
};
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::DhtPut(key, value.into(), quorum, tx))
.await?;
Ok(rx.await?).map_err(|e: String| anyhow!(e))
}
.instrument(self.span.clone())
.await??
.await;
match kad_result? {
Ok(KadResult::Complete) => Ok(()),
Ok(_) => unreachable!(),
Err(e) => Err(anyhow!(e)),
}
}
pub async fn add_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::AddRelay(peer_id, addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn remove_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::RemoveRelay(peer_id, addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn list_relays(&self, active: bool) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
async move {
let (tx, rx) = oneshot_channel();
match active {
true => {
self.to_task
.clone()
.send(IpfsEvent::ListActiveRelays(tx))
.await?
}
false => self.to_task.clone().send(IpfsEvent::ListRelays(tx)).await?,
};
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn enable_autorelay(&self) -> Result<(), Error> {
Err(anyhow::anyhow!("Unimplemented"))
}
pub async fn disable_autorelay(&self) -> Result<(), Error> {
Err(anyhow::anyhow!("Unimplemented"))
}
pub async fn enable_relay(&self, peer_id: impl Into<Option<PeerId>>) -> Result<(), Error> {
async move {
let peer_id = peer_id.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::EnableRelay(peer_id, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn disable_relay(&self, peer_id: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::DisableRelay(peer_id, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn rendezvous_register_namespace(
&self,
namespace: impl Into<String>,
ttl: impl Into<Option<u64>>,
peer_id: PeerId,
) -> Result<(), Error> {
async move {
let namespace = Namespace::new(namespace.into())?;
let ttl = ttl.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::RegisterRendezvousNamespace(
namespace, peer_id, ttl, tx,
))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn rendezvous_unregister_namespace(
&self,
namespace: impl Into<String>,
peer_id: PeerId,
) -> Result<(), Error> {
async move {
let namespace = Namespace::new(namespace.into())?;
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::UnregisterRendezvousNamespace(
namespace, peer_id, tx,
))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn rendezvous_namespace_discovery(
&self,
namespace: impl Into<String>,
ttl: impl Into<Option<u64>>,
peer_id: PeerId,
) -> Result<HashMap<PeerId, Vec<Multiaddr>>, Error> {
async move {
let namespace = Namespace::new(namespace.into())?;
let ttl = ttl.into();
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::RendezvousNamespaceDiscovery(
Some(namespace),
false,
ttl,
peer_id,
tx,
))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub fn refs<'a, Iter>(
&'a self,
iplds: Iter,
max_depth: Option<u64>,
unique: bool,
) -> impl Stream<Item = Result<refs::Edge, libipld::error::Error>> + Send + 'a
where
Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
{
refs::iplds_refs(self.repo(), iplds, max_depth, unique)
}
pub async fn get_bootstraps(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetBootstrappers(tx))
.await?;
Ok(rx.await?)
}
.instrument(self.span.clone())
.await
}
pub async fn add_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::AddBootstrapper(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn remove_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::RemoveBootstrapper(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn clear_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::ClearBootstrappers(tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn default_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::DefaultBootstrap(tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn bootstrap(&self) -> Result<(), Error> {
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;
let fut = rx.await??;
rt::spawn(async move {
if let Err(e) = fut.await.map_err(|e| anyhow!(e)) {
tracing::error!(error = %e, "failed to bootstrap");
}
});
Ok(())
}
pub async fn add_peer(&self, peer_id: PeerId, mut addr: Multiaddr) -> Result<(), Error> {
if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
addr.pop();
}
let (tx, rx) = oneshot::channel();
self.to_task
.clone()
.send(IpfsEvent::AddPeer(peer_id, addr, tx))
.await?;
rx.await??;
Ok(())
}
pub async fn remove_peer(&self, peer_id: PeerId) -> Result<bool, Error> {
let (tx, rx) = oneshot::channel();
self.to_task
.clone()
.send(IpfsEvent::RemovePeer(peer_id, None, tx))
.await?;
rx.await.map_err(anyhow::Error::from)?
}
pub async fn remove_peer_address(
&self,
peer_id: PeerId,
addr: Multiaddr,
) -> Result<bool, Error> {
let (tx, rx) = oneshot::channel();
self.to_task
.clone()
.send(IpfsEvent::RemovePeer(peer_id, Some(addr), tx))
.await?;
rx.await.map_err(anyhow::Error::from)?
}
pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetBitswapPeers(tx))
.await?;
Ok(rx.await??.await)
}
pub fn keypair(&self) -> &Keypair {
&self.key
}
pub fn keystore(&self) -> &Keystore {
&self.keystore
}
pub async fn exit_daemon(mut self) {
self.repo.shutdown();
let _ = self.to_task.try_send(IpfsEvent::Exit);
}
}
pub enum StreamProtocolRef {
Static(&'static str),
Owned(String),
Direct(StreamProtocol),
}
impl From<&'static str> for StreamProtocolRef {
fn from(protocol: &'static str) -> Self {
StreamProtocolRef::Static(protocol)
}
}
impl From<String> for StreamProtocolRef {
fn from(protocol: String) -> Self {
StreamProtocolRef::Owned(protocol)
}
}
impl From<StreamProtocol> for StreamProtocolRef {
fn from(protocol: StreamProtocol) -> Self {
StreamProtocolRef::Direct(protocol)
}
}
impl TryFrom<StreamProtocolRef> for StreamProtocol {
type Error = std::io::Error;
fn try_from(protocl_ref: StreamProtocolRef) -> Result<Self, Self::Error> {
let protocol = match protocl_ref {
StreamProtocolRef::Direct(protocol) => protocol,
StreamProtocolRef::Owned(protocol) => StreamProtocol::try_from_owned(protocol)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
StreamProtocolRef::Static(protocol) => StreamProtocol::new(protocol),
};
Ok(protocol)
}
}
#[inline]
pub(crate) fn split_dht_key(key: &str) -> anyhow::Result<(&str, &str)> {
anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
let (key, val) = {
let data = key
.split('/')
.filter(|s| !s.trim().is_empty())
.collect::<Vec<_>>();
anyhow::ensure!(
!data.is_empty() && data.len() == 2,
"split dats cannot be empty"
);
(data[0], data[1])
};
Ok((key, val))
}
#[inline]
pub(crate) fn ipns_to_dht_key<B: AsRef<str>>(key: B) -> anyhow::Result<Key> {
use libipld::multibase;
let default_ipns_prefix = b"/ipns/";
let mut key = key.as_ref().trim().to_string();
anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
if key.starts_with('1') || key.starts_with('Q') {
key.insert(0, 'z');
}
let mut data = multibase::decode(key).map(|(_, data)| data)?;
if data[0] != 0x01 && data[1] != 0x72 {
data = [vec![0x01, 0x72], data].concat();
}
data = [default_ipns_prefix.to_vec(), data[2..].to_vec()].concat();
Ok(data.into())
}
#[inline]
pub(crate) fn to_dht_key<B: AsRef<str>, F: Fn(&str) -> anyhow::Result<Key>>(
(prefix, func): (&str, F),
key: B,
) -> anyhow::Result<Key> {
let key = key.as_ref().trim();
let (key, val) = split_dht_key(key)?;
anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
anyhow::ensure!(!val.is_empty(), "Value cannot be empty");
if key == prefix {
return func(val);
}
anyhow::bail!("Invalid prefix")
}
use crate::p2p::AddressBookConfig;
#[doc(hidden)]
pub use node::Node;
mod node {
use super::*;
pub struct Node {
pub ipfs: Ipfs,
pub id: PeerId,
pub addrs: Vec<Multiaddr>,
}
impl Node {
pub async fn new<T: AsRef<str>>(name: T) -> Self {
Self::with_options(Some(trace_span!("ipfs", node = name.as_ref())), None).await
}
pub async fn connect<D: Into<DialOpts>>(&self, opt: D) -> Result<(), Error> {
let opts = opt.into();
if let Some(peer_id) = opts.get_peer_id() {
if self.ipfs.is_connected(peer_id).await? {
return Ok(());
}
}
self.ipfs.connect(opts).await
}
pub async fn with_options(span: Option<Span>, addr: Option<Vec<Multiaddr>>) -> Self {
let mut uninit = UninitializedIpfsNoop::new()
.with_default()
.set_transport_configuration(TransportConfig {
enable_memory_transport: true,
..Default::default()
});
if let Some(span) = span {
uninit = uninit.set_span(span);
}
let list = match addr {
Some(addr) => addr,
None => vec![Multiaddr::empty().with(Protocol::Memory(0))],
};
let ipfs = uninit.start().await.unwrap();
ipfs.dht_mode(DhtMode::Server).await.unwrap();
let id = ipfs.keypair().public().to_peer_id();
for addr in list {
ipfs.add_listening_address(addr).await.expect("To succeed");
}
let mut addrs = ipfs.listening_addresses().await.unwrap();
for addr in &mut addrs {
if let Some(proto) = addr.iter().last() {
if !matches!(proto, Protocol::P2p(_)) {
addr.push(Protocol::P2p(id));
}
}
}
Node { ipfs, id, addrs }
}
#[allow(clippy::type_complexity)]
pub fn get_subscriptions(
&self,
) -> &parking_lot::Mutex<HashMap<Cid, Vec<oneshot::Sender<Result<Block, String>>>>>
{
&self.ipfs.repo.inner.subscriptions
}
pub async fn bootstrap(&self) -> Result<(), Error> {
self.ipfs.bootstrap().await
}
pub async fn add_node(&self, node: &Self) -> Result<(), Error> {
for addr in &node.addrs {
self.add_peer(node.id, addr.to_owned()).await?;
}
Ok(())
}
pub async fn shutdown(self) {
self.ipfs.exit_daemon().await;
}
}
impl Deref for Node {
type Target = Ipfs;
fn deref(&self) -> &Self::Target {
&self.ipfs
}
}
impl DerefMut for Node {
fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
&mut self.ipfs
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use libipld::{
ipld,
multihash::{Code, MultihashDigest},
IpldCodec,
};
#[tokio::test]
async fn test_put_and_get_block() {
let ipfs = Node::new("test_node").await;
let data = b"hello block\n".to_vec();
let cid = Cid::new_v1(IpldCodec::Raw.into(), Code::Sha2_256.digest(&data));
let block = Block::new(cid, data).unwrap();
let cid: Cid = ipfs.put_block(block.clone()).await.unwrap();
let new_block = ipfs.get_block(&cid).await.unwrap();
assert_eq!(block, new_block);
}
#[tokio::test]
async fn test_put_and_get_dag() {
let ipfs = Node::new("test_node").await;
let data = ipld!([-1, -2, -3]);
let cid = ipfs.put_dag(data.clone()).await.unwrap();
let new_data = ipfs.get_dag(cid).await.unwrap();
assert_eq!(data, new_data);
}
#[tokio::test]
async fn test_pin_and_unpin() {
let ipfs = Node::new("test_node").await;
let data = ipld!([-1, -2, -3]);
let cid = ipfs.put_dag(data.clone()).pin(false).await.unwrap();
assert!(ipfs.is_pinned(&cid).await.unwrap());
ipfs.remove_pin(&cid).await.unwrap();
assert!(!ipfs.is_pinned(&cid).await.unwrap());
}
}