pub mod config;
pub mod dag;
pub mod error;
#[macro_use]
pub mod ipld;
pub mod ipns;
pub mod p2p;
pub mod path;
pub mod refs;
pub mod repo;
mod subscription;
pub mod unixfs;
#[macro_use]
extern crate tracing;
use anyhow::{anyhow, format_err};
use cid::Codec;
use either::Either;
use futures::{
channel::{
mpsc::{channel, Receiver, Sender},
oneshot::{channel as oneshot_channel, Sender as OneshotSender},
},
sink::SinkExt,
stream::{Fuse, Stream},
};
use libp2p::swarm::NetworkBehaviour;
use tracing::Span;
use tracing_futures::Instrument;
use std::{
borrow::Borrow,
collections::{HashMap, HashSet},
env, fmt,
future::Future,
ops::{Deref, DerefMut, Range},
path::PathBuf,
pin::Pin,
sync::{atomic::Ordering, Arc},
task::{Context, Poll},
};
use self::{
dag::IpldDag,
ipns::Ipns,
p2p::{
addr::{could_be_bound_from_ephemeral, starts_unspecified},
create_swarm, SwarmOptions, TSwarm,
},
repo::{create_repo, Repo, RepoEvent, RepoOptions},
subscription::SubscriptionFuture,
};
pub use self::{
error::Error,
ipld::Ipld,
p2p::{
pubsub::{PubsubMessage, SubscriptionStream},
Connection, KadResult, MultiaddrWithPeerId, MultiaddrWithoutPeerId,
},
path::IpfsPath,
repo::{PinKind, PinMode, RepoTypes},
};
pub use cid::Cid;
pub use ipfs_bitswap::Block;
pub use libp2p::{
core::{connection::ListenerId, multiaddr::Protocol, Multiaddr, PeerId, PublicKey},
identity::Keypair,
kad::{record::Key, Quorum},
};
/// Marker trait for the type set an [`Ipfs`] instance is generic over.
///
/// It declares no items of its own beyond what [`RepoTypes`] requires.
pub trait IpfsTypes: RepoTypes {}
// Blanket impl: every `RepoTypes` implementor is automatically an `IpfsTypes`.
impl<T: RepoTypes> IpfsTypes for T {}
/// [`RepoTypes`] selection that uses the block and data stores from the `repo::fs` module.
#[derive(Debug)]
pub struct Types;
impl RepoTypes for Types {
type TBlockStore = repo::fs::FsBlockStore;
type TDataStore = repo::fs::FsDataStore;
}
/// [`RepoTypes`] selection that uses the block and data stores from the `repo::mem` module.
///
/// This is the type set used by the `Node` helper defined later in this file.
#[derive(Debug)]
pub struct TestTypes;
impl RepoTypes for TestTypes {
type TBlockStore = repo::mem::MemBlockStore;
type TDataStore = repo::mem::MemDataStore;
}
/// Options used to construct an [`UninitializedIpfs`].
///
/// `RepoOptions` and `SwarmOptions` are both derived from a reference to this struct.
#[derive(Clone)]
pub struct IpfsOptions {
/// Filesystem path for the node.
/// NOTE(review): presumably the repository location; confirm against `RepoOptions::from`.
pub ipfs_path: PathBuf,
/// Keypair of the node; its public half is what `Ipfs::identity` returns.
pub keypair: Keypair,
/// Bootstrap peers as (address, peer id) pairs.
/// NOTE(review): consumption happens outside this file; confirm in `SwarmOptions::from`.
pub bootstrap: Vec<(Multiaddr, PeerId)>,
/// mDNS toggle.
/// NOTE(review): presumably enables mDNS discovery; confirm in the p2p module.
pub mdns: bool,
/// Optional Kademlia protocol name.
/// NOTE(review): `None` presumably selects a default; confirm in the p2p module.
pub kad_protocol: Option<String>,
/// Addresses for which listening is started when `UninitializedIpfs::start` runs.
pub listening_addrs: Vec<Multiaddr>,
/// Span used as the root span of the node; when `None`, `start` creates a
/// trace-level span named "ipfs".
pub span: Option<Span>,
}
impl fmt::Debug for IpfsOptions {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("IpfsOptions")
.field("ipfs_path", &self.ipfs_path)
.field("bootstrap", &self.bootstrap)
.field("keypair", &DebuggableKeypair(&self.keypair))
.field("mdns", &self.mdns)
.field("kad_protocol", &self.kad_protocol)
.field("listening_addrs", &self.listening_addrs)
.field("span", &self.span)
.finish()
}
}
impl IpfsOptions {
pub fn inmemory_with_generated_keys() -> Self {
Self {
ipfs_path: env::temp_dir(),
keypair: Keypair::generate_ed25519(),
mdns: Default::default(),
bootstrap: Default::default(),
kad_protocol: Some("/ipfs/lan/kad/1.0.0".to_owned()),
listening_addrs: vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()],
span: None,
}
}
}
/// Wrapper that gives a `Keypair` (owned or borrowed) a `Debug` implementation which
/// prints only the key kind.
#[derive(Clone)]
struct DebuggableKeypair<I: Borrow<Keypair>>(I);

impl<I: Borrow<Keypair>> DebuggableKeypair<I> {
    /// Borrows the wrapped keypair.
    fn get_ref(&self) -> &Keypair {
        self.0.borrow()
    }
}

