#[macro_use]
extern crate tracing;
pub mod block;
pub mod builder;
pub mod config;
mod context;
pub mod dag;
pub mod error;
pub mod ipns;
mod keystore;
pub mod p2p;
pub mod path;
pub mod refs;
pub mod repo;
pub mod unixfs;
pub use block::Block;
use anyhow::anyhow;
use bytes::Bytes;
use dag::{DagGet, DagPut};
use futures::{
channel::oneshot::{self, channel as oneshot_channel, Sender as OneshotSender},
future::BoxFuture,
stream::BoxStream,
StreamExt,
};
use keystore::Keystore;
use p2p::{MultiaddrExt, PeerInfo};
use repo::{DefaultStorage, RepoFetch, RepoInsertPin, RepoRemovePin};
use tracing::Span;
use tracing_futures::Instrument;
use unixfs::UnixfsGet;
use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs};
use self::{dag::IpldDag, ipns::Ipns, p2p::TSwarm, repo::Repo};
pub use self::{
error::Error,
p2p::BehaviourEvent,
p2p::KadResult,
path::IpfsPath,
repo::{PinKind, PinMode},
};
use async_rt::AbortableJoinHandle;
use connexa::handle::Connexa;
pub use connexa::prelude::dht::{Mode, Quorum, Record, RecordKey, ToRecordKey};
pub use connexa::prelude::request_response::{
InboundRequestId, IntoRequest, OptionalStreamProtocol,
};
pub use connexa::prelude::swarm::derive_prelude::{ConnectionId, ListenerId};
pub use connexa::prelude::swarm::dial_opts::{DialOpts, PeerCondition};
pub use connexa::prelude::{
connection_limits::ConnectionLimits,
gossipsub, identify, ping,
swarm::{self, NetworkBehaviour},
GossipsubMessage, Stream,
};
pub use connexa::prelude::{
identity::Keypair, ConnectionEvent, Multiaddr, PeerId, Protocol, StreamProtocol,
};
pub use connexa::{behaviour::request_response::RequestResponseConfig, dummy};
use ipld_core::cid::Cid;
use ipld_core::ipld::Ipld;
use connexa::prelude::gossipsub::IntoGossipsubTopic;
use connexa::prelude::rendezvous::IntoNamespace;
#[cfg(feature = "stream")]
use connexa::prelude::stream::IntoStreamProtocol;
pub use connexa::prelude::transport::ConnectedPoint;
use serde::Serialize;
use std::{borrow::Borrow, path::PathBuf};
use std::{
collections::{HashMap, HashSet},
fmt,
path::Path,
sync::Arc,
time::Duration,
};
/// Set of options describing how a node should be configured.
///
/// NOTE(review): the consumer of this struct is not visible in this file
/// section; presumably the builder module reads it — confirm.
struct IpfsOptions {
/// Optional filesystem path; `None` by default.
pub ipfs_path: Option<PathBuf>,
/// Only present when compiling for `wasm32`.
#[cfg(target_arch = "wasm32")]
pub namespace: Option<Option<String>>,
/// Bootstrap addresses; empty by default.
pub bootstrap: Vec<Multiaddr>,
/// Addresses to listen on; empty by default.
pub listening_addrs: Vec<Multiaddr>,
/// Address book configuration.
pub addr_config: AddressBookConfig,
/// Keystore; the `Default` impl uses `Keystore::in_memory()`.
pub keystore: Keystore,
/// Which blocks are provided; see [`RepoProvider`].
pub provider: RepoProvider,
/// Optional tracing span.
pub span: Option<Span>,
/// Flags for optional protocols; see [`Libp2pProtocol`].
pub(crate) protocols: Libp2pProtocol,
}
/// Boolean switches for optional protocols. Both default to `false`
/// through the derived `Default`.
#[derive(Default, Clone, Copy)]
pub(crate) struct Libp2pProtocol {
/// Bitswap flag.
pub(crate) bitswap: bool,
/// Relay flag.
pub(crate) relay: bool,
}
/// Selects which repo content is provided.
///
/// NOTE(review): the exact semantics of each variant are implemented outside
/// this file section — confirm against the repo/provider code.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RepoProvider {
/// Default variant.
#[default]
None,
All,
Pinned,
Roots,
}
impl Default for IpfsOptions {
fn default() -> Self {
Self {
ipfs_path: None,
#[cfg(target_arch = "wasm32")]
namespace: None,
bootstrap: Default::default(),
addr_config: Default::default(),
provider: Default::default(),
keystore: Keystore::in_memory(),
listening_addrs: vec![],
span: None,
protocols: Default::default(),
}
}
}
impl fmt::Debug for IpfsOptions {
    /// Prints only the path, bootstrap list, listening addresses and span;
    /// the remaining fields are intentionally left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("IpfsOptions");
        out.field("ipfs_path", &self.ipfs_path);
        out.field("bootstrap", &self.bootstrap);
        out.field("listening_addrs", &self.listening_addrs);
        out.field("span", &self.span);
        out.finish()
    }
}
/// Cloneable handle to a running node: wraps the block repo, the `connexa`
/// networking handle and the keystore.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct Ipfs {
/// Span used to instrument the operations started through this handle.
span: Span,
/// Block repository.
repo: Repo<DefaultStorage>,
/// Networking handle; custom commands are sent as [`IpfsEvent`].
connexa: Connexa<IpfsEvent>,
/// Keystore exposed through `keystore()`.
keystore: Keystore,
/// Per-prefix functions turning a textual DHT key into a `RecordKey`;
/// consulted by `dht_put`.
record_key_validator:
Arc<HashMap<String, Box<dyn Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send>>>,
/// Task handle that `exit_daemon` aborts.
_gc_guard: AbortableJoinHandle<()>,
}
impl std::fmt::Debug for Ipfs {
    /// Prints an opaque `Ipfs` struct without any fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Ipfs");
        repr.finish()
    }
}
/// Oneshot sender carrying a `Result<T, Error>` reply; used as the reply slot
/// in most [`IpfsEvent`] variants.
type Channel<T> = OneshotSender<Result<T, Error>>;
/// Oneshot receiver yielding a `Result<T, Error>`; the receiving counterpart
/// of [`Channel`].
type ReceiverChannel<T> = oneshot::Receiver<Result<T, Error>>;
/// Custom commands sent through `Connexa::send_custom_event`. Every variant
/// carries a oneshot sender on which the reply is delivered.
#[derive(Debug)]
#[allow(clippy::type_complexity)]
enum IpfsEvent {
/// Sent by `Ipfs::identity(None)`; replies with protocol names.
Protocol(OneshotSender<Vec<String>>),
/// Sent by `Ipfs::get_bitswap_peers`; the reply is a future yielding the peers.
GetBitswapPeers(Channel<BoxFuture<'static, Vec<PeerId>>>),
/// Sent by `Ipfs::bitswap_wantlist`; the reply is a future yielding the CIDs.
WantList(Option<PeerId>, Channel<BoxFuture<'static, Vec<Cid>>>),
/// Sent by `Ipfs::identity(Some(_))`; the reply is a second receiver for the identify info.
FindPeerIdentity(PeerId, Channel<ReceiverChannel<identify::Info>>),
/// Sent by `Ipfs::add_peer`.
AddPeer(AddPeerOpt, Channel<()>),
/// Sent by `Ipfs::addrs`.
Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
/// Sent by `Ipfs::remove_peer` (address `None`) and `Ipfs::remove_peer_address` (address `Some`).
RemovePeer(PeerId, Option<Multiaddr>, Channel<bool>),
/// Sent by `Ipfs::get_bootstraps`.
GetBootstrappers(OneshotSender<Vec<Multiaddr>>),
/// Sent by `Ipfs::add_bootstrap`.
AddBootstrapper(Multiaddr, Channel<Multiaddr>),
/// Sent by `Ipfs::remove_bootstrap`.
RemoveBootstrapper(Multiaddr, Channel<Multiaddr>),
/// Sent by `Ipfs::clear_bootstrap`.
ClearBootstrappers(Channel<Vec<Multiaddr>>),
/// Sent by `Ipfs::default_bootstrap`.
DefaultBootstrap(Channel<Vec<Multiaddr>>),
/// Sent by `Ipfs::add_relay`.
AddRelay(PeerId, Multiaddr, Channel<()>),
/// Sent by `Ipfs::remove_relay`.
RemoveRelay(PeerId, Multiaddr, Channel<()>),
/// Sent by `Ipfs::enable_relay`.
EnableRelay(Option<PeerId>, Channel<()>),
/// Sent by `Ipfs::disable_relay`.
DisableRelay(PeerId, Channel<()>),
/// Sent by `Ipfs::list_relays(false)`.
ListRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
/// Sent by `Ipfs::list_relays(true)`.
ListActiveRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
}
/// DHT operating mode accepted by `Ipfs::dht_mode`.
///
/// Converts into `Option<Mode>`: `Auto` becomes `None`, the other variants
/// become the matching `Mode`.
#[derive(Debug, Copy, Clone)]
pub enum DhtMode {
/// Maps to `None`.
Auto,
/// Maps to `Some(Mode::Client)`.
Client,
/// Maps to `Some(Mode::Server)`.
Server,
}
impl From<DhtMode> for Option<Mode> {
    /// Translates the public mode into the optional `Mode`; `Auto` has no
    /// fixed mode and therefore yields `None`.
    fn from(mode: DhtMode) -> Self {
        let fixed = match mode {
            DhtMode::Server => Mode::Server,
            DhtMode::Client => Mode::Client,
            DhtMode::Auto => return None,
        };
        Some(fixed)
    }
}
/// Subscription change of a peer, with an optional topic name.
///
/// NOTE(review): nothing in this file section constructs these values;
/// confirm the meaning of a `None` topic with the emitting code.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PubsubEvent {
/// A peer subscribed.
Subscribe {
peer_id: PeerId,
topic: Option<String>,
},
/// A peer unsubscribed.
Unsubscribe {
peer_id: PeerId,
topic: Option<String>,
},
}
/// Item type produced when `TSwarm<C>` is polled as a stream.
type TSwarmEvent<C> = <TSwarm<C> as futures::Stream>::Item;
/// Shared callback receiving mutable access to the swarm and a reference to
/// one swarm event.
type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
/// Limit setting with a maximum or an explicit value.
///
/// NOTE(review): presumably a file-descriptor limit, judging by the name;
/// it is not used in this file section — confirm.
#[derive(Debug, Copy, Clone)]
pub enum FDLimit {
Max,
Custom(u64),
}
/// Connection events for a single peer, as yielded by
/// `Ipfs::peer_connection_events`.
#[derive(Debug, Clone)]
pub enum PeerConnectionEvents {
/// Connection established where the local endpoint was the listener;
/// `addr` is the endpoint's `send_back_addr`.
IncomingConnection {
connection_id: ConnectionId,
addr: Multiaddr,
},
/// Connection established where the local endpoint was the dialer;
/// `addr` is the dialed address.
OutgoingConnection {
connection_id: ConnectionId,
addr: Multiaddr,
},
/// A connection to the peer was closed.
ClosedConnection {
connection_id: ConnectionId,
},
}
impl Ipfs {
/// Returns an [`IpldDag`] built from a clone of this handle.
pub fn dag(&self) -> IpldDag {
    let node = self.clone();
    IpldDag::new(node)
}
pub fn repo(&self) -> &Repo<DefaultStorage> {
&self.repo
}
/// Returns an [`IpfsUnixfs`] built from a clone of this handle.
pub fn unixfs(&self) -> IpfsUnixfs {
    let node = self.clone();
    IpfsUnixfs::new(node)
}
/// Returns an [`Ipns`] built from a clone of this handle.
pub fn ipns(&self) -> Ipns {
    let node = self.clone();
    Ipns::new(node)
}
/// Starts a repo `put_block` operation for `block`, attached to this node's span.
pub fn put_block(&self, block: &Block) -> RepoPutBlock<DefaultStorage> {
    let put = self.repo.put_block(block);
    put.span(self.span.clone())
}
/// Starts a repo `get_block` operation for `cid`, attached to this node's span.
pub fn get_block(&self, cid: impl Borrow<Cid>) -> RepoGetBlock<DefaultStorage> {
    let get = self.repo.get_block(cid);
    get.span(self.span.clone())
}
/// Removes `cid` from the repo, forwarding the `recursive` flag, and returns
/// the CIDs reported by the repo.
pub async fn remove_block(
    &self,
    cid: impl Borrow<Cid>,
    recursive: bool,
) -> Result<Vec<Cid>, Error> {
    let removal = self.repo.remove_block(cid, recursive);
    removal.instrument(self.span.clone()).await
}
/// Runs the repo's `cleanup` while holding the repo's `gclock` write guard.
pub async fn gc(&self) -> Result<Vec<Cid>, Error> {
    // The guard must stay alive until cleanup has finished.
    let _gc_write_guard = self.repo.inner.gclock.write().await;
    let cleanup = self.repo.cleanup();
    cleanup.instrument(self.span.clone()).await
}
/// Starts a repo pin operation for `cid`, attached to this node's span.
pub fn insert_pin(&self, cid: impl Borrow<Cid>) -> RepoInsertPin<DefaultStorage> {
    let pin = self.repo.pin(cid);
    pin.span(self.span.clone())
}
/// Starts a repo unpin operation for `cid`, attached to this node's span.
pub fn remove_pin(&self, cid: impl Borrow<Cid>) -> RepoRemovePin<DefaultStorage> {
    let unpin = self.repo.remove_pin(cid);
    unpin.span(self.span.clone())
}
/// Asks the repo whether `cid` is pinned, inside an `is_pinned` debug span.
pub async fn is_pinned(&self, cid: impl Borrow<Cid>) -> Result<bool, Error> {
    let pin_span = debug_span!(parent: &self.span, "is_pinned", cid = %cid.borrow());
    let lookup = self.repo.is_pinned(cid);
    lookup.instrument(pin_span).await
}
/// Returns the repo's stream of pins, passing `filter` through, inside a
/// `list_pins` debug span.
pub async fn list_pins(
    &self,
    filter: Option<PinMode>,
) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
    let pins_span = debug_span!(parent: &self.span, "list_pins", ?filter);
    let listing = self.repo.list_pins(filter);
    listing.instrument(pins_span).await
}
/// Queries the repo for the pin state of `cids`, passing `requirement`
/// through, inside a `query_pins` debug span.
pub async fn query_pins(
    &self,
    cids: Vec<Cid>,
    requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
    let query_span =
        debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
    let query = self.repo.query_pins(cids, requirement);
    query.instrument(query_span).await
}
/// Starts a DAG put of `ipld`, attached to this node's span.
pub fn put_dag(&self, ipld: impl Serialize) -> DagPut {
    let dag = self.dag();
    dag.put_dag(ipld).span(self.span.clone())
}
/// Starts a DAG get for `path`, attached to this node's span.
pub fn get_dag(&self, path: impl Into<IpfsPath>) -> DagGet {
    let dag = self.dag();
    dag.get_dag(path).span(self.span.clone())
}
/// Starts a UnixFS `cat` from `starting_point`, attached to this node's span.
pub fn cat_unixfs(&self, starting_point: impl Into<unixfs::StartingPoint>) -> UnixfsCat {
    let fs = self.unixfs();
    fs.cat(starting_point).span(self.span.clone())
}
/// Starts a UnixFS `add` described by `opt`, attached to this node's span.
pub fn add_unixfs(&self, opt: impl Into<AddOpt>) -> UnixfsAdd {
    let fs = self.unixfs();
    fs.add(opt).span(self.span.clone())
}
/// Starts a UnixFS `get` of `path` into `dest`, attached to this node's span.
pub fn get_unixfs(&self, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> UnixfsGet {
    let fs = self.unixfs();
    fs.get(path, dest).span(self.span.clone())
}
/// Starts a UnixFS `ls` of `path`, attached to this node's span.
pub fn ls_unixfs(&self, path: impl Into<IpfsPath>) -> UnixfsLs {
    let fs = self.unixfs();
    fs.ls(path).span(self.span.clone())
}
/// Resolves `path` through IPNS.
///
/// With `recursive` set, every successfully resolved path is resolved again
/// until a resolution fails or yields a path that was already seen. The
/// outcome of the last resolution attempt is returned.
pub async fn resolve_ipns(
    &self,
    path: impl Borrow<IpfsPath>,
    recursive: bool,
) -> Result<IpfsPath, Error> {
    let task = async move {
        let resolver = self.ipns();
        let mut current = resolver.resolve(path).await;
        if recursive {
            // Remembers every resolved path so a cycle terminates the loop.
            let mut visited = HashSet::with_capacity(1);
            while let Ok(ref next) = current {
                let first_visit = visited.insert(next.clone());
                if !first_visit {
                    break;
                }
                current = resolver.resolve(next).await;
            }
        }
        Ok(current?)
    };
    task.instrument(self.span.clone()).await
}
pub async fn publish_ipns(&self, path: impl Borrow<IpfsPath>) -> Result<IpfsPath, Error> {
async move {
let ipns = self.ipns();
ipns.publish(None, path, Default::default())
.await
.map_err(anyhow::Error::from)
}
.instrument(self.span.clone())
.await
}
pub async fn connect(&self, target: impl Into<DialOpts>) -> Result<ConnectionId, Error> {
self.connexa
.swarm()
.dial(target)
.await
.map_err(anyhow::Error::from)
}
pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::Addresses(tx))
.await?;
rx.await?
}
pub async fn is_connected(&self, peer_id: PeerId) -> Result<bool, Error> {
self.connexa
.swarm()
.is_connected(peer_id)
.await
.map_err(anyhow::Error::from)
}
pub async fn connected(&self) -> Result<Vec<PeerId>, Error> {
self.connexa
.swarm()
.connected_peers()
.await
.map_err(anyhow::Error::from)
}
pub async fn disconnect(&self, target: PeerId) -> Result<(), Error> {
self.connexa
.swarm()
.disconnect(target)
.await
.map_err(anyhow::Error::from)
}
pub async fn ban_peer(&self, target: PeerId) -> Result<(), Error> {
self.connexa
.blacklist()
.add(target)
.await
.map_err(anyhow::Error::from)
}
/// Removes `target` from the blacklist.
pub async fn unban_peer(&self, target: PeerId) -> Result<(), Error> {
    self.connexa.blacklist().remove(target).await?;
    Ok(())
}
/// Returns identity information for a peer.
///
/// * `Some(peer_id)` — sends a `FindPeerIdentity` command and converts the
///   replied identify info into a [`PeerInfo`].
/// * `None` — describes the local node: the union of external and listening
///   addresses (each suffixed with the local peer id when it does not already
///   end in a `/p2p` component), the protocol list from a `Protocol` command,
///   and the local public key. Failures while fetching either address list
///   are treated as an empty list.
pub async fn identity(&self, peer_id: Option<PeerId>) -> Result<PeerInfo, Error> {
    let lookup = async move {
        match peer_id {
            None => {
                let (listening, external) =
                    futures::join!(self.listening_addresses(), self.external_addresses());
                // Deduplicate through a set before turning it into a list.
                let unique: HashSet<Multiaddr> = external
                    .unwrap_or_default()
                    .into_iter()
                    .chain(listening.unwrap_or_default())
                    .collect();
                let mut listen_addrs: Vec<Multiaddr> = unique.into_iter().collect();

                let (reply_tx, reply_rx) = oneshot_channel();
                self.connexa
                    .send_custom_event(IpfsEvent::Protocol(reply_tx))
                    .await?;
                // Names that are not valid stream protocols are skipped.
                let protocols = reply_rx
                    .await?
                    .into_iter()
                    .filter_map(|name| StreamProtocol::try_from_owned(name).ok())
                    .collect();

                let public_key = self.keypair().public();
                let local_id = public_key.to_peer_id();
                for addr in listen_addrs.iter_mut() {
                    let ends_with_peer = matches!(addr.iter().last(), Some(Protocol::P2p(_)));
                    if !ends_with_peer {
                        addr.push(Protocol::P2p(local_id))
                    }
                }

                Ok(PeerInfo {
                    peer_id: local_id,
                    public_key,
                    protocol_version: String::new(),
                    agent_version: String::new(),
                    listen_addrs,
                    protocols,
                    observed_addr: None,
                })
            }
            Some(remote) => {
                let (reply_tx, reply_rx) = oneshot_channel();
                self.connexa
                    .send_custom_event(IpfsEvent::FindPeerIdentity(remote, reply_tx))
                    .await?;
                // The first reply hands back a second receiver for the info itself.
                let pending = reply_rx.await??;
                pending.await?.map(PeerInfo::from)
            }
        }
    };
    lookup.instrument(self.span.clone()).await
}
/// Subscribes to `topic` through the gossipsub handle.
pub async fn pubsub_subscribe(&self, topic: impl IntoGossipsubTopic) -> Result<(), Error> {
    self.connexa.gossipsub().subscribe(topic).await?;
    Ok(())
}
/// Returns the gossipsub handle's event stream for `topic`.
pub async fn pubsub_listener(
    &self,
    topic: impl IntoGossipsubTopic,
) -> Result<BoxStream<'static, connexa::prelude::GossipsubEvent>, Error> {
    let events = self.connexa.gossipsub().listener(topic).await?;
    Ok(events)
}
/// Publishes `data` to `topic` through the gossipsub handle.
pub async fn pubsub_publish(
    &self,
    topic: impl IntoGossipsubTopic,
    data: impl Into<Bytes>,
) -> Result<(), Error> {
    self.connexa.gossipsub().publish(topic, data).await?;
    Ok(())
}
/// Unsubscribes from `topic` through the gossipsub handle.
pub async fn pubsub_unsubscribe(&self, topic: impl IntoGossipsubTopic) -> Result<(), Error> {
    self.connexa.gossipsub().unsubscribe(topic).await?;
    Ok(())
}
/// Returns the peers the gossipsub handle reports for `topic`.
pub async fn pubsub_peers(&self, topic: impl IntoGossipsubTopic) -> Result<Vec<PeerId>, Error> {
    let peers = self.connexa.gossipsub().peers(topic).await?;
    Ok(peers)
}
pub async fn pubsub_subscribed(&self) -> Result<Vec<String>, Error> {
unimplemented!()
}
/// Returns the request-response handle's stream of inbound requests for
/// `protocol`.
pub async fn requests_subscribe(
    &self,
    protocol: impl Into<OptionalStreamProtocol>,
) -> Result<BoxStream<'static, (PeerId, InboundRequestId, Bytes)>, Error> {
    let requests = self
        .connexa
        .request_response()
        .listen_for_requests(protocol)
        .await?;
    Ok(requests)
}
/// Sends `request` to `peer_id` and returns the response bytes.
pub async fn send_request(
    &self,
    peer_id: PeerId,
    request: impl IntoRequest,
) -> Result<Bytes, Error> {
    let response = self
        .connexa
        .request_response()
        .send_request(peer_id, request)
        .await?;
    Ok(response)
}
/// Sends `request` to every peer in `peers` and returns a stream of
/// per-peer results.
pub async fn send_requests(
    &self,
    peers: impl IntoIterator<Item = PeerId>,
    request: impl IntoRequest,
) -> Result<BoxStream<'static, (PeerId, std::io::Result<Bytes>)>, Error> {
    let responses = self
        .connexa
        .request_response()
        .send_requests(peers, request)
        .await?;
    Ok(responses)
}
/// Sends `response` for the inbound request `id` received from `peer_id`.
pub async fn send_response(
    &self,
    peer_id: PeerId,
    id: InboundRequestId,
    response: impl IntoRequest,
) -> Result<(), Error> {
    self.connexa
        .request_response()
        .send_response(peer_id, id, response)
        .await?;
    Ok(())
}
/// Sends a `WantList` command for the optional `peer` and awaits the future
/// received in the reply to obtain the CIDs.
pub async fn bitswap_wantlist(
    &self,
    peer: impl Into<Option<PeerId>>,
) -> Result<Vec<Cid>, Error> {
    let task = async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let command = IpfsEvent::WantList(peer.into(), reply_tx);
        self.connexa.send_custom_event(command).await?;
        let pending = reply_rx.await??;
        Ok(pending.await)
    };
    task.instrument(self.span.clone()).await
}
/// Returns the stream behaviour's control handle. Only available with the
/// `stream` feature.
#[cfg(feature = "stream")]
pub async fn stream_control(&self) -> Result<connexa::prelude::stream::Control, Error> {
    let control = self.connexa.stream().control_handle().await?;
    Ok(control)
}
/// Converts `protocol` into a stream protocol and returns the incoming
/// streams for it. Only available with the `stream` feature.
#[cfg(feature = "stream")]
pub async fn new_stream(
    &self,
    protocol: impl IntoStreamProtocol,
) -> Result<connexa::prelude::stream::IncomingStreams, Error> {
    let stream_protocol = protocol.into_protocol()?;
    let incoming = self.connexa.stream().new_stream(stream_protocol).await?;
    Ok(incoming)
}
/// Opens a stream to `peer_id` for `protocol`. Only available with the
/// `stream` feature.
#[cfg(feature = "stream")]
pub async fn open_stream(
    &self,
    peer_id: PeerId,
    protocol: impl IntoStreamProtocol,
) -> Result<connexa::prelude::Stream, Error> {
    let stream = self.connexa.stream().open_stream(peer_id, protocol).await?;
    Ok(stream)
}
/// Collects every CID yielded by the repo's block listing.
pub async fn refs_local(&self) -> Vec<Cid> {
    let listing = self.repo.list_blocks().instrument(self.span.clone());
    let blocks = listing.await;
    blocks.collect::<Vec<_>>().await
}
/// Returns the swarm handle's listening addresses.
pub async fn listening_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
    let addrs = self.connexa.swarm().listening_addresses().await?;
    Ok(addrs)
}
/// Returns the swarm handle's external addresses.
pub async fn external_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
    let addrs = self.connexa.swarm().external_addresses().await?;
    Ok(addrs)
}
/// Starts listening on `addr` and returns the listener id.
pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<ListenerId, Error> {
    let listener = self.connexa.swarm().listen_on(addr).await?;
    Ok(listener)
}
/// Returns the addresses associated with listener `id`.
pub async fn get_listening_address(&self, id: ListenerId) -> Result<Vec<Multiaddr>, Error> {
    let addrs = self.connexa.swarm().get_listening_addresses(id).await?;
    Ok(addrs)
}
/// Removes listener `id` from the swarm.
pub async fn remove_listening_address(&self, id: ListenerId) -> Result<(), Error> {
    self.connexa.swarm().remove_listener(id).await?;
    Ok(())
}
/// Registers `addr` as an external address with the swarm.
pub async fn add_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
    self.connexa.swarm().add_external_address(addr).await?;
    Ok(())
}
/// Removes `addr` from the swarm's external addresses.
pub async fn remove_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
    self.connexa.swarm().remove_external_address(addr).await?;
    Ok(())
}
/// Returns the swarm handle's stream of connection events.
pub async fn connection_events(&self) -> Result<BoxStream<'static, ConnectionEvent>, Error> {
    let events = self.connexa.swarm().listener().await?;
    Ok(events)
}
/// Returns a stream of connection events that concern `target` only.
///
/// Established connections are reported as incoming or outgoing depending on
/// the connected endpoint; closed connections are reported by id. Events for
/// other peers, and any other kind of event, are skipped.
pub async fn peer_connection_events(
    &self,
    target: PeerId,
) -> Result<BoxStream<'static, PeerConnectionEvents>, Error> {
    let mut events = self.connexa.swarm().listener().await?;
    let filtered = async_stream::stream! {
        while let Some(event) = events.next().await {
            match event {
                ConnectionEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. }
                    if peer_id == target =>
                {
                    let established = match endpoint {
                        ConnectedPoint::Dialer { address, .. } => {
                            PeerConnectionEvents::OutgoingConnection { connection_id, addr: address }
                        }
                        ConnectedPoint::Listener { send_back_addr, .. } => {
                            PeerConnectionEvents::IncomingConnection { connection_id, addr: send_back_addr }
                        }
                    };
                    yield established;
                }
                ConnectionEvent::ConnectionClosed { peer_id, connection_id, .. }
                    if peer_id == target =>
                {
                    yield PeerConnectionEvents::ClosedConnection { connection_id };
                }
                _ => {}
            }
        }
    };
    Ok(filtered.boxed())
}
/// Looks `peer_id` up in the DHT and returns the addresses of every peer
/// record in the result, concatenated into one list.
///
/// # Errors
///
/// Returns the DHT lookup error, converted into [`Error`].
pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
    let found = self.connexa.dht().find_peer(peer_id).await?;
    // `flat_map` replaces the former `map(..).flatten()` pair.
    Ok(found.into_iter().flat_map(|info| info.addrs).collect())
}
/// Returns the provider stream for `cid`; delegates to `dht_get_providers`.
pub async fn get_providers(
    &self,
    cid: Cid,
) -> Result<BoxStream<'static, std::io::Result<HashSet<PeerId>>>, Error> {
    let providers = self.dht_get_providers(cid).await?;
    Ok(providers)
}
/// Returns the DHT handle's provider stream for `key`.
pub async fn dht_get_providers(
    &self,
    key: impl ToRecordKey,
) -> Result<BoxStream<'static, std::io::Result<HashSet<PeerId>>>, Error> {
    let providers = self.connexa.dht().get_providers(key).await?;
    Ok(providers)
}
/// Announces `cid` on the DHT, keyed by the bytes of its multihash.
///
/// Fails without contacting the DHT when the repo does not contain the block.
pub async fn provide(&self, cid: Cid) -> Result<(), Error> {
    let stored_locally = self.repo.contains(&cid).await?;
    if stored_locally {
        self.dht_provide(cid.hash().to_bytes()).await
    } else {
        Err(anyhow!(
            "Error: block {} not found locally, cannot provide",
            cid
        ))
    }
}
/// Announces `key` through the DHT handle.
pub async fn dht_provide(&self, key: impl ToRecordKey) -> Result<(), Error> {
    self.connexa.dht().provide(key).await?;
    Ok(())
}
/// Starts a repo fetch of `cid`, attached to this node's span.
pub fn fetch(&self, cid: &Cid) -> RepoFetch<DefaultStorage> {
    let fetch = self.repo.fetch(cid);
    fetch.span(self.span.clone())
}
/// Runs the DHT handle's `find_peer` for `peer_id` and returns the peer id of
/// every entry in the result.
pub async fn get_closest_peers(&self, peer_id: PeerId) -> Result<Vec<PeerId>, Error> {
    let found = self.connexa.dht().find_peer(peer_id).await?;
    Ok(found.into_iter().map(|info| info.peer_id).collect())
}
pub async fn dht_mode(&self, mode: DhtMode) -> Result<(), Error> {
let mode = match mode {
DhtMode::Client => Some(Mode::Client),
DhtMode::Server => Some(Mode::Server),
DhtMode::Auto => None,
};
self.connexa.dht().set_mode(mode).await.map_err(Into::into)
}
pub async fn dht_get(
&self,
key: impl ToRecordKey,
) -> Result<BoxStream<'static, Record>, Error> {
let st = self.connexa.dht().get(key).await?;
let st = st
.filter_map(|result| async move { result.ok() })
.map(|record| record.record)
.boxed();
Ok(st)
}
/// Stores `value` under `key` in the DHT with the given `quorum`.
///
/// The key is read as (lossy) UTF-8 and split with `split_dht_key`. When a
/// validator is registered for its prefix, the validator builds the record
/// key (and may reject it); otherwise the raw key bytes are used as is.
pub async fn dht_put(
    &self,
    key: impl AsRef<[u8]>,
    value: impl Into<Bytes>,
    quorum: Quorum,
) -> Result<(), Error> {
    let raw = key.as_ref();
    let text = String::from_utf8_lossy(raw);
    let validator = split_dht_key(&text)
        .ok()
        .and_then(|(prefix, _)| self.record_key_validator.get(prefix));
    let record_key = match validator {
        Some(validate) => validate(&text)?,
        None => RecordKey::from(raw.to_vec()),
    };
    self.connexa.dht().put(record_key, value, quorum).await?;
    Ok(())
}
pub async fn add_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::AddRelay(peer_id, addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn remove_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::RemoveRelay(peer_id, addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
/// Lists relays with their addresses. `active` selects the
/// `ListActiveRelays` command; otherwise `ListRelays` is sent.
pub async fn list_relays(&self, active: bool) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
    let task = async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let command = if active {
            IpfsEvent::ListActiveRelays(reply_tx)
        } else {
            IpfsEvent::ListRelays(reply_tx)
        };
        self.connexa.send_custom_event(command).await?;
        reply_rx.await?
    };
    task.instrument(self.span.clone()).await
}
/// Not implemented; always fails with an `"Unimplemented"` error.
pub async fn enable_autorelay(&self) -> Result<(), Error> {
    anyhow::bail!("Unimplemented")
}
/// Not implemented; always fails with an `"Unimplemented"` error.
pub async fn disable_autorelay(&self) -> Result<(), Error> {
    anyhow::bail!("Unimplemented")
}
pub async fn enable_relay(&self, peer_id: impl Into<Option<PeerId>>) -> Result<(), Error> {
async move {
let peer_id = peer_id.into();
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::EnableRelay(peer_id, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn disable_relay(&self, peer_id: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::DisableRelay(peer_id, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
/// Registers `namespace` with the rendezvous peer `peer_id`, using the
/// optional `ttl`.
pub async fn rendezvous_register_namespace(
    &self,
    namespace: impl IntoNamespace,
    ttl: impl Into<Option<u64>>,
    peer_id: PeerId,
) -> Result<(), Error> {
    let ttl = ttl.into();
    self.connexa
        .rendezvous()
        .register(peer_id, namespace, ttl)
        .await?;
    Ok(())
}
/// Unregisters `namespace` from the rendezvous peer `peer_id`.
pub async fn rendezvous_unregister_namespace(
    &self,
    namespace: impl IntoNamespace,
    peer_id: PeerId,
) -> Result<(), Error> {
    self.connexa
        .rendezvous()
        .unregister(peer_id, namespace)
        .await?;
    Ok(())
}
/// Runs a rendezvous discovery for `namespace` against `peer_id` and returns
/// the discovered peers with their addresses. The first element of the
/// discovery result is discarded.
pub async fn rendezvous_namespace_discovery(
    &self,
    namespace: impl IntoNamespace,
    ttl: impl Into<Option<u64>>,
    peer_id: PeerId,
) -> Result<HashMap<PeerId, Vec<Multiaddr>>, Error> {
    let ttl = ttl.into();
    let (_, discovered) = self
        .connexa
        .rendezvous()
        .discovery(peer_id, namespace, ttl, None)
        .await?;
    Ok(discovered.into_iter().collect())
}
/// Returns the stream produced by `refs::iplds_refs` over this node's repo
/// for the given `(Cid, Ipld)` pairs; `max_depth` and `unique` are passed
/// through unchanged.
pub fn refs<'a, Iter>(
    &'a self,
    iplds: Iter,
    max_depth: Option<u64>,
    unique: bool,
) -> impl futures::Stream<Item = Result<refs::Edge, anyhow::Error>> + Send + 'a
where
    Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
{
    let repo = &self.repo;
    refs::iplds_refs(repo, iplds, max_depth, unique)
}
pub async fn get_bootstraps(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::GetBootstrappers(tx))
.await?;
Ok(rx.await?)
}
.instrument(self.span.clone())
.await
}
pub async fn add_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::AddBootstrapper(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn remove_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::RemoveBootstrapper(addr, tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn clear_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::ClearBootstrappers(tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
pub async fn default_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
async move {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::DefaultBootstrap(tx))
.await?;
rx.await?
}
.instrument(self.span.clone())
.await
}
/// Runs the DHT handle's bootstrap.
pub async fn bootstrap(&self) -> Result<(), Error> {
    self.connexa.dht().bootstrap().await?;
    Ok(())
}
pub async fn add_peer(&self, opt: impl IntoAddPeerOpt) -> Result<(), Error> {
let opt: AddPeerOpt = opt.into_opt()?;
if opt.addresses().is_empty() {
anyhow::bail!("no address supplied");
}
let (tx, rx) = oneshot::channel();
self.connexa
.send_custom_event(IpfsEvent::AddPeer(opt, tx))
.await?;
rx.await??;
Ok(())
}
pub async fn remove_peer(&self, peer_id: PeerId) -> Result<bool, Error> {
let (tx, rx) = oneshot::channel();
self.connexa
.send_custom_event(IpfsEvent::RemovePeer(peer_id, None, tx))
.await?;
rx.await.map_err(anyhow::Error::from)?
}
pub async fn remove_peer_address(
&self,
peer_id: PeerId,
addr: Multiaddr,
) -> Result<bool, Error> {
let (tx, rx) = oneshot::channel();
self.connexa
.send_custom_event(IpfsEvent::RemovePeer(peer_id, Some(addr), tx))
.await?;
rx.await.map_err(anyhow::Error::from)?
}
pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
let (tx, rx) = oneshot_channel();
self.connexa
.send_custom_event(IpfsEvent::GetBitswapPeers(tx))
.await?;
Ok(rx.await??.await)
}
pub fn keypair(&self) -> &Keypair {
self.connexa.keypair()
}
pub fn keystore(&self) -> &Keystore {
&self.keystore
}
/// Consumes this handle and shuts the node down: the repo first, then the
/// networking handle, and finally the task behind `_gc_guard` is aborted.
pub async fn exit_daemon(self) {
self.repo.shutdown();
self.connexa.shutdown();
self._gc_guard.abort();
}
}
/// Options describing a peer to add: its id, its addresses, and dial /
/// keep-alive / reconnect settings. Built with `with_peer_id` and the
/// chaining setters.
#[derive(Debug)]
pub struct AddPeerOpt {
/// Peer the options refer to.
peer_id: PeerId,
/// Deduplicated addresses, stored without a trailing `/p2p` component for `peer_id`.
addresses: Vec<Multiaddr>,
/// Dial condition; the default condition is used in `to_dial_opts` when unset.
condition: Option<PeerCondition>,
/// When `false`, `to_dial_opts` returns `None`.
dial: bool,
/// Keep-alive flag, reported by `can_keep_alive`.
keepalive: bool,
/// Reconnect setting as a `(Duration, u8)` pair, reported by `reconnect_opt`.
reconnect: Option<(Duration, u8)>,
}
impl AddPeerOpt {
    /// Creates options for `peer_id` with no addresses, dialing and
    /// keep-alive disabled, and no condition or reconnect setting.
    pub fn with_peer_id(peer_id: PeerId) -> Self {
        Self {
            peer_id,
            addresses: Vec::new(),
            condition: None,
            dial: false,
            keepalive: false,
            reconnect: None,
        }
    }

    /// Adds `addr` unless it is empty, already present, or ends in a `/p2p`
    /// component naming a different peer. A trailing `/p2p` component that
    /// names this peer is stripped before the address is stored.
    pub fn add_address(mut self, mut addr: Multiaddr) -> Self {
        if addr.is_empty() {
            return self;
        }
        if let Some(Protocol::P2p(trailing)) = addr.iter().last() {
            if trailing != self.peer_id {
                return self;
            }
            addr.pop();
        }
        let already_known = self.addresses.contains(&addr);
        if !already_known {
            self.addresses.push(addr);
        }
        self
    }

    /// Adds every address in `addrs` through `add_address`; addresses added
    /// earlier are kept.
    pub fn set_addresses(self, addrs: Vec<Multiaddr>) -> Self {
        addrs.into_iter().fold(self, Self::add_address)
    }

    /// Sets the dial condition.
    pub fn set_peer_condition(self, condition: PeerCondition) -> Self {
        Self {
            condition: Some(condition),
            ..self
        }
    }

    /// Sets whether the peer should be dialed.
    pub fn set_dial(self, dial: bool) -> Self {
        Self { dial, ..self }
    }

    /// Sets or clears the reconnect setting.
    pub fn set_reconnect(self, reconnect: impl Into<Option<(Duration, u8)>>) -> Self {
        Self {
            reconnect: reconnect.into(),
            ..self
        }
    }

    /// Sets the reconnect setting to `(duration, interval)`.
    pub fn reconnect(self, duration: Duration, interval: u8) -> Self {
        Self {
            reconnect: Some((duration, interval)),
            ..self
        }
    }

    /// Enables keep-alive.
    pub fn keepalive(self) -> Self {
        Self {
            keepalive: true,
            ..self
        }
    }

    /// Sets the keep-alive flag.
    pub fn set_keepalive(self, keepalive: bool) -> Self {
        Self { keepalive, ..self }
    }
}
impl AddPeerOpt {
    /// Peer the options refer to.
    pub fn peer_id(&self) -> &PeerId {
        let Self { peer_id, .. } = self;
        peer_id
    }

    /// Addresses collected so far.
    pub fn addresses(&self) -> &[Multiaddr] {
        self.addresses.as_slice()
    }

    /// Whether keep-alive was requested.
    pub fn can_keep_alive(&self) -> bool {
        let Self { keepalive, .. } = *self;
        keepalive
    }

    /// Reconnect setting, if any.
    pub fn reconnect_opt(&self) -> Option<(Duration, u8)> {
        let Self { reconnect, .. } = *self;
        reconnect
    }

    /// Builds dial options for the peer using the configured condition (or
    /// the default one). Returns `None` when dialing is disabled.
    pub fn to_dial_opts(&self) -> Option<DialOpts> {
        self.dial.then(|| {
            let condition = self.condition.unwrap_or_default();
            DialOpts::peer_id(self.peer_id).condition(condition).build()
        })
    }
}
/// Conversion into [`AddPeerOpt`], accepted by `Ipfs::add_peer`.
pub trait IntoAddPeerOpt {
/// Consumes `self` and produces the options, or an error when the value
/// cannot be converted.
fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error>;
}
impl IntoAddPeerOpt for AddPeerOpt {
    /// Identity conversion; never fails.
    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
        let opt = self;
        Ok(opt)
    }
}
impl IntoAddPeerOpt for (PeerId, Multiaddr) {
    /// Builds options for the peer with the single address added through
    /// `add_address`.
    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
        let opt = AddPeerOpt::with_peer_id(self.0);
        Ok(opt.add_address(self.1))
    }
}
impl IntoAddPeerOpt for (PeerId, Vec<Multiaddr>) {
    /// Builds options for the peer with every address added through
    /// `set_addresses`.
    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
        let opt = AddPeerOpt::with_peer_id(self.0);
        Ok(opt.set_addresses(self.1))
    }
}
impl IntoAddPeerOpt for Multiaddr {
    /// Extracts the peer id from the address and builds options containing
    /// the address.
    ///
    /// # Errors
    ///
    /// Fails with `"address does not contain peer id"` (wrapped in a
    /// `std::io::Error`) when `extract_peer_id` finds no peer id.
    fn into_opt(mut self) -> Result<AddPeerOpt, anyhow::Error> {
        let peer_id = self
            .extract_peer_id()
            // Build the error lazily so nothing is allocated on the success path.
            .ok_or_else(|| anyhow::anyhow!("address does not contain peer id"))
            .map_err(std::io::Error::other)?;
        Ok(AddPeerOpt::with_peer_id(peer_id).add_address(self))
    }
}
/// Splits a DHT key of the form `/<prefix>/<value>` into `(prefix, value)`.
///
/// The key is split on `/`; segments that are empty or whitespace-only are
/// ignored, so leading, trailing and doubled slashes are tolerated.
///
/// # Errors
///
/// Fails when `key` is empty or when it does not consist of exactly two
/// non-blank segments.
#[inline]
pub(crate) fn split_dht_key(key: &str) -> anyhow::Result<(&str, &str)> {
    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
    let mut segments = key.split('/').filter(|segment| !segment.trim().is_empty());
    // Pull at most three segments: a third one means the key is too long.
    match (segments.next(), segments.next(), segments.next()) {
        (Some(prefix), Some(value), None) => Ok((prefix, value)),
        _ => anyhow::bail!("Key must consist of exactly two non-empty segments"),
    }
}
/// Converts a textual IPNS key into the DHT record key `/ipns/<bytes>`.
///
/// The key is trimmed and multibase-decoded; keys starting with `1` or `Q`
/// get a `z` multibase prefix prepended first. When the decoded bytes start
/// with the two-byte header `0x01 0x72`, that header is removed, so keys with
/// and without the header map to the same record key.
/// NOTE(review): `0x01 0x72` is presumably the CIDv1 version byte followed by
/// the `libp2p-key` multicodec — confirm against the multicodec table.
///
/// # Errors
///
/// Fails when the key is empty, cannot be multibase-decoded, or decodes to
/// no bytes at all. Short inputs no longer panic.
#[inline]
pub(crate) fn ipns_to_dht_key<B: AsRef<str>>(key: B) -> anyhow::Result<RecordKey> {
    const IPNS_PREFIX: &[u8] = b"/ipns/";
    const KEY_HEADER: &[u8] = &[0x01, 0x72];

    let mut key = key.as_ref().trim().to_string();
    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
    if key.starts_with('1') || key.starts_with('Q') {
        key.insert(0, 'z');
    }

    let (_, decoded) = multibase::decode(key)?;
    anyhow::ensure!(!decoded.is_empty(), "Key does not contain any data");

    // Strip the header only when both bytes match; otherwise keep every byte.
    let body = decoded.strip_prefix(KEY_HEADER).unwrap_or(&decoded);

    let mut record_key = Vec::with_capacity(IPNS_PREFIX.len() + body.len());
    record_key.extend_from_slice(IPNS_PREFIX);
    record_key.extend_from_slice(body);
    Ok(record_key.into())
}
/// Splits the trimmed `key` with `split_dht_key` and, when its prefix equals
/// `prefix`, hands the value part to `func` to build the record key.
///
/// Fails when the key cannot be split, when either part is empty, or with
/// `"Invalid prefix"` when the prefix does not match.
#[inline]
pub(crate) fn to_dht_key<B: AsRef<str>, F: Fn(&str) -> anyhow::Result<RecordKey>>(
    (prefix, func): (&str, F),
    key: B,
) -> anyhow::Result<RecordKey> {
    let trimmed = key.as_ref().trim();
    let (found_prefix, value) = split_dht_key(trimmed)?;
    anyhow::ensure!(!found_prefix.is_empty(), "Key cannot be empty");
    anyhow::ensure!(!value.is_empty(), "Value cannot be empty");
    anyhow::ensure!(found_prefix == prefix, "Invalid prefix");
    func(value)
}
use crate::p2p::AddressBookConfig;
use crate::repo::{RepoGetBlock, RepoPutBlock};
#[cfg(all(feature = "full", not(target_arch = "wasm32")))]
#[doc(hidden)]
pub use node::Node;
#[cfg(all(feature = "full", not(target_arch = "wasm32")))]
mod node {
use super::*;
use crate::builder::DefaultIpfsBuilder;
/// An [`Ipfs`] handle bundled with its peer id and its listening addresses,
/// as assembled by `Node::with_options`.
pub struct Node {
/// The wrapped node handle; also reachable through `Deref`.
pub ipfs: Ipfs,
/// Peer id derived from the node's keypair.
pub id: PeerId,
/// Listening addresses captured at construction time.
pub addrs: Vec<Multiaddr>,
}
impl IntoAddPeerOpt for &Node {
    /// Builds options from the node's id and a copy of its addresses.
    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
        let addrs = self.addrs.clone();
        let opt = AddPeerOpt::with_peer_id(self.id).set_addresses(addrs);
        Ok(opt)
    }
}
impl Node {
    /// Starts a node whose tracing span is tagged with `name`, listening on
    /// the default memory address.
    pub async fn new<T: AsRef<str>>(name: T) -> Self {
        let span = trace_span!("ipfs", node = name.as_ref());
        Self::with_options(Some(span), None).await
    }

    /// Dials `opt` unless the options name a peer that is already connected.
    pub async fn connect(&self, opt: impl Into<DialOpts>) -> Result<(), Error> {
        let opts: DialOpts = opt.into();
        let already_connected = match opts.get_peer_id() {
            Some(peer_id) => self.ipfs.is_connected(peer_id).await?,
            None => false,
        };
        if already_connected {
            return Ok(());
        }
        self.ipfs.connect(opts).await?;
        Ok(())
    }

    /// Starts a node with TCP and memory transports and request-response
    /// enabled, switches the DHT to server mode and listens on `addr` (or on
    /// the memory address `/memory/0` when `addr` is `None`).
    ///
    /// # Panics
    ///
    /// Panics when the node cannot be started, the DHT mode cannot be set, an
    /// address cannot be listened on, or the listening addresses cannot be
    /// read.
    pub async fn with_options(span: Option<Span>, addr: Option<Vec<Multiaddr>>) -> Self {
        let mut builder = DefaultIpfsBuilder::new()
            .with_default()
            .enable_tcp()
            .enable_memory_transport()
            .with_request_response(Default::default());
        if let Some(span) = span {
            builder = builder.set_span(span);
        }

        let listen_on =
            addr.unwrap_or_else(|| vec![Multiaddr::empty().with(Protocol::Memory(0))]);

        let ipfs = builder.start().await.unwrap();
        ipfs.dht_mode(DhtMode::Server).await.unwrap();
        let id = ipfs.keypair().public().to_peer_id();

        for target in listen_on {
            ipfs.add_listening_address(target).await.expect("To succeed");
        }

        let mut addrs = ipfs.listening_addresses().await.unwrap();
        for addr in addrs.iter_mut() {
            // Only non-empty addresses that do not already end in `/p2p` get the id.
            let needs_peer_id = matches!(
                addr.iter().last(),
                Some(last) if !matches!(last, Protocol::P2p(_))
            );
            if needs_peer_id {
                addr.push(Protocol::P2p(id));
            }
        }

        Node { ipfs, id, addrs }
    }

    /// Borrows the repo's map of pending block subscriptions.
    #[allow(clippy::type_complexity)]
    pub fn get_subscriptions(
        &self,
    ) -> &parking_lot::Mutex<HashMap<Cid, Vec<oneshot::Sender<Result<Block, String>>>>> {
        let repo = &self.ipfs.repo;
        &repo.inner.subscriptions
    }

    /// Runs the wrapped node's DHT bootstrap.
    pub async fn bootstrap(&self) -> Result<(), Error> {
        let ipfs = &self.ipfs;
        ipfs.bootstrap().await
    }

    /// Adds every address of `node` as a peer address, stopping at the first
    /// failure.
    pub async fn add_node(&self, node: &Self) -> Result<(), Error> {
        for addr in node.addrs.iter() {
            let entry = (node.id, addr.clone());
            self.add_peer(entry).await?;
        }
        Ok(())
    }

    /// Shuts the wrapped node down.
    pub async fn shutdown(self) {
        let Node { ipfs, .. } = self;
        ipfs.exit_daemon().await;
    }
}
impl std::ops::Deref for Node {
type Target = Ipfs;
fn deref(&self) -> &Self::Target {
&self.ipfs
}
}
impl std::ops::DerefMut for Node {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.ipfs
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::block::BlockCodec;
use ipld_core::ipld;
use multihash_codetable::Code;
use multihash_derive::MultihashDigest;
/// A block stored with `put_block` is returned unchanged by `get_block`.
#[tokio::test]
async fn test_put_and_get_block() {
    let node = Node::new("test_node").await;

    let payload = b"hello block\n".to_vec();
    let expected_cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&payload));
    let original = Block::new(expected_cid, payload).unwrap();

    let stored_cid: Cid = node.put_block(&original).await.unwrap();
    let fetched = node.get_block(stored_cid).await.unwrap();

    assert_eq!(original, fetched);
}
/// An IPLD value stored with `put_dag` is returned unchanged by `get_dag`.
#[tokio::test]
async fn test_put_and_get_dag() {
    let node = Node::new("test_node").await;

    let original = ipld!([-1, -2, -3]);
    let stored_cid = node.put_dag(original.clone()).await.unwrap();
    let fetched = node.get_dag(stored_cid).await.unwrap();

    assert_eq!(original, fetched);
}
/// A DAG stored with a pin is reported as pinned until the pin is removed.
#[tokio::test]
async fn test_pin_and_unpin() {
    let node = Node::new("test_node").await;

    let value = ipld!([-1, -2, -3]);
    let pinned_cid = node.put_dag(value.clone()).pin(false).await.unwrap();
    let pinned_before = node.is_pinned(pinned_cid).await.unwrap();
    assert!(pinned_before);

    node.remove_pin(pinned_cid).await.unwrap();
    let pinned_after = node.is_pinned(pinned_cid).await.unwrap();
    assert!(!pinned_after);
}
}