use cid::{Cid, Codec};
use ipfs::{p2p::MultiaddrWithPeerId, Block, Node};
use libp2p::{kad::Quorum, multiaddr::Protocol, Multiaddr};
use multihash::Sha2_256;
use tokio::time::timeout;
use std::{convert::TryInto, time::Duration};
mod common;
use common::{interop::ForeignNode, spawn_nodes, Topology};
fn strip_peer_id(addr: Multiaddr) -> Multiaddr {
let MultiaddrWithPeerId { multiaddr, .. } = addr.try_into().unwrap();
multiaddr.into()
}
/// Connects two nodes directly and checks that every address returned by
/// `find_peer` for the second node, once the peer id is appended to it, is
/// one of the addresses the second node advertises.
#[tokio::test(max_threads = 1)]
async fn find_peer_local() {
    let nodes = spawn_nodes(2, Topology::None).await;
    let (searcher, target) = (&nodes[0], &nodes[1]);

    searcher.connect(target.addrs[0].clone()).await.unwrap();

    let found_addrs = searcher.find_peer(target.id.clone()).await.unwrap();

    for mut addr in found_addrs {
        // The returned addresses are compared after appending the peer id,
        // since that is the form stored in the target's `addrs`.
        addr.push(Protocol::P2p(target.id.clone().into()));
        assert!(target.addrs.contains(&addr));
    }
}
/// Spawns `n` unconnected nodes and bootstraps them into a chain.
///
/// Every node is told about the node following it and then bootstrapped; the
/// final node, having no successor, is told about the node preceding it.
/// Afterwards each node is asserted to have either one or two peers.
///
/// The returned foreign node is always `None` in this (non-interop) build.
///
/// NOTE: for `n == 1` the `n - 2` index underflows, so callers are expected
/// to ask for at least two nodes.
#[cfg(all(not(feature = "test_go_interop"), not(feature = "test_js_interop")))]
async fn spawn_bootstrapped_nodes(n: usize) -> (Vec<Node>, Option<ForeignNode>) {
    let nodes = spawn_nodes(n, Topology::None).await;

    for i in 0..n {
        // The last node links backwards because there is no next node.
        let neighbour_index = if i < n - 1 { i + 1 } else { n - 2 };
        let neighbour = &nodes[neighbour_index];
        let neighbour_id = neighbour.id.clone();
        let neighbour_addr = neighbour.addrs[0].clone();

        let current = &nodes[i];
        current.add_peer(neighbour_id, neighbour_addr).await.unwrap();
        current.bootstrap().await.unwrap();
    }

    for node in nodes.iter() {
        let peer_count = node.peers().await.unwrap().len();
        assert!((1usize..=2).contains(&peer_count));
    }

    (nodes, None)
}
/// Spawns one foreign node plus `n - 1` unconnected Rust nodes and bootstraps
/// them into a chain with the foreign node in the middle.
///
/// The Rust nodes at indices `n / 2 - 1` and `n / 2` are told about the
/// foreign node. Rust nodes before those are told about the next Rust node,
/// and Rust nodes after them are told about the previous Rust node. Each node
/// is bootstrapped right after learning about its peer.
///
/// Returns the Rust nodes together with `Some(foreign_node)`.
#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
async fn spawn_bootstrapped_nodes(n: usize) -> (Vec<Node>, Option<ForeignNode>) {
    let foreign_node = ForeignNode::new();
    let rust_node_count = n - 1;
    let nodes = spawn_nodes(rust_node_count, Topology::None).await;
    let middle = n / 2;

    for i in 0..rust_node_count {
        let (peer_id, peer_addr) = if i + 1 == middle || i == middle {
            println!("telling rust node {} about the foreign node", i);
            (foreign_node.id.clone(), foreign_node.addrs[0].clone())
        } else {
            // Nodes on the left of the foreign node link forwards, nodes on
            // the right link backwards, so both halves lead to the middle.
            let neighbour_index = if i < middle { i + 1 } else { i - 1 };
            println!("telling rust node {} about rust node {}", i, neighbour_index);
            let neighbour = &nodes[neighbour_index];
            (neighbour.id.clone(), neighbour.addrs[0].clone())
        };

        let current = &nodes[i];
        current.add_peer(peer_id, peer_addr).await.unwrap();
        current.bootstrap().await.unwrap();
    }

    (nodes, Some(foreign_node))
}
/// Checks that the first node of a bootstrapped chain can find the last Rust
/// node through `find_peer`, and that the result is exactly that node's first
/// address with the peer id stripped.
#[tokio::test(max_threads = 1)]
async fn dht_find_peer() {
    const CHAIN_LEN: usize = 10;

    let (nodes, foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;

    // When a foreign node takes part in the chain there is one Rust node less.
    let rust_node_count = if foreign_node.is_some() {
        CHAIN_LEN - 1
    } else {
        CHAIN_LEN
    };
    let last_index = rust_node_count - 1;
    let (first, last) = (&nodes[0], &nodes[last_index]);

    let found_addrs = first.find_peer(last.id.clone()).await.unwrap();

    let expected_addrs = vec![strip_peer_id(last.addrs[0].clone())];
    assert_eq!(found_addrs, expected_addrs);
}
/// Checks that asking the first node of a bootstrapped chain for the peers
/// closest to its own id yields `CHAIN_LEN - 1` entries.
#[tokio::test(max_threads = 1)]
async fn dht_get_closest_peers() {
    const CHAIN_LEN: usize = 10;

    let (nodes, _foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;
    let origin = &nodes[0];

    let closest_peers = origin.get_closest_peers(origin.id.clone()).await.unwrap();

    assert_eq!(closest_peers.len(), CHAIN_LEN - 1);
}
/// Restores the default bootstrappers on a fresh node and expects a fixed,
/// well-known block to be fetched within ten seconds.
///
/// Ignored by default because it depends on real bootstrap nodes.
#[ignore = "targets an actual bootstrapper, so random failures can happen"]
#[tokio::test(max_threads = 1)]
async fn dht_popular_content_discovery() {
    let peer = Node::new("a").await;
    peer.restore_bootstrappers().await.unwrap();

    let cid: Cid = "bafkreicncneocapbypwwe3gl47bzvr3pkpxmmobzn7zr2iaz67df4kjeiq"
        .parse()
        .unwrap();

    // `timeout` only reports whether the deadline elapsed; the outcome of the
    // fetch itself is nested inside its `Ok`. Checking just the outer layer
    // would let a quickly failing `get_block` pass the test, so both layers
    // are unwrapped here.
    timeout(Duration::from_secs(10), peer.get_block(&cid))
        .await
        .expect("fetching the block timed out")
        .expect("fetching the block failed");
}
/// Stores a block on the last Rust node of a bootstrapped chain, announces it
/// via `provide`, and checks that the first node lists that last node among
/// the providers of the block's CID.
#[tokio::test(max_threads = 1)]
async fn dht_providing() {
    const CHAIN_LEN: usize = 10;

    let (nodes, foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;

    // When a foreign node takes part in the chain there is one Rust node less.
    let rust_node_count = if foreign_node.is_some() {
        CHAIN_LEN - 1
    } else {
        CHAIN_LEN
    };
    let provider = &nodes[rust_node_count - 1];

    let data = Box::<[u8]>::from(&b"hello block\n"[..]);
    let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
    let block = Block {
        cid: cid.clone(),
        data,
    };

    provider.put_block(block).await.unwrap();
    provider.provide(cid.clone()).await.unwrap();

    let providers = nodes[0].get_providers(cid).await.unwrap();
    assert!(providers.contains(&provider.id));
}
/// Puts a key/value record into the DHT from the last Rust node of a
/// bootstrapped chain and checks that the first node reads back exactly that
/// one value, using a quorum of one for both operations.
#[tokio::test(max_threads = 1)]
async fn dht_get_put() {
    const CHAIN_LEN: usize = 10;

    let (nodes, foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;

    // When a foreign node takes part in the chain there is one Rust node less.
    let rust_node_count = if foreign_node.is_some() {
        CHAIN_LEN - 1
    } else {
        CHAIN_LEN
    };
    let writer = &nodes[rust_node_count - 1];
    let reader = &nodes[0];

    let key = b"key".to_vec();
    let value = b"value".to_vec();

    writer
        .dht_put(key.clone(), value.clone(), Quorum::One)
        .await
        .unwrap();

    let fetched = reader.dht_get(key, Quorum::One).await.unwrap();
    assert_eq!(fetched, vec![value]);
}