impl<I: Borrow<Keypair>> fmt::Debug for DebuggableKeypair<I> {
    /// Writes `Keypair::<Kind>` without exposing any key bytes.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self.get_ref() {
            Keypair::Ed25519(_) => "Keypair::Ed25519",
            Keypair::Rsa(_) => "Keypair::Rsa",
            Keypair::Secp256k1(_) => "Keypair::Secp256k1",
        };
        fmt.write_str(label)
    }
}
/// Cloneable handle to a running node.
///
/// Repo operations are performed directly through `repo`; everything that needs the
/// swarm is sent as an `IpfsEvent` over `to_task` to the background future returned by
/// `UninitializedIpfs::start`.
#[derive(Debug)]
pub struct Ipfs<Types: IpfsTypes> {
// Span used to instrument the futures created by the facade methods.
span: Span,
// Shared repository.
repo: Arc<Repo<Types>>,
// Node keypair, wrapped so that deriving `Debug` does not print key material.
keys: DebuggableKeypair<Keypair>,
// Sending half of the channel to the background task.
to_task: Sender<IpfsEvent>,
}
impl<Types: IpfsTypes> Clone for Ipfs<Types> {
    /// Clones the handle field by field. Implemented by hand, so no `Types: Clone`
    /// bound is required; the repo is shared through its `Arc`.
    fn clone(&self) -> Self {
        let Ipfs {
            span,
            repo,
            keys,
            to_task,
        } = self;
        Ipfs {
            span: span.clone(),
            repo: Arc::clone(repo),
            keys: keys.clone(),
            to_task: to_task.clone(),
        }
    }
}
/// One-shot reply sender carrying a fallible result of type `T`.
type Channel<T> = OneshotSender<Result<T, Error>>;
/// Requests sent from the `Ipfs` facade to the background `IpfsFuture`.
///
/// Most variants carry a one-shot sender on which the background task replies.
#[derive(Debug)]
enum IpfsEvent {
/// Connect to the given address. The reply is `None` when no subscription was
/// created, which the facade reports as a duplicate connection attempt.
Connect(
MultiaddrWithPeerId,
OneshotSender<Option<SubscriptionFuture<(), String>>>,
),
/// Reply with the peer addresses known to the swarm (`swarm.addrs()`).
Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
/// Reply with the addresses the swarm is listening on.
Listeners(Channel<Vec<Multiaddr>>),
/// Reply with the current connections.
Connections(Channel<Vec<Connection>>),
/// Disconnect from the given address.
Disconnect(MultiaddrWithPeerId, Channel<()>),
/// Reply with the listener addresses followed by the external addresses.
GetAddresses(OneshotSender<Vec<Multiaddr>>),
/// Subscribe to a pubsub topic; the reply is `None` when already subscribed.
PubsubSubscribe(String, OneshotSender<Option<SubscriptionStream>>),
/// Unsubscribe from a pubsub topic.
PubsubUnsubscribe(String, OneshotSender<bool>),
/// Publish data to a pubsub topic.
PubsubPublish(String, Vec<u8>, OneshotSender<()>),
/// Reply with the peers subscribed to the topic, or all known pubsub peers for `None`.
PubsubPeers(Option<String>, OneshotSender<Vec<PeerId>>),
/// Reply with the locally subscribed topics.
PubsubSubscribed(OneshotSender<Vec<String>>),
/// Reply with the wantlist of the given peer, or the local wantlist for `None`.
WantList(
Option<PeerId>,
OneshotSender<Vec<(Cid, ipfs_bitswap::Priority)>>,
),
/// Reply with a snapshot of the bitswap statistics.
BitswapStats(OneshotSender<BitswapStats>),
/// Start listening on an address; the reply is sent once a matching
/// `NewListenAddr` swarm event has been observed, or immediately on error.
AddListeningAddress(Multiaddr, Channel<Multiaddr>),
/// Stop listening on a previously added address.
RemoveListeningAddress(Multiaddr, Channel<()>),
/// Start a Kademlia bootstrap.
Bootstrap(Channel<SubscriptionFuture<KadResult, String>>),
/// Add a peer and its address to the swarm; no reply is sent.
AddPeer(PeerId, Multiaddr),
/// Start a closest-peers query.
GetClosestPeers(PeerId, OneshotSender<SubscriptionFuture<KadResult, String>>),
/// Reply with the peers bitswap is connected to.
GetBitswapPeers(OneshotSender<Vec<PeerId>>),
/// Look up addresses of a peer. The `bool` is `local_only`: when `true` the reply is
/// always the locally known addresses (`Either::Left`), possibly empty; when `false`
/// and nothing is known locally, a closest-peers query is started (`Either::Right`).
FindPeer(
PeerId,
bool,
OneshotSender<Either<Vec<Multiaddr>, SubscriptionFuture<KadResult, String>>>,
),
/// Start a providers query for the Cid.
GetProviders(Cid, OneshotSender<SubscriptionFuture<KadResult, String>>),
/// Start providing the Cid.
Provide(Cid, Channel<SubscriptionFuture<KadResult, String>>),
/// Start a DHT record lookup.
DhtGet(
Key,
Quorum,
OneshotSender<SubscriptionFuture<KadResult, String>>,
),
/// Start a DHT record store.
DhtPut(
Key,
Vec<u8>,
Quorum,
Channel<SubscriptionFuture<KadResult, String>>,
),
/// Reply with the current bootstrapper addresses.
GetBootstrappers(OneshotSender<Vec<Multiaddr>>),
/// Add a bootstrapper address.
AddBootstrapper(MultiaddrWithPeerId, Channel<Multiaddr>),
/// Remove a bootstrapper address.
RemoveBootstrapper(MultiaddrWithPeerId, Channel<Multiaddr>),
/// Clear the bootstrapper list.
ClearBootstrappers(OneshotSender<Vec<Multiaddr>>),
/// Restore the bootstrapper list.
RestoreBootstrappers(Channel<Vec<Multiaddr>>),
/// Make the background future complete.
Exit,
}
/// A configured node that has not been started yet; see `start`.
pub struct UninitializedIpfs<Types: IpfsTypes> {
// Repository created from the options.
repo: Arc<Repo<Types>>,
// Clone of `options.keypair`.
keys: Keypair,
// The options this node was created with.
options: IpfsOptions,
// Receiving half of the repo event channel, later polled by the background future.
repo_events: Receiver<RepoEvent>,
}
impl<Types: IpfsTypes> UninitializedIpfs<Types> {
/// Creates the repository from `options` and stores everything needed by `start`.
/// Nothing is initialized or started yet.
pub fn new(options: IpfsOptions) -> Self {
let repo_options = RepoOptions::from(&options);
let (repo, repo_events) = create_repo(repo_options);
let keys = options.keypair.clone();
UninitializedIpfs {
repo: Arc::new(repo),
keys,
options,
repo_events,
}
}
/// Initializes the repo, creates the swarm and returns the `Ipfs` facade together
/// with the background future that must be polled (e.g. spawned) for the facade's
/// swarm-related methods to make progress.
///
/// # Errors
///
/// Returns an error if repo initialization or swarm creation fails.
pub async fn start(self) -> Result<(Ipfs<Types>, impl Future<Output = ()>), Error> {
use futures::stream::StreamExt;
let UninitializedIpfs {
repo,
keys,
repo_events,
mut options,
} = self;
repo.init().await?;
// Facade -> background task channel, created with a buffer of 1.
let (to_task, receiver) = channel::<IpfsEvent>(1);
// Use the caller-provided span if any, otherwise create the default "ipfs" span.
let facade_span = options
.span
.take()
.unwrap_or_else(|| tracing::trace_span!("ipfs"));
let swarm_span = tracing::trace_span!(parent: facade_span.clone(), "swarm");
let ipfs = Ipfs {
span: facade_span,
repo: repo.clone(),
keys: DebuggableKeypair(keys),
to_task,
};
let swarm_options = SwarmOptions::from(&options);
let swarm = create_swarm(swarm_options, swarm_span, repo).await?;
let IpfsOptions {
listening_addrs, ..
} = options;
let mut fut = IpfsFuture {
repo_events: repo_events.fuse(),
from_facade: receiver.fuse(),
swarm,
listening_addresses: HashMap::with_capacity(listening_addrs.len()),
};
// Start listening on the configured addresses; no reply channel is attached, so
// failures to start listening are not reported from here.
for addr in listening_addrs.into_iter() {
fut.start_add_listener_address(addr, None);
}
Ok((ipfs, fut))
}
}
impl<Types: IpfsTypes> Ipfs<Types> {
/// Returns an [`IpldDag`] built from a clone of this handle.
pub fn dag(&self) -> IpldDag<Types> {
IpldDag::new(self.clone())
}
/// Returns an [`Ipns`] built from a clone of this handle.
fn ipns(&self) -> Ipns<Types> {
Ipns::new(self.clone())
}
/// Stores the block in the repo and returns its Cid; the put status reported by the
/// repo is discarded.
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
    let outcome = self
        .repo
        .put_block(block)
        .instrument(self.span.clone())
        .await;
    let (cid, _put_status) = outcome?;
    Ok(cid)
}
/// Retrieves a block through the repo's `get_block`.
/// NOTE(review): presumably waits for the block to arrive from the network when it is
/// not available locally; confirm in the repo module.
pub async fn get_block(&self, cid: &Cid) -> Result<Block, Error> {
self.repo.get_block(cid).instrument(self.span.clone()).await
}
/// Removes the block from the repo and returns the Cid reported by the repo.
pub async fn remove_block(&self, cid: Cid) -> Result<Cid, Error> {
self.repo
.remove_block(&cid)
.instrument(self.span.clone())
.await
}
/// Pins the given Cid, either directly or recursively.
///
/// The root block is always fetched through the repo first. For a recursive pin the
/// block is decoded and the stream of its unique references is handed to the repo
/// together with the root.
///
/// # Errors
///
/// Fails if the root block cannot be fetched or decoded, or if the repo rejects the pin.
pub async fn insert_pin(&self, cid: &Cid, recursive: bool) -> Result<(), Error> {
    use futures::stream::{StreamExt, TryStreamExt};
    let span = debug_span!(parent: &self.span, "insert_pin", cid = %cid, recursive);
    let refs_span = debug_span!(parent: &span, "insert_pin refs");
    async move {
        let Block { data, .. } = self.repo.get_block(cid).await?;
        if !recursive {
            self.repo.insert_direct_pin(cid).await
        } else {
            let ipld = crate::ipld::decode_ipld(&cid, &data)?;
            // `ipld` is not used afterwards, so it is moved into the traversal rather
            // than cloned.
            let roots = vec![(cid.clone(), ipld)].into_iter();
            let st = crate::refs::IpldRefs::default()
                .with_only_unique()
                .refs_of_resolved(self, roots)
                .map_ok(|crate::refs::Edge { destination, .. }| destination)
                .into_stream()
                .instrument(refs_span)
                .boxed();
            self.repo.insert_recursive_pin(cid, st).await
        }
    }
    .instrument(span)
    .await
}
pub async fn remove_pin(&self, cid: &Cid, recursive: bool) -> Result<(), Error> {
use futures::stream::{StreamExt, TryStreamExt};
let span = debug_span!(parent: &self.span, "remove_pin", cid = %cid, recursive);
async move {
if !recursive {
self.repo.remove_direct_pin(cid).await
} else {
let Block { data, .. } = match self.repo.get_block_now(&cid).await? {
Some(b) => b,
None => {
return Err(anyhow::anyhow!("pinned root not found: {}", cid));
}
};
let ipld = crate::ipld::decode_ipld(&cid, &data)?;
let st = crate::refs::IpldRefs::default()
.with_only_unique()
.with_existing_blocks()
.refs_of_resolved(
self.to_owned(),
vec![(cid.clone(), ipld.clone())].into_iter(),
)
.map_ok(|crate::refs::Edge { destination, .. }| destination)
.into_stream()
.boxed();
self.repo.remove_recursive_pin(cid, st).await
}
}
.instrument(span)
.await
}
/// Asks the repo whether the given Cid is pinned.
pub async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
let span = debug_span!(parent: &self.span, "is_pinned", cid = %cid);
self.repo.is_pinned(cid).instrument(span).await
}
/// Returns the repo's stream of pins, passing `filter` through to the repo.
/// NOTE(review): `None` presumably means "all pin modes"; confirm in the repo module.
pub async fn list_pins(
&self,
filter: Option<PinMode>,
) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
let span = debug_span!(parent: &self.span, "list_pins", ?filter);
self.repo.list_pins(filter).instrument(span).await
}
/// Queries the repo for the pin kind of each given Cid, passing `requirement` through.
/// Only the number of Cids is recorded in the span, not the Cids themselves.
pub async fn query_pins(
&self,
cids: Vec<Cid>,
requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
let span = debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
self.repo
.query_pins(cids, requirement)
.instrument(span)
.await
}
/// Stores the IPLD value through [`IpldDag::put`] using the DAG-CBOR codec and returns
/// the resulting Cid.
pub async fn put_dag(&self, ipld: Ipld) -> Result<Cid, Error> {
self.dag()
.put(ipld, Codec::DagCBOR)
.instrument(self.span.clone())
.await
}
/// Resolves the path through [`IpldDag::get`]; the dag-specific error is wrapped into
/// the crate-wide [`Error`].
pub async fn get_dag(&self, path: IpfsPath) -> Result<Ipld, Error> {
self.dag()
.get(path)
.instrument(self.span.clone())
.await
.map_err(Error::new)
}
/// Delegates to `unixfs::cat`, returning a stream of byte chunks that borrows `self`.
/// NOTE(review): `range` presumably restricts the returned bytes to a byte range of the
/// file; confirm in the unixfs module.
pub async fn cat_unixfs(
&self,
starting_point: impl Into<unixfs::StartingPoint>,
range: Option<Range<u64>>,
) -> Result<
impl Stream<Item = Result<Vec<u8>, unixfs::TraversalFailed>> + Send + '_,
unixfs::TraversalFailed,
> {
let starting_point = starting_point.into();
unixfs::cat(self, starting_point, range)
.instrument(self.span.clone())
.await
}
/// Resolves `path` once, or repeatedly when `recursive` is set.
///
/// Recursive resolution feeds each successful result back into the resolver and stops
/// at the first error or as soon as a result repeats one seen earlier; the last
/// resolution outcome is returned.
pub async fn resolve_ipns(&self, path: &IpfsPath, recursive: bool) -> Result<IpfsPath, Error> {
    async move {
        let ipns = self.ipns();
        let mut outcome = ipns.resolve(path).await;
        if recursive {
            let mut visited = HashSet::with_capacity(1);
            while let Ok(ref step) = outcome {
                // A repeated result means a fixed point or a cycle was reached.
                if !visited.insert(step.clone()) {
                    break;
                }
                outcome = ipns.resolve(&step).await;
            }
        }
        outcome
    }
    .instrument(self.span.clone())
    .await
}
/// Asks the background task to connect to `target` and waits for the attempt to finish.
///
/// # Errors
///
/// Fails if the background task is unreachable, if the connection attempt fails, or
/// with "Duplicate connection attempt" when the background task did not create a
/// subscription for this request.
pub async fn connect(&self, target: MultiaddrWithPeerId) -> Result<(), Error> {
    async move {
        let (tx, rx) = oneshot_channel();
        self.to_task
            .clone()
            .send(IpfsEvent::Connect(target, tx))
            .await?;
        match rx.await? {
            Some(future) => future.await.map_err(|e| anyhow!(e)),
            // Nothing to wait for: return the error directly instead of wrapping it
            // in an already-completed future.
            None => Err(anyhow!("Duplicate connection attempt")),
        }
    }
    .instrument(self.span.clone())
    .await
}
/// Requests the peer addresses known to the swarm from the background task.
pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::Addresses(reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Requests the addresses the swarm is currently listening on.
pub async fn addrs_local(&self) -> Result<Vec<Multiaddr>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::Listeners(reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Requests the list of current connections from the background task.
pub async fn peers(&self) -> Result<Vec<Connection>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::Connections(reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Asks the background task to disconnect from `target`.
pub async fn disconnect(&self, target: MultiaddrWithPeerId) -> Result<(), Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::Disconnect(target, reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Returns the node's public key together with its listening and external addresses,
/// each suffixed with the `/p2p/<peer id>` component of this node.
pub async fn identity(&self) -> Result<(PublicKey, Vec<Multiaddr>), Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::GetAddresses(reply_tx);
        self.to_task.clone().send(request).await?;
        let mut addresses = reply_rx.await?;

        let public_key = self.keys.get_ref().public();
        let peer_id = public_key.clone().into_peer_id();
        addresses
            .iter_mut()
            .for_each(|addr| addr.push(Protocol::P2p(peer_id.clone().into())));

        Ok((public_key, addresses))
    }
    .instrument(self.span.clone())
    .await
}
/// Subscribes to `topic` and returns the stream of its messages.
///
/// # Errors
///
/// Fails with "already subscribed" when the background task replies with no stream.
pub async fn pubsub_subscribe(&self, topic: String) -> Result<SubscriptionStream, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::PubsubSubscribe(topic.clone(), reply_tx);
        self.to_task.clone().send(request).await?;
        match reply_rx.await? {
            Some(stream) => Ok(stream),
            None => Err(format_err!("already subscribed to {:?}", topic)),
        }
    }
    .instrument(self.span.clone())
    .await
}
/// Publishes `data` to `topic`; completes once the background task has handed the
/// message to pubsub.
pub async fn pubsub_publish(&self, topic: String, data: Vec<u8>) -> Result<(), Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::PubsubPublish(topic, data, reply_tx);
        self.to_task.clone().send(request).await?;
        let published = reply_rx.await?;
        Ok(published)
    }
    .instrument(self.span.clone())
    .await
}
/// Unsubscribes from `topic`, returning the boolean reported by pubsub.
/// NOTE(review): presumably `true` means a subscription existed; confirm in the pubsub module.
pub async fn pubsub_unsubscribe(&self, topic: &str) -> Result<bool, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::PubsubUnsubscribe(topic.into(), reply_tx);
        self.to_task.clone().send(request).await?;
        let unsubscribed = reply_rx.await?;
        Ok(unsubscribed)
    }
    .instrument(self.span.clone())
    .await
}
/// Returns the peers subscribed to `topic`, or all known pubsub peers when `topic` is
/// `None`.
pub async fn pubsub_peers(&self, topic: Option<String>) -> Result<Vec<PeerId>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::PubsubPeers(topic, reply_tx);
        self.to_task.clone().send(request).await?;
        let peers = reply_rx.await?;
        Ok(peers)
    }
    .instrument(self.span.clone())
    .await
}
/// Returns the topics this node is currently subscribed to.
pub async fn pubsub_subscribed(&self) -> Result<Vec<String>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::PubsubSubscribed(reply_tx);
        self.to_task.clone().send(request).await?;
        let topics = reply_rx.await?;
        Ok(topics)
    }
    .instrument(self.span.clone())
    .await
}
/// Returns the bitswap wantlist of `peer`, or the local wantlist when `peer` is `None`.
pub async fn bitswap_wantlist(
    &self,
    peer: Option<PeerId>,
) -> Result<Vec<(Cid, ipfs_bitswap::Priority)>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::WantList(peer, reply_tx);
        self.to_task.clone().send(request).await?;
        let wantlist = reply_rx.await?;
        Ok(wantlist)
    }
    .instrument(self.span.clone())
    .await
}
/// Returns the Cids of the blocks listed by the repo (`list_blocks`).
pub async fn refs_local(&self) -> Result<Vec<Cid>, Error> {
self.repo.list_blocks().instrument(self.span.clone()).await
}
/// Returns a snapshot of the bitswap counters, peers and local wantlist.
pub async fn bitswap_stats(&self) -> Result<BitswapStats, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::BitswapStats(reply_tx);
        self.to_task.clone().send(request).await?;
        let stats = reply_rx.await?;
        Ok(stats)
    }
    .instrument(self.span.clone())
    .await
}
/// Starts listening on `addr` and resolves to the address reported back by the
/// background task once listening has started.
pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::AddListeningAddress(addr, reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Stops listening on `addr`; fails if the address was not being listened on or the
/// listener could not be removed.
pub async fn remove_listening_address(&self, addr: Multiaddr) -> Result<(), Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::RemoveListeningAddress(addr, reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Finds the addresses of `peer_id`.
///
/// Locally known addresses are returned immediately. Otherwise the closest-peers query
/// started by the background task is awaited and the local lookup is repeated in
/// local-only mode.
///
/// # Errors
///
/// Fails with "couldn't find peer" when no address is known after the query.
pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
    async move {
        let (first_tx, first_rx) = oneshot_channel();
        let first_request = IpfsEvent::FindPeer(peer_id.clone(), false, first_tx);
        self.to_task.clone().send(first_request).await?;

        let query = match first_rx.await? {
            Either::Left(addrs) if !addrs.is_empty() => return Ok(addrs),
            // With `local_only == false` the background task never replies with an
            // empty address list; it starts a query instead.
            Either::Left(_) => unreachable!(),
            Either::Right(future) => future,
        };
        query.await?;

        let (second_tx, second_rx) = oneshot_channel();
        let second_request = IpfsEvent::FindPeer(peer_id.clone(), true, second_tx);
        self.to_task.clone().send(second_request).await?;

        match second_rx.await? {
            Either::Left(addrs) if !addrs.is_empty() => Ok(addrs),
            _ => Err(anyhow!("couldn't find peer {}", peer_id)),
        }
    }
    .instrument(self.span.clone())
    .await
}
pub async fn get_providers(&self, cid: Cid) -> Result<Vec<PeerId>, Error> {
let kad_result = async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetProviders(cid, tx))
.await?;
Ok(rx.await?).map_err(|e: String| anyhow!(e))
}
.instrument(self.span.clone())
.await?
.await;
match kad_result {
Ok(KadResult::Peers(providers)) => Ok(providers),
Ok(_) => unreachable!(),
Err(e) => Err(anyhow!(e)),
}
}
/// Announces this node as a provider of `cid`.
///
/// # Errors
///
/// Fails when the block is not available locally, when the background task cannot
/// start providing, or when the query itself fails.
///
/// # Panics
///
/// Panics if the query completes with a result other than `KadResult::Complete`.
pub async fn provide(&self, cid: Cid) -> Result<(), Error> {
    // Only blocks that are available locally can be provided.
    let held_locally = self.repo.get_block_now(&cid).await?.is_some();
    if !held_locally {
        return Err(anyhow!(
            "Error: block {} not found locally, cannot provide",
            cid
        ));
    }

    let subscription = async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::Provide(cid, reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await?;

    match subscription.await {
        Ok(KadResult::Complete) => Ok(()),
        Ok(_) => unreachable!(),
        Err(e) => Err(anyhow!(e)),
    }
}
pub async fn get_closest_peers(&self, peer_id: PeerId) -> Result<Vec<PeerId>, Error> {
let kad_result = async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetClosestPeers(peer_id, tx))
.await?;
Ok(rx.await?).map_err(|e: String| anyhow!(e))
}
.instrument(self.span.clone())
.await?
.await;
match kad_result {
Ok(KadResult::Peers(closest)) => Ok(closest),
Ok(_) => unreachable!(),
Err(e) => Err(anyhow!(e)),
}
}
pub async fn dht_get<T: Into<Key>>(
&self,
key: T,
quorum: Quorum,
) -> Result<Vec<Vec<u8>>, Error> {
let kad_result = async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::DhtGet(key.into(), quorum, tx))
.await?;
Ok(rx.await?).map_err(|e: String| anyhow!(e))
}
.instrument(self.span.clone())
.await?
.await;
match kad_result {
Ok(KadResult::Records(recs)) => {
let values = recs.into_iter().map(|rec| rec.value).collect();
Ok(values)
}
Ok(_) => unreachable!(),
Err(e) => Err(anyhow!(e)),
}
}
pub async fn dht_put<T: Into<Key>>(
&self,
key: T,
value: Vec<u8>,
quorum: Quorum,
) -> Result<(), Error> {
let kad_result = async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::DhtPut(key.into(), value, quorum, tx))
.await?;
Ok(rx.await?).map_err(|e: String| anyhow!(e))
}
.instrument(self.span.clone())
.await??
.await;
match kad_result {
Ok(KadResult::Complete) => Ok(()),
Ok(_) => unreachable!(),
Err(e) => Err(anyhow!(e)),
}
}
/// Delegates to `refs::iplds_refs`, returning the stream of edges found when walking
/// the given `(Cid, Ipld)` roots. `max_depth` and `unique` are passed through unchanged.
/// NOTE(review): `max_depth == None` presumably means unlimited depth; confirm in the refs module.
pub fn refs<'a, Iter>(
&'a self,
iplds: Iter,
max_depth: Option<u64>,
unique: bool,
) -> impl Stream<Item = Result<refs::Edge, ipld::BlockError>> + Send + 'a
where
Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
{
refs::iplds_refs(self, iplds, max_depth, unique)
}
/// Returns the current bootstrapper addresses.
pub async fn get_bootstrappers(&self) -> Result<Vec<Multiaddr>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::GetBootstrappers(reply_tx);
        self.to_task.clone().send(request).await?;
        let bootstrappers = reply_rx.await?;
        Ok(bootstrappers)
    }
    .instrument(self.span.clone())
    .await
}
/// Adds `addr` to the bootstrapper list and returns the address reported back by the
/// background task.
pub async fn add_bootstrapper(&self, addr: MultiaddrWithPeerId) -> Result<Multiaddr, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::AddBootstrapper(addr, reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Removes `addr` from the bootstrapper list and returns the address reported back by
/// the background task.
pub async fn remove_bootstrapper(&self, addr: MultiaddrWithPeerId) -> Result<Multiaddr, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::RemoveBootstrapper(addr, reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Clears the bootstrapper list and returns the addresses reported back by the
/// background task.
pub async fn clear_bootstrappers(&self) -> Result<Vec<Multiaddr>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::ClearBootstrappers(reply_tx);
        self.to_task.clone().send(request).await?;
        let cleared = reply_rx.await?;
        Ok(cleared)
    }
    .instrument(self.span.clone())
    .await
}
/// Restores the bootstrapper list and returns the addresses reported back by the
/// background task.
pub async fn restore_bootstrappers(&self) -> Result<Vec<Multiaddr>, Error> {
    async move {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = IpfsEvent::RestoreBootstrappers(reply_tx);
        self.to_task.clone().send(request).await?;
        reply_rx.await?
    }
    .instrument(self.span.clone())
    .await
}
/// Shuts down the repo and asks the background task to exit.
///
/// The exit request is sent with `try_send` and its result is ignored, so this never
/// waits for the background task and a failed send is not reported.
pub async fn exit_daemon(mut self) {
self.repo.shutdown();
let _ = self.to_task.try_send(IpfsEvent::Exit);
}
}
/// Background future that drives the swarm and serves requests from the `Ipfs` facade
/// and events from the repo.
struct IpfsFuture<Types: IpfsTypes> {
// The swarm being driven.
swarm: TSwarm<Types>,
// Events coming from the repo.
repo_events: Fuse<Receiver<RepoEvent>>,
// Requests coming from `Ipfs` handles.
from_facade: Fuse<Receiver<IpfsEvent>>,
// Listener bookkeeping keyed by address. The optional sender is the pending reply for
// an address whose listening has been started but not yet confirmed by the swarm.
listening_addresses: HashMap<Multiaddr, (ListenerId, Option<Channel<Multiaddr>>)>,
}
impl<TRepoTypes: RepoTypes> IpfsFuture<TRepoTypes> {
/// Completes a pending "add listening address" request once the swarm has reported
/// `addr` as a new listen address.
///
/// The pending reply sender is looked up in three steps:
/// 1. an entry stored under exactly `addr`;
/// 2. otherwise, the stored keys matching `could_be_bound_from_ephemeral(0, ..)`:
///    exactly one match is re-keyed under `addr`; more than one match panics;
/// 3. otherwise, the first still-pending entry whose key starts unspecified and matches
///    `could_be_bound_from_ephemeral(1, ..)` is re-keyed under `addr`.
///
/// If a sender is found it is sent `Ok(addr)`; a failed send is ignored.
///
/// NOTE(review): the meaning of the integer argument of `could_be_bound_from_ephemeral`
/// is not visible in this file; confirm in `p2p::addr`.
fn complete_listening_address_adding(&mut self, addr: Multiaddr) {
let maybe_sender = match self.listening_addresses.get_mut(&addr) {
// Exact match: take the sender and leave the entry in place.
Some((_, maybe_sender)) => maybe_sender.take(),
None => {
let mut matching_keys = self
.listening_addresses
.keys()
.filter(|right| could_be_bound_from_ephemeral(0, &addr, right))
.cloned();
let first = matching_keys.next();
if let Some(first) = first {
let second = matching_keys.next();
match (first, second) {
(first, None) => {
if let Some((id, maybe_sender)) =
self.listening_addresses.remove(&first)
{
// Re-key the listener under the address actually bound.
self.listening_addresses.insert(addr.clone(), (id, None));
maybe_sender
} else {
unreachable!("We found a matching ephemeral key already, it must be in the listening_addresses")
}
}
(first, Some(second)) => {
unreachable!(
"More than one matching [{}, {}] and {:?} for {}",
first,
second,
matching_keys.collect::<Vec<_>>(),
addr
);
}
}
} else {
// No direct candidate: fall back to pending entries with an unspecified
// start and take the first one that could correspond to `addr`.
let first = self
.listening_addresses
.iter()
.filter(|(addr, _)| starts_unspecified(addr))
.filter(|(could_have_ephemeral, _)| {
could_be_bound_from_ephemeral(1, &addr, could_have_ephemeral)
})
.filter(|(_, (_, maybe_sender))| maybe_sender.is_some())
.map(|(addr, _)| addr.to_owned())
.next();
if let Some(first) = first {
let (id, maybe_sender) = self
.listening_addresses
.remove(&first)
.expect("just filtered this key out");
self.listening_addresses.insert(addr.clone(), (id, None));
trace!("guessing the first match for {} to be {}", first, addr);
maybe_sender
} else {
None
}
}
}
};
if let Some(sender) = maybe_sender {
let _ = sender.send(Ok(addr));
}
}
/// Starts listening on `addr` and records the listener so that
/// `complete_listening_address_adding` can reply on `ret` later.
///
/// The request is rejected (an error is sent on `ret`, if present) when:
/// - `addr` starts unspecified while another address is still pending;
/// - `addr` is already recorded;
/// - the swarm fails to start listening.
///
/// Failed sends on `ret` are ignored.
fn start_add_listener_address(&mut self, addr: Multiaddr, ret: Option<Channel<Multiaddr>>) {
    use libp2p::Swarm;
    use std::collections::hash_map::Entry;

    // `any` stops at the first pending entry instead of counting all of them.
    if starts_unspecified(&addr)
        && self
            .listening_addresses
            .values()
            .any(|(_, maybe_sender)| maybe_sender.is_some())
    {
        if let Some(sender) = ret {
            let _ = sender.send(Err(format_err!("Cannot start listening to an unspecified address when there are pending specified addresses")));
        }
        return;
    }

    match self.listening_addresses.entry(addr) {
        Entry::Occupied(oe) => {
            if let Some(sender) = ret {
                let _ = sender.send(Err(format_err!("Already adding a possibly ephemeral Multiaddr; wait for the first one to resolve before adding others: {}", oe.key())));
            }
        }
        Entry::Vacant(ve) => match Swarm::listen_on(&mut self.swarm, ve.key().to_owned()) {
            Ok(id) => {
                // Keep the reply sender until the swarm confirms the address.
                ve.insert((id, ret));
            }
            Err(e) => {
                if let Some(sender) = ret {
                    let _ = sender.send(Err(Error::from(e)));
                }
            }
        },
    }
}
}
impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
type Output = ();
// Drives the node. Each pass of the outer loop:
//   1. drains ready swarm events,
//   2. drains ready facade requests,
//   3. drains ready repo events.
// The future returns `Pending` only when the swarm is pending immediately after a
// full pass (`done == true`), and completes when an `Exit` request is seen or the
// facade channel has ended.
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
use libp2p::{swarm::SwarmEvent, Swarm};
// True when a full pass has completed since the last swarm event was handled.
let mut done = false;
loop {
loop {
let inner = {
let next = self.swarm.next_event();
futures::pin_mut!(next);
match next.poll(ctx) {
Poll::Ready(inner) => inner,
Poll::Pending if done => return Poll::Pending,
Poll::Pending => break,
}
};
// A swarm event may have produced new work, so another full pass is needed.
done = false;
match inner {
SwarmEvent::NewListenAddr(addr) => {
self.complete_listening_address_adding(addr);
}
_ => trace!("{:?}", inner),
}
}
loop {
let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) {
Poll::Ready(Some(evt)) => evt,
// The facade channel has ended: treat it as an exit request.
Poll::Ready(None) => IpfsEvent::Exit,
Poll::Pending => break,
};
// Replies are best-effort: a dropped receiver is ignored throughout.
match inner {
IpfsEvent::Connect(target, ret) => {
ret.send(self.swarm.connect(target)).ok();
}
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm)
.cloned()
.collect::<Vec<Multiaddr>>();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
ret.send(Ok(connections.collect())).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
// `Ok(())` is sent whether or not a disconnector was returned.
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// Listener addresses first, then external addresses.
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
let _ = ret.send(addresses);
}
IpfsEvent::PubsubSubscribe(topic, ret) => {
let _ = ret.send(self.swarm.pubsub().subscribe(topic));
}
IpfsEvent::PubsubUnsubscribe(topic, ret) => {
let _ = ret.send(self.swarm.pubsub().unsubscribe(topic));
}
IpfsEvent::PubsubPublish(topic, data, ret) => {
self.swarm.pubsub().publish(topic, data);
let _ = ret.send(());
}
IpfsEvent::PubsubPeers(Some(topic), ret) => {
let topic = libp2p::floodsub::Topic::new(topic);
let _ = ret.send(self.swarm.pubsub().subscribed_peers(&topic));
}
IpfsEvent::PubsubPeers(None, ret) => {
let _ = ret.send(self.swarm.pubsub().known_peers());
}
IpfsEvent::PubsubSubscribed(ret) => {
let _ = ret.send(self.swarm.pubsub().subscribed_topics());
}
IpfsEvent::WantList(peer, ret) => {
let list = if let Some(peer) = peer {
self.swarm
.bitswap()
.peer_wantlist(&peer)
.unwrap_or_default()
} else {
self.swarm.bitswap().local_wantlist()
};
let _ = ret.send(list);
}
IpfsEvent::BitswapStats(ret) => {
let stats = self.swarm.bitswap().stats();
let peers = self.swarm.bitswap().peers();
let wantlist = self.swarm.bitswap().local_wantlist();
let _ = ret.send((stats, peers, wantlist).into());
}
IpfsEvent::AddListeningAddress(addr, ret) => {
// The reply is sent later, from `complete_listening_address_adding`,
// unless starting to listen fails right away.
self.start_add_listener_address(addr, Some(ret));
}
IpfsEvent::RemoveListeningAddress(addr, ret) => {
let removed = if let Some((id, _)) = self.listening_addresses.remove(&addr)
{
Swarm::remove_listener(&mut self.swarm, id).map_err(|_: ()| {
format_err!(
"Failed to remove previously added listening address: {}",
addr
)
})
} else {
Err(format_err!("Address was not listened to before: {}", addr))
};
let _ = ret.send(removed);
}
IpfsEvent::Bootstrap(ret) => {
let future = self.swarm.bootstrap();
let _ = ret.send(future);
}
IpfsEvent::AddPeer(peer_id, addr) => {
self.swarm.add_peer(peer_id, addr);
}
IpfsEvent::GetClosestPeers(peer_id, ret) => {
let future = self.swarm.get_closest_peers(peer_id);
let _ = ret.send(future);
}
IpfsEvent::GetBitswapPeers(ret) => {
let peers = self
.swarm
.bitswap()
.connected_peers
.keys()
.cloned()
.collect();
let _ = ret.send(peers);
}
IpfsEvent::FindPeer(peer_id, local_only, ret) => {
// Prefer the addresses known to the swarm, then those known to Kademlia.
let swarm_addrs = self.swarm.swarm.addresses_of_peer(&peer_id);
let locally_known_addrs = if !swarm_addrs.is_empty() {
swarm_addrs
} else {
self.swarm.kademlia().addresses_of_peer(&peer_id)
};
// Start a network query only when nothing is known locally and the
// caller did not ask for a local-only lookup.
let addrs = if !locally_known_addrs.is_empty() || local_only {
Either::Left(locally_known_addrs)
} else {
Either::Right(self.swarm.get_closest_peers(peer_id))
};
let _ = ret.send(addrs);
}
IpfsEvent::GetProviders(cid, ret) => {
let future = self.swarm.get_providers(cid);
let _ = ret.send(future);
}
IpfsEvent::Provide(cid, ret) => {
let _ = ret.send(self.swarm.start_providing(cid));
}
IpfsEvent::DhtGet(key, quorum, ret) => {
let future = self.swarm.dht_get(key, quorum);
let _ = ret.send(future);
}
IpfsEvent::DhtPut(key, value, quorum, ret) => {
let future = self.swarm.dht_put(key, value, quorum);
let _ = ret.send(future);
}
IpfsEvent::GetBootstrappers(ret) => {
let list = self.swarm.get_bootstrappers();
let _ = ret.send(list);
}
IpfsEvent::AddBootstrapper(addr, ret) => {
let result = self.swarm.add_bootstrapper(addr);
let _ = ret.send(result);
}
IpfsEvent::RemoveBootstrapper(addr, ret) => {
let result = self.swarm.remove_bootstrapper(addr);
let _ = ret.send(result);
}
IpfsEvent::ClearBootstrappers(ret) => {
let list = self.swarm.clear_bootstrappers();
let _ = ret.send(list);
}
IpfsEvent::RestoreBootstrappers(ret) => {
let list = self.swarm.restore_bootstrappers();
let _ = ret.send(list);
}
IpfsEvent::Exit => {
return Poll::Ready(());
}
}
}
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid),
RepoEvent::NewBlock(cid, ret) => {
self.swarm.bitswap().cancel_block(&cid);
// Providing newly stored blocks is switched off by the constant `false`
// below; the repo is always told so through the error reply.
if false {
let _ = ret.send(self.swarm.start_providing(cid));
} else {
let _ = ret.send(Err(anyhow!("not actively providing blocks yet")));
}
}
RepoEvent::RemovedBlock(cid) => self.swarm.stop_providing_block(&cid),
}
}
done = true;
}
}
}
/// Snapshot of the bitswap counters together with the bitswap peers and the local
/// wantlist, as returned by `Ipfs::bitswap_stats`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BitswapStats {
/// Value of the `sent_blocks` counter.
pub blocks_sent: u64,
/// Value of the `sent_data` counter.
/// NOTE(review): presumably a byte count; confirm in `ipfs_bitswap::Stats`.
pub data_sent: u64,
/// Value of the `received_blocks` counter.
pub blocks_received: u64,
/// Value of the `received_data` counter.
pub data_received: u64,
/// Value of the `duplicate_blocks` counter.
pub dup_blks_received: u64,
/// Value of the `duplicate_data` counter.
pub dup_data_received: u64,
/// Peers reported by bitswap.
pub peers: Vec<PeerId>,
/// Local wantlist with priorities.
pub wantlist: Vec<(Cid, ipfs_bitswap::Priority)>,
}
impl
    From<(
        ipfs_bitswap::Stats,
        Vec<PeerId>,
        Vec<(Cid, ipfs_bitswap::Priority)>,
    )> for BitswapStats
{
    /// Builds the snapshot from the live counters (each read with relaxed ordering),
    /// the peer list and the wantlist.
    fn from(
        parts: (
            ipfs_bitswap::Stats,
            Vec<PeerId>,
            Vec<(Cid, ipfs_bitswap::Priority)>,
        ),
    ) -> Self {
        let (stats, peers, wantlist) = parts;

        let blocks_sent = stats.sent_blocks.load(Ordering::Relaxed);
        let data_sent = stats.sent_data.load(Ordering::Relaxed);
        let blocks_received = stats.received_blocks.load(Ordering::Relaxed);
        let data_received = stats.received_data.load(Ordering::Relaxed);
        let dup_blks_received = stats.duplicate_blocks.load(Ordering::Relaxed);
        let dup_data_received = stats.duplicate_data.load(Ordering::Relaxed);

        Self {
            blocks_sent,
            data_sent,
            blocks_received,
            data_received,
            dup_blks_received,
            dup_data_received,
            peers,
            wantlist,
        }
    }
}
#[doc(hidden)]
pub use node::Node;
// Helper node built on `TestTypes`; re-exported from the crate root as a hidden item.
mod node {
use super::*;
use std::convert::TryFrom;
/// An `Ipfs<TestTypes>` instance together with its spawned background task.
pub struct Node {
/// The facade of the running node.
pub ipfs: Ipfs<TestTypes>,
/// Peer id derived from the keypair in the options.
pub id: PeerId,
/// Addresses reported by `Ipfs::identity` right after startup.
pub addrs: Vec<Multiaddr>,
/// Handle of the tokio task running the background future.
pub bg_task: tokio::task::JoinHandle<()>,
}
impl Node {
/// Starts a node with `IpfsOptions::inmemory_with_generated_keys` and a span that
/// carries `name` in its `node` field.
pub async fn new<T: AsRef<str>>(name: T) -> Self {
let mut opts = IpfsOptions::inmemory_with_generated_keys();
opts.span = Some(trace_span!("ipfs", node = name.as_ref()));
Self::with_options(opts).await
}
/// Connects to `addr`.
///
/// # Panics
///
/// Panics if `addr` cannot be converted into a `MultiaddrWithPeerId`.
pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> {
let addr = MultiaddrWithPeerId::try_from(addr).unwrap();
self.ipfs.connect(addr).await
}
/// Starts a node with the given options and spawns its background future on tokio.
///
/// # Panics
///
/// Panics if starting the node or querying its identity fails.
pub async fn with_options(opts: IpfsOptions) -> Self {
let id = opts.keypair.public().into_peer_id();
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts)
.start()
.in_current_span()
.await
.unwrap();
let bg_task = tokio::task::spawn(fut.in_current_span());
let addrs = ipfs.identity().await.unwrap().1;
Node {
ipfs,
id,
addrs,
bg_task,
}
}
/// Gives access to the subscriptions mutex inside the repo.
pub fn get_subscriptions(
&self,
) -> &std::sync::Mutex<subscription::Subscriptions<Block, String>> {
&self.ipfs.repo.subscriptions.subscriptions
}
/// Starts a Kademlia bootstrap and waits for its result.
pub async fn bootstrap(&self) -> Result<KadResult, Error> {
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;
rx.await??.await.map_err(|e| anyhow!(e))
}
/// Adds a peer with the given address to the swarm. A trailing `/p2p/..` component
/// is stripped from `addr` first. Only the send to the background task is awaited;
/// there is no reply for this request.
pub async fn add_peer(&self, peer_id: PeerId, mut addr: Multiaddr) -> Result<(), Error> {
if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
addr.pop();
}
self.to_task
.clone()
.send(IpfsEvent::AddPeer(peer_id, addr))
.await?;
Ok(())
}
/// Returns the peers bitswap is connected to.
pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetBitswapPeers(tx))
.await?;
rx.await.map_err(|e| anyhow!(e))
}
/// Requests the node to exit and waits for the background task to finish; the join
/// result is ignored.
pub async fn shutdown(self) {
self.ipfs.exit_daemon().await;
let _ = self.bg_task.await;
}
}
// `Node` dereferences to its `Ipfs` facade so facade methods can be called directly.
impl Deref for Node {
type Target = Ipfs<TestTypes>;
fn deref(&self) -> &Self::Target {
&self.ipfs
}
}
impl DerefMut for Node {
fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
&mut self.ipfs
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::make_ipld;
use multihash::Sha2_256;
// A block that has been put can be read back unchanged.
#[tokio::test(max_threads = 1)]
async fn test_put_and_get_block() {
let ipfs = Node::new("test_node").await;
let data = b"hello block\n".to_vec().into_boxed_slice();
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let block = Block::new(data, cid);
let cid: Cid = ipfs.put_block(block.clone()).await.unwrap();
let new_block = ipfs.get_block(&cid).await.unwrap();
assert_eq!(block, new_block);
}
// An IPLD value that has been put can be read back through its Cid.
#[tokio::test(max_threads = 1)]
async fn test_put_and_get_dag() {
let ipfs = Node::new("test_node").await;
let data = make_ipld!([-1, -2, -3]);
let cid = ipfs.put_dag(data.clone()).await.unwrap();
let new_data = ipfs.get_dag(cid.into()).await.unwrap();
assert_eq!(data, new_data);
}
// A direct pin is visible through `is_pinned` and disappears after unpinning.
#[tokio::test(max_threads = 1)]
async fn test_pin_and_unpin() {
let ipfs = Node::new("test_node").await;
let data = make_ipld!([-1, -2, -3]);
let cid = ipfs.put_dag(data.clone()).await.unwrap();
ipfs.insert_pin(&cid, false).await.unwrap();
assert!(ipfs.is_pinned(&cid).await.unwrap());
ipfs.remove_pin(&cid, false).await.unwrap();
assert!(!ipfs.is_pinned(&cid).await.unwrap());
}
}