mod db;
mod executor;
mod net;
#[cfg(feature = "telemetry")]
mod telemetry;
#[cfg(test)]
mod test_util;
mod variable;
pub mod config {
pub use libp2p::{
dns::{ResolverConfig, ResolverOpts},
gossipsub::GossipsubConfig,
identify::Config as IdentifyConfig,
kad::record::store::MemoryStoreConfig as KadConfig,
mdns::Config as MdnsConfig,
ping::Config as PingConfig,
};
pub use libp2p_bitswap::BitswapConfig;
pub use libp2p_broadcast::BroadcastConfig;
}
#[cfg(feature = "telemetry")]
pub use crate::telemetry::telemetry;
pub use crate::{
db::{Batch, StorageConfig, StorageService, TempPin},
executor::Executor,
net::{
AddressSource, ConnectionFailure, Direction, DnsConfig, Event, GossipEvent, ListenerEvent,
NetworkConfig, PeerInfo, Rtt, SwarmEvents, SyncEvent, SyncQuery,
},
};
pub use libipld::{store::DefaultParams, Block, Cid};
pub use libp2p::{
core::{transport::ListenerId, ConnectedPoint, Multiaddr, PeerId},
identity,
kad::{kbucket::Key as BucketKey, record::Key, PeerRecord, Quorum, Record},
multiaddr,
swarm::{AddressRecord, AddressScore},
};
use crate::net::NetworkService;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{stream::Stream, Future};
use libipld::{
codec::References,
error::BlockNotFound,
store::{Store, StoreParams},
Ipld, Result,
};
use libp2p::identity::ed25519::{Keypair, PublicKey};
use libp2p_bitswap::BitswapStore;
use parking_lot::Mutex;
use prometheus::Registry;
use std::{collections::HashSet, path::Path, sync::Arc, time::Duration};
#[derive(Debug)]
pub struct Config {
pub storage: StorageConfig,
pub network: NetworkConfig,
}
impl Config {
pub fn new(path: &Path, keypair: Keypair) -> Self {
let sweep_interval = std::time::Duration::from_millis(10000);
let storage = StorageConfig::new(Some(path.join("blocks")), None, 0, sweep_interval);
let network = NetworkConfig::new(keypair);
Self { storage, network }
}
}
impl Default for Config {
fn default() -> Self {
Self::new(Path::new("."), Keypair::generate())
}
}
#[derive(Clone)]
pub struct Ipfs<P: StoreParams> {
storage: StorageService<P>,
network: NetworkService,
}
impl<P: StoreParams> std::fmt::Debug for Ipfs<P> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Ipfs").finish()
}
}
struct BitswapStorage<P: StoreParams>(StorageService<P>);
impl<P: StoreParams> BitswapStore for BitswapStorage<P>
where
Ipld: References<P::Codecs>,
{
type Params = P;
fn contains(&mut self, cid: &Cid) -> Result<bool> {
self.0.contains(cid)
}
fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
self.0.get(cid)
}
fn insert(&mut self, block: &Block<P>) -> Result<()> {
self.0.insert(block.clone())
}
fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
self.0.missing_blocks(cid)
}
}
impl<P: StoreParams> Ipfs<P>
where
Ipld: References<P::Codecs>,
{
pub async fn new(config: Config) -> Result<Self> {
let executor = Executor::new();
Self::new0(config, executor).await
}
async fn new0(config: Config, executor: Executor) -> Result<Self> {
let storage = StorageService::open(config.storage, executor.clone())?;
let bitswap = BitswapStorage(storage.clone());
let network = NetworkService::new(config.network, bitswap, executor).await?;
Ok(Self { storage, network })
}
pub fn local_public_key(&self) -> PublicKey {
self.network.local_public_key()
}
pub fn local_peer_id(&self) -> PeerId {
self.network.local_peer_id()
}
pub fn local_node_name(&self) -> String {
self.network.local_node_name()
}
pub fn listen_on(&mut self, addr: Multiaddr) -> impl Stream<Item = ListenerEvent> {
self.network.listen_on(addr)
}
pub fn listeners(&self) -> Vec<Multiaddr> {
self.network.listeners()
}
pub fn add_external_address(&mut self, addr: Multiaddr) {
self.network.add_external_address(addr)
}
pub fn external_addresses(&self) -> Vec<AddressRecord> {
self.network.external_addresses()
}
pub fn add_address(&mut self, peer: PeerId, addr: Multiaddr) {
self.network.add_address(peer, addr)
}
pub fn remove_address(&mut self, peer: PeerId, addr: Multiaddr) {
self.network.remove_address(peer, addr)
}
pub fn prune_peers(&mut self, min_age: Duration) {
self.network.prune_peers(min_age);
}
pub fn dial(&mut self, peer: PeerId) {
self.network.dial(peer);
}
pub fn dial_address(&mut self, peer: PeerId, addr: Multiaddr) {
self.network.dial_address(peer, addr);
}
pub fn ban(&mut self, peer: PeerId) {
self.network.ban(peer)
}
pub fn unban(&mut self, peer: PeerId) {
self.network.unban(peer)
}
pub fn peers(&self) -> Vec<PeerId> {
self.network.peers()
}
pub fn connections(&self) -> Vec<(PeerId, Multiaddr, DateTime<Utc>, Direction)> {
self.network.connections()
}
pub fn is_connected(&self, peer: &PeerId) -> bool {
self.network.is_connected(peer)
}
pub fn peer_info(&self, peer: &PeerId) -> Option<PeerInfo> {
self.network.peer_info(peer)
}
pub fn bootstrap(
&mut self,
nodes: Vec<(PeerId, Multiaddr)>,
) -> impl Future<Output = Result<()>> {
self.network.bootstrap(nodes)
}
pub fn is_bootstrapped(&self) -> bool {
self.network.is_bootstrapped()
}
pub fn providers(&mut self, key: Key) -> impl Future<Output = Result<HashSet<PeerId>>> {
self.network.providers(key)
}
pub fn provide(&mut self, key: Key) -> impl Future<Output = Result<()>> {
self.network.provide(key)
}
pub fn unprovide(&mut self, key: Key) -> Result<()> {
self.network.unprovide(key)
}
pub fn get_record(
&mut self,
key: Key,
quorum: Quorum,
) -> impl Future<Output = Result<Vec<PeerRecord>>> {
self.network.get_record(key, quorum)
}
pub fn put_record(
&mut self,
record: Record,
quorum: Quorum,
) -> impl Future<Output = Result<()>> {
self.network.put_record(record, quorum)
}
pub fn remove_record(&mut self, key: Key) -> Result<()> {
self.network.remove_record(key)
}
pub fn subscribe(
&mut self,
topic: String,
) -> impl Future<Output = Result<impl Stream<Item = GossipEvent>>> {
self.network.subscribe(topic)
}
pub fn publish(&mut self, topic: String, msg: Vec<u8>) -> impl Future<Output = Result<()>> {
self.network.publish(topic, msg)
}
pub fn broadcast(&mut self, topic: String, msg: Vec<u8>) -> impl Future<Output = Result<()>> {
self.network.broadcast(topic, msg)
}
pub fn create_temp_pin(&self) -> Result<TempPin> {
self.storage.create_temp_pin()
}
pub fn temp_pin(&self, tmp: &mut TempPin, cid: &Cid) -> Result<()> {
self.storage.temp_pin(tmp, std::iter::once(*cid))
}
pub fn iter(&self) -> Result<impl Iterator<Item = Cid>> {
self.storage.iter()
}
pub fn contains(&self, cid: &Cid) -> Result<bool> {
self.storage.contains(cid)
}
pub fn get(&self, cid: &Cid) -> Result<Block<P>> {
if let Some(data) = self.storage.get(cid)? {
let block = Block::new_unchecked(*cid, data);
Ok(block)
} else {
Err(BlockNotFound(*cid).into())
}
}
pub async fn fetch(&self, cid: &Cid, providers: Vec<PeerId>) -> Result<Block<P>> {
if let Some(data) = self.storage.get(cid)? {
let block = Block::new_unchecked(*cid, data);
return Ok(block);
}
if !providers.is_empty() {
self.network.get(*cid, providers).await?.await?;
if let Some(data) = self.storage.get(cid)? {
let block = Block::new_unchecked(*cid, data);
return Ok(block);
}
tracing::error!("block evicted too soon. use a temp pin to keep the block around.");
}
Err(BlockNotFound(*cid).into())
}
pub fn insert(&self, block: Block<P>) -> Result<()> {
self.storage.insert(block)?;
Ok(())
}
pub fn evict(&self) -> impl Future<Output = Result<()>> {
self.storage.evict()
}
pub fn sync(
&self,
cid: &Cid,
providers: Vec<PeerId>,
) -> impl Future<Output = anyhow::Result<SyncQuery>> {
let missing = self.storage.missing_blocks(cid).ok().unwrap_or_default();
tracing::trace!(cid = %cid, missing = %missing.len(), "sync");
self.network.sync(*cid, providers, missing)
}
pub fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()> {
self.storage.alias(alias.as_ref(), cid)
}
pub fn aliases(&self) -> Result<Vec<(Vec<u8>, Cid)>> {
self.storage.aliases()
}
pub fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>> {
self.storage.resolve(alias.as_ref())
}
pub fn reverse_alias(&self, cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>> {
self.storage.reverse_alias(cid)
}
pub fn flush(&self) -> impl Future<Output = Result<()>> {
self.storage.flush()
}
pub fn batch_ops<R>(&self, f: impl FnOnce(&mut Batch<'_, P>) -> Result<R>) -> Result<R> {
self.storage.rw("batch_ops", f)
}
pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
self.storage.register_metrics(registry)?;
net::register_metrics(registry)?;
Ok(())
}
pub fn swarm_events(&mut self) -> impl Future<Output = Result<SwarmEvents>> {
self.network.swarm_events()
}
}
#[async_trait]
impl<P: StoreParams> Store for Ipfs<P>
where
Ipld: References<P::Codecs>,
{
type Params = P;
type TempPin = Arc<Mutex<TempPin>>;
fn create_temp_pin(&self) -> Result<Self::TempPin> {
Ok(Arc::new(Mutex::new(Ipfs::create_temp_pin(self)?)))
}
fn temp_pin(&self, tmp: &Self::TempPin, cid: &Cid) -> Result<()> {
Ipfs::temp_pin(self, &mut tmp.lock(), cid)
}
fn contains(&self, cid: &Cid) -> Result<bool> {
Ipfs::contains(self, cid)
}
fn get(&self, cid: &Cid) -> Result<Block<P>> {
Ipfs::get(self, cid)
}
fn insert(&self, block: &Block<P>) -> Result<()> {
Ipfs::insert(self, block.clone())?;
Ok(())
}
fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()> {
Ipfs::alias(self, alias, cid)
}
fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>> {
Ipfs::resolve(self, alias)
}
fn reverse_alias(&self, cid: &Cid) -> Result<Option<Vec<Vec<u8>>>> {
Ipfs::reverse_alias(self, cid).map(|x| x.map(|x| x.into_iter().collect()))
}
async fn flush(&self) -> Result<()> {
Ipfs::flush(self).await
}
async fn fetch(&self, cid: &Cid) -> Result<Block<Self::Params>> {
Ipfs::fetch(self, cid, self.peers()).await
}
async fn sync(&self, cid: &Cid) -> Result<()> {
Ipfs::sync(self, cid, self.peers()).await?.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_std::future::timeout;
use futures::{join, stream::StreamExt};
use libipld::{
alias, cbor::DagCborCodec, ipld, multihash::Code, raw::RawCodec, store::DefaultParams,
};
use std::time::Duration;
use tempdir::TempDir;
fn tracing_try_init() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.ok();
}
async fn create_store(enable_mdns: bool) -> Result<(Ipfs<DefaultParams>, TempDir)> {
let tmp = TempDir::new("ipfs-embed")?;
let sweep_interval = Duration::from_millis(10000);
let storage = StorageConfig::new(None, None, 10, sweep_interval);
let mut network = NetworkConfig::new(Keypair::generate());
if !enable_mdns {
network.mdns = None;
}
let mut ipfs = Ipfs::new(Config { storage, network }).await?;
ipfs.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.next()
.await
.unwrap();
Ok((ipfs, tmp))
}
fn create_block(bytes: &[u8]) -> Result<Block<DefaultParams>> {
Block::encode(RawCodec, Code::Blake3_256, bytes)
}
#[async_std::test]
async fn test_local_store() -> Result<()> {
tracing_try_init();
let (store, _tmp) = create_store(false).await?;
let block = create_block(b"test_local_store")?;
let mut tmp = store.create_temp_pin()?;
store.temp_pin(&mut tmp, block.cid())?;
store.insert(block.clone())?;
let block2 = store.get(block.cid())?;
assert_eq!(block.data(), block2.data());
Ok(())
}
#[async_std::test]
#[ignore] async fn test_exchange_mdns() -> Result<()> {
tracing_try_init();
let (store1, _tmp) = create_store(true).await?;
let (store2, _tmp) = create_store(true).await?;
let block = create_block(b"test_exchange_mdns")?;
let mut tmp1 = store1.create_temp_pin()?;
store1.temp_pin(&mut tmp1, block.cid())?;
store1.insert(block.clone())?;
store1.flush().await?;
let mut tmp2 = store2.create_temp_pin()?;
store2.temp_pin(&mut tmp2, block.cid())?;
let block2 = store2
.fetch(block.cid(), vec![store1.local_peer_id()])
.await?;
assert_eq!(block.data(), block2.data());
Ok(())
}
#[async_std::test]
#[ignore] async fn test_exchange_kad() -> Result<()> {
tracing_try_init();
let (store, _tmp) = create_store(false).await?;
let (mut store1, _tmp) = create_store(false).await?;
let (mut store2, _tmp) = create_store(false).await?;
let addr = store.listeners()[0].clone();
let peer_id = store.local_peer_id();
let nodes = [(peer_id, addr)];
let b1 = store1.bootstrap(nodes[..].into());
let b2 = store2.bootstrap(nodes[..].into());
let (r1, r2) = join!(b1, b2);
r1.unwrap();
r2.unwrap();
let block = create_block(b"test_exchange_kad")?;
let key = Key::new(&block.cid().to_bytes());
let mut tmp1 = store1.create_temp_pin()?;
store1.temp_pin(&mut tmp1, block.cid())?;
store1.insert(block.clone())?;
store1.provide(key.clone()).await?;
store1.flush().await?;
let mut tmp2 = store2.create_temp_pin()?;
store2.temp_pin(&mut tmp2, block.cid())?;
let providers = store2.providers(key).await?;
let block2 = store2
.fetch(block.cid(), providers.into_iter().collect())
.await?;
assert_eq!(block.data(), block2.data());
Ok(())
}
#[async_std::test]
async fn test_provider_not_found() -> Result<()> {
tracing_try_init();
let (store1, _tmp) = create_store(true).await?;
let block = create_block(b"test_provider_not_found")?;
if store1
.fetch(block.cid(), vec![store1.local_peer_id()])
.await
.unwrap_err()
.downcast_ref::<BlockNotFound>()
.is_none()
{
panic!("expected block not found error");
}
Ok(())
}
macro_rules! assert_pinned {
($store:expr, $block:expr) => {
assert_eq!(
$store
.reverse_alias($block.cid())
.unwrap()
.map(|a| !a.is_empty()),
Some(true)
);
};
}
macro_rules! assert_unpinned {
($store:expr, $block:expr) => {
assert_eq!(
$store
.reverse_alias($block.cid())
.unwrap()
.map(|a| !a.is_empty()),
Some(false)
);
};
}
fn create_ipld_block(ipld: &Ipld) -> Result<Block<DefaultParams>> {
Block::encode(DagCborCodec, Code::Blake3_256, ipld)
}
#[async_std::test]
async fn test_sync() -> Result<()> {
tracing_try_init();
let (mut local1, _tmp) = create_store(false).await?;
let (mut local2, _tmp) = create_store(false).await?;
local1.add_address(local2.local_peer_id(), local2.listeners()[0].clone());
local2.add_address(local1.local_peer_id(), local1.listeners()[0].clone());
let a1 = create_ipld_block(&ipld!({ "a": 0 }))?;
let b1 = create_ipld_block(&ipld!({ "b": 0 }))?;
let c1 = create_ipld_block(&ipld!({ "c": [a1.cid(), b1.cid()] }))?;
let b2 = create_ipld_block(&ipld!({ "b": 1 }))?;
let c2 = create_ipld_block(&ipld!({ "c": [a1.cid(), b2.cid()] }))?;
let x = alias!(x);
local1.insert(a1.clone())?;
local1.insert(b1.clone())?;
local1.insert(c1.clone())?;
local1.alias(x, Some(c1.cid()))?;
local1.flush().await?;
assert_pinned!(&local1, &a1);
assert_pinned!(&local1, &b1);
assert_pinned!(&local1, &c1);
local2.alias(&x, Some(c1.cid()))?;
local2
.sync(c1.cid(), vec![local1.local_peer_id()])
.await?
.await?;
local2.flush().await?;
assert_pinned!(&local2, &a1);
assert_pinned!(&local2, &b1);
assert_pinned!(&local2, &c1);
local2.insert(b2.clone())?;
local2.insert(c2.clone())?;
local2.alias(x, Some(c2.cid()))?;
local2.flush().await?;
assert_pinned!(&local2, &a1);
assert_unpinned!(&local2, &b1);
assert_unpinned!(&local2, &c1);
assert_pinned!(&local2, &b2);
assert_pinned!(&local2, &c2);
local1.alias(x, Some(c2.cid()))?;
local1
.sync(c2.cid(), vec![local2.local_peer_id()])
.await?
.await?;
local1.flush().await?;
assert_pinned!(&local1, &a1);
assert_unpinned!(&local1, &b1);
assert_unpinned!(&local1, &c1);
assert_pinned!(&local1, &b2);
assert_pinned!(&local1, &c2);
local2.alias(x, None)?;
local2.flush().await?;
assert_unpinned!(&local2, &a1);
assert_unpinned!(&local2, &b1);
assert_unpinned!(&local2, &c1);
assert_unpinned!(&local2, &b2);
assert_unpinned!(&local2, &c2);
local1.alias(x, None)?;
local2.flush().await?;
assert_unpinned!(&local1, &a1);
assert_unpinned!(&local1, &b1);
assert_unpinned!(&local1, &c1);
assert_unpinned!(&local1, &b2);
assert_unpinned!(&local1, &c2);
Ok(())
}
#[async_std::test]
async fn test_dht_record() -> Result<()> {
tracing_try_init();
let mut stores = [create_store(false).await?, create_store(false).await?];
async_std::task::sleep(Duration::from_millis(100)).await;
stores[0]
.0
.bootstrap(vec![(
stores[1].0.local_peer_id(),
stores[1].0.listeners()[0].clone(),
)])
.await?;
stores[1]
.0
.bootstrap(vec![(
stores[0].0.local_peer_id(),
stores[0].0.listeners()[0].clone(),
)])
.await?;
async_std::task::sleep(Duration::from_millis(500)).await;
let key: Key = b"key".to_vec().into();
stores[0]
.0
.put_record(
Record::new(key.clone(), b"hello world".to_vec()),
Quorum::One,
)
.await?;
let records = stores[1].0.get_record(key, Quorum::One).await?;
assert_eq!(records.len(), 1);
Ok(())
}
#[async_std::test]
async fn test_gossip_and_broadcast() -> Result<()> {
tracing_try_init();
let mut stores = [
create_store(false).await?,
create_store(false).await?,
create_store(false).await?,
create_store(false).await?,
create_store(false).await?,
create_store(false).await?,
];
let mut subscriptions = vec![];
let topic = "topic".to_owned();
let others = stores
.iter()
.map(|store| {
(
store.0.local_peer_id(),
store.0.listeners().into_iter().next().unwrap(),
)
})
.collect::<Vec<_>>();
for (store, _) in &mut stores {
for (peer, addr) in &others {
if store.local_peer_id() != *peer {
store.dial_address(*peer, addr.clone());
}
}
}
async_std::task::sleep(Duration::from_millis(2500)).await;
for (store, _) in &stores {
for (peer, _) in &others {
assert!(store.is_connected(peer));
}
}
for (store, _) in &mut stores {
subscriptions.push(store.subscribe(topic.clone()).await?);
}
async_std::task::sleep(Duration::from_millis(500)).await;
stores[0]
.0
.publish(topic.clone(), b"hello gossip".to_vec())
.await
.unwrap();
for (idx, subscription) in subscriptions.iter_mut().enumerate() {
let mut expected = stores
.iter()
.enumerate()
.filter_map(|(i, s)| {
if i == idx {
None
} else {
Some(s.0.local_peer_id())
}
})
.flat_map(|p| {
vec![GossipEvent::Subscribed(p), GossipEvent::Subscribed(p)].into_iter()
})
.chain(if idx != 0 {
Box::new(std::iter::once(GossipEvent::Message(
stores[0].0.local_peer_id(),
b"hello gossip".to_vec().into(),
))) as Box<dyn Iterator<Item = GossipEvent>>
} else {
Box::new(std::iter::empty())
})
.collect::<Vec<GossipEvent>>();
while expected
.iter()
.any(|msg| matches!(msg, GossipEvent::Message(..)))
{
let ev = timeout(Duration::from_millis(100), subscription.next())
.await
.unwrap_or_else(|_| panic!("idx {} timeout waiting for {:?}", idx, expected))
.unwrap();
assert!(expected.contains(&ev), ", received {:?}", ev);
if let Some(idx) = expected.iter().position(|e| e == &ev) {
expected.remove(idx);
}
}
if idx != 0 {
assert!(
expected.len() < (stores.len() - 1) * 2,
", idx {} did not receive any Subscribed message",
idx
);
}
}
stores[0]
.0
.broadcast(topic.clone(), b"hello broadcast".to_vec())
.await
.unwrap();
for subscription in &mut subscriptions[1..] {
match subscription.next().await.unwrap() {
GossipEvent::Message(p, data) => {
assert_eq!(p, stores[0].0.local_peer_id());
assert_eq!(data[..], b"hello broadcast"[..]);
}
x => {
panic!("received unexpected message: {:?}", x);
}
}
}
let mut last_sub = subscriptions.drain(..1).next().unwrap();
drop(subscriptions);
stores[0]
.0
.broadcast(topic, b"r u still listening?".to_vec())
.await
.unwrap();
let mut expected = stores[1..]
.iter()
.map(|s| s.0.local_peer_id())
.flat_map(|p| {
vec![GossipEvent::Unsubscribed(p), GossipEvent::Unsubscribed(p)].into_iter()
})
.collect::<Vec<_>>();
while !expected.is_empty() {
let ev = timeout(Duration::from_millis(100), last_sub.next())
.await
.unwrap()
.unwrap();
if let GossipEvent::Subscribed(..) = ev {
continue;
}
assert!(expected.contains(&ev), ", received {:?}", ev);
if let Some(idx) = expected.iter().position(|e| e == &ev) {
expected.remove(idx);
}
}
Ok(())
}
#[async_std::test]
async fn test_batch_read() -> Result<()> {
tracing_try_init();
let network = NetworkConfig::new(Keypair::generate());
let storage = StorageConfig::new(None, None, 1000000, Duration::from_secs(3600));
let ipfs = Ipfs::<DefaultParams>::new(Config { storage, network }).await?;
let a = create_block(b"a")?;
let b = create_block(b"b")?;
ipfs.insert(a.clone())?;
ipfs.insert(b.clone())?;
let has_blocks = ipfs.batch_ops(|db| Ok(db.contains(a.cid())? && db.contains(b.cid())?))?;
assert!(has_blocks);
Ok(())
}
#[async_std::test]
async fn test_batch_write() -> Result<()> {
tracing_try_init();
let network = NetworkConfig::new(Keypair::generate());
let storage = StorageConfig::new(None, None, 1000000, Duration::from_secs(3600));
let ipfs = Ipfs::<DefaultParams>::new(Config { storage, network }).await?;
let a = create_block(b"a")?;
let b = create_block(b"b")?;
let c = create_block(b"c")?;
let d = create_block(b"d")?;
ipfs.batch_ops(|db| {
db.insert(a.clone())?;
db.insert(b.clone())?;
Ok(())
})?;
assert!(ipfs.contains(a.cid())? && ipfs.contains(b.cid())?);
#[allow(unreachable_code)]
let _: anyhow::Result<()> = ipfs.batch_ops(|db| {
db.insert(c.clone())?;
anyhow::bail!("nope!");
db.insert(d.clone())?;
});
assert!(!ipfs.contains(d.cid())? && ipfs.contains(c.cid())? && ipfs.contains(b.cid())?);
Ok(())
}
#[async_std::test]
#[ignore]
async fn test_bitswap_sync_chain() -> Result<()> {
use std::time::Instant;
tracing_try_init();
let (a, _tmp) = create_store(true).await?;
let (b, _tmp) = create_store(true).await?;
let root = alias!(root);
let (cid, blocks) = test_util::build_tree(1, 1000)?;
a.alias(root, Some(&cid))?;
b.alias(root, Some(&cid))?;
let size: usize = blocks.iter().map(|block| block.data().len()).sum();
tracing::info!("chain built {} blocks, {} bytes", blocks.len(), size);
for block in blocks.iter() {
a.insert(block.clone())?;
}
a.flush().await?;
let t0 = Instant::now();
b.sync(&cid, vec![a.local_peer_id()])
.await?
.for_each(|x| async move { tracing::debug!("sync progress {:?}", x) })
.await;
b.flush().await?;
tracing::info!(
"chain sync complete {} ms {} blocks {} bytes!",
t0.elapsed().as_millis(),
blocks.len(),
size
);
for block in blocks {
let data = b.get(block.cid())?;
assert_eq!(data, block);
}
Ok(())
}
#[async_std::test]
#[ignore]
async fn test_bitswap_sync_tree() -> Result<()> {
use std::time::Instant;
tracing_try_init();
let (a, _tmp) = create_store(true).await?;
let (b, _tmp) = create_store(true).await?;
let root = alias!(root);
let (cid, blocks) = test_util::build_tree(10, 4)?;
a.alias(root, Some(&cid))?;
b.alias(root, Some(&cid))?;
let size: usize = blocks.iter().map(|block| block.data().len()).sum();
tracing::info!("chain built {} blocks, {} bytes", blocks.len(), size);
for block in blocks.iter() {
a.insert(block.clone())?;
}
a.flush().await?;
let t0 = Instant::now();
b.sync(&cid, vec![a.local_peer_id()])
.await?
.for_each(|x| async move { tracing::debug!("sync progress {:?}", x) })
.await;
b.flush().await?;
tracing::info!(
"tree sync complete {} ms {} blocks {} bytes!",
t0.elapsed().as_millis(),
blocks.len(),
size
);
for block in blocks {
let data = b.get(block.cid())?;
assert_eq!(data, block);
}
Ok(())
}
}