use std::{
path::PathBuf,
sync::{Arc, Mutex},
};
use iroh::{
Endpoint, SecretKey, Watcher, discovery::static_provider::StaticProvider, endpoint::BindError,
protocol::Router,
};
use iroh_blobs::util::connection_pool::ConnectionPool;
use rand::{Rng, rngs::StdRng, seq::SliceRandom};
use testresult::TestResult;
use textplots::{Chart, Plot, Shape};
use super::*;
use crate::{pool::IrohPool, rpc::Blake3Immutable};
#[derive(Debug, Clone)]
struct TestPool {
clients: Arc<Mutex<BTreeMap<NodeId, RpcClient>>>,
node_id: NodeId,
}
impl ClientPool for TestPool {
async fn client(&self, id: NodeId) -> Result<RpcClient, String> {
let client = self
.clients
.lock()
.unwrap()
.get(&id)
.cloned()
.ok_or_else(|| format!("client not found: {id}"))?;
Ok(client)
}
fn id(&self) -> NodeId {
self.node_id
}
}
fn expected_ids(ids: &[NodeId], key: Id, n: usize) -> Vec<NodeId> {
let mut expected = ids
.iter()
.cloned()
.map(|id| (Distance::between(id.as_bytes(), &key), id))
.collect::<Vec<_>>();
expected.sort_unstable();
expected.dedup();
expected.truncate(n);
expected.into_iter().map(|(_, id)| id).collect()
}
type Nodes = Vec<(NodeId, (RpcClient, ApiClient))>;
fn rng(seed: u64) -> StdRng {
let mut expanded = [0; 32];
expanded[..8].copy_from_slice(&seed.to_le_bytes());
StdRng::from_seed(expanded)
}
fn apply_selection(this: usize, ids: &[NodeId], selection: &[usize]) -> Vec<NodeId> {
let mut res = Vec::new();
for i in selection {
if *i == this {
continue;
}
let offset = i % ids.len();
let id = ids[offset];
if !res.contains(&id) {
res.push(id);
}
}
res
}
type Clients = Arc<Mutex<BTreeMap<NodeId, RpcClient>>>;
async fn create_nodes(
ids: &[NodeId],
select_bootstrap: impl Fn(usize) -> Vec<usize>,
config: Config,
) -> Nodes {
create_nodes_and_clients(ids, select_bootstrap, config)
.await
.0
}
async fn create_nodes_and_clients(
ids: &[NodeId],
select_bootstrap: impl Fn(usize) -> Vec<usize>,
config: Config,
) -> (Nodes, Clients) {
let clients = Arc::new(Mutex::new(BTreeMap::new()));
let nodes = ids
.iter()
.enumerate()
.map(|(offfset, id)| {
let pool = TestPool {
clients: clients.clone(),
node_id: *id,
};
let bootstrap = apply_selection(offfset, ids, &select_bootstrap(offfset));
(
*id,
create_node_impl(*id, pool, bootstrap, None, config.clone()),
)
})
.collect::<Vec<_>>();
clients
.lock()
.unwrap()
.extend(nodes.iter().map(|(id, (rpc, _))| (*id, rpc.clone())));
(nodes, clients)
}
async fn init_routing_tables(nodes: &Nodes, ids: &[NodeId], seed: Option<u64>) -> irpc::Result<()> {
let mut rng = seed.map(rng);
let ids = ids.to_vec();
stream::iter(nodes.iter().enumerate())
.for_each_concurrent(4096, |(index, (_, (_, api)))| {
if ids.len() > 10000 {
println!("{index}");
}
let mut ids = ids.clone();
if let Some(rng) = &mut rng {
ids.shuffle(rng);
}
async move {
api.nodes_seen(&ids).await.ok();
}
})
.await;
Ok(())
}
fn make_histogram(data: &[usize]) -> Vec<usize> {
let max = data.iter().max().cloned().unwrap_or(0);
let mut histogram = vec![0usize; max + 1];
for value in data.iter().cloned() {
histogram[value] += 1;
}
histogram
}
fn plot_console(title: &str, data: &[usize]) {
let data: Vec<(f32, f32)> = data
.iter()
.enumerate()
.map(|(items_stored, num_nodes)| (items_stored as f32, *num_nodes as f32))
.collect();
println!("{title}");
Chart::new(100, 40, 0.0, (data.len() - 1) as f32)
.lineplot(&Shape::Bars(&data))
.nice();
}
fn plot_png(title: &str, data: &[usize]) {
use plotters::prelude::*;
let data: Vec<(f32, f32)> = data
.iter()
.enumerate()
.map(|(items_stored, num_nodes)| (items_stored as f32, *num_nodes as f32))
.collect();
let filename = format!("{}.png", title.replace(' ', "_")).to_lowercase();
let path = PathBuf::from("img/").join(&filename);
std::fs::create_dir_all("img/").unwrap();
let root = BitMapBackend::new(&path, (1024, 768)).into_drawing_area();
root.fill(&WHITE).unwrap();
let max_y = data.iter().map(|&(_, y)| y).fold(0f32, f32::max);
let mut chart = ChartBuilder::on(&root)
.caption(title, ("sans-serif", 30).into_font())
.margin(10)
.x_label_area_size(30)
.y_label_area_size(30)
.build_cartesian_2d(0f32..(data.len() as f32), 0f32..max_y * 1.1)
.unwrap();
chart
.configure_mesh()
.x_labels(5)
.y_labels(5)
.draw()
.unwrap();
chart
.draw_series(
data.iter()
.zip(0..)
.map(|((x, y), _)| Rectangle::new([(*x, 0f32), (*x + 1.0, *y)], BLUE.filled())),
)
.unwrap();
root.present().unwrap();
}
fn plot(title: &str, data: &[usize]) {
plot_console(title, data);
plot_png(title, data);
}
async fn random_lookup(nodes: &Nodes, rng: &mut StdRng) -> irpc::Result<()> {
stream::iter(nodes.iter())
.for_each_concurrent(4096, |(_, (_, api))| {
let key = Id::from(rng.r#gen::<[u8; 32]>());
async move {
api.lookup(key, None).await.ok();
}
})
.await;
Ok(())
}
async fn random_lookup_n(nodes: &Nodes, n: usize, seed: u64) -> irpc::Result<()> {
let mut rng = rng(seed);
for _ in 0..n {
random_lookup(nodes, &mut rng).await?;
}
Ok(())
}
async fn store_random_values(prefix: &str, nodes: &Nodes, n: usize) -> irpc::Result<()> {
let (_, (_, api)) = nodes[nodes.len() / 2].clone();
let ids = nodes.iter().map(|(id, _)| *id).collect::<Vec<_>>();
let mut common_count = vec![0usize; n];
#[allow(clippy::needless_range_loop)]
for i in 0..n {
if nodes.len() > 10000 {
println!("Value {i}");
}
let text = format!("Item {i}");
let expected_ids = expected_ids(&ids, Id::blake3_hash(text.as_bytes()), 20);
let (hash, ids) = api.put_immutable(text.as_bytes()).await.unwrap();
let mut common = expected_ids.clone();
common.retain(|id| ids.contains(id));
common_count[i] = common.len();
let data = api.get_immutable(hash).await.unwrap();
assert_eq!(
data,
Some(text.as_bytes().to_vec()),
"Data mismatch for item {i}"
);
}
let mut storage_count = vec![0usize; nodes.len()];
let mut routing_table_size = vec![0usize; nodes.len()];
for (index, (_, (_, api))) in nodes.iter().enumerate() {
let stats = api.get_storage_stats().await?;
if !stats.is_empty() {
let n = stats
.values()
.map(|kinds| kinds.values().sum::<usize>())
.sum::<usize>();
storage_count[index] = n;
}
}
for (index, (_, (_, api))) in nodes.iter().enumerate() {
let routing_table = api.get_routing_table().await?;
let count = routing_table.iter().map(|peers| peers.len()).sum::<usize>();
routing_table_size[index] = count;
}
plot(
&format!("{prefix} - Histogram - Commonality with perfect set of 20 ids"),
&make_histogram(&common_count),
);
plot(
&format!("{prefix} - Storage usage per node"),
&storage_count,
);
plot(
&format!("{prefix} - Histogram - Storage usage per node"),
&make_histogram(&storage_count),
);
plot(
&format!("{prefix} - Routing table size per node"),
&routing_table_size,
);
plot(
&format!("{prefix} - Histogram - Routing table size per node"),
&make_histogram(&routing_table_size),
);
Ok(())
}
async fn plot_random_lookup_stats(prefix: &str, nodes: &Nodes, n: usize) -> irpc::Result<()> {
let (_, (_, api)) = nodes[nodes.len() / 2].clone();
let ids = nodes.iter().map(|(id, _)| *id).collect::<Vec<_>>();
let mut common_count = vec![0usize; n];
let mut storage_count = vec![0usize; nodes.len()];
#[allow(clippy::needless_range_loop)]
for i in 0..n {
if nodes.len() > 10000 {
println!("{i}");
}
let text = format!("Item {i}");
let id = Id::from(blake3::hash(text.as_bytes()));
let storage_ids = api.lookup(id, None).await.unwrap();
let expected_ids = expected_ids(&ids, Id::blake3_hash(text.as_bytes()), 20);
let mut common = expected_ids.clone();
common.retain(|id| storage_ids.contains(id));
common_count[i] = common.len();
for id in &storage_ids {
let idx = ids.iter().position(|x| *x == *id).unwrap();
storage_count[idx] += 1;
}
}
let mut routing_table_size = vec![0usize; nodes.len()];
for (index, (_, (_, api))) in nodes.iter().enumerate() {
let routing_table = api.get_routing_table().await?;
let count = routing_table.iter().map(|peers| peers.len()).sum::<usize>();
routing_table_size[index] = count;
}
plot(
&format!("{prefix} - Histogram - Commonality with perfect set of 20 ids"),
&make_histogram(&common_count),
);
plot(
&format!("{prefix} - Storage usage per node"),
&storage_count,
);
plot(
&format!("{prefix} - Histogram - Storage usage per node"),
&make_histogram(&storage_count),
);
plot(
&format!("{prefix} - Routing table size per node"),
&routing_table_size,
);
plot(
&format!("{prefix} - Histogram - Routing table size per node"),
&make_histogram(&routing_table_size),
);
Ok(())
}
#[allow(dead_code)]
fn create_buckets(ids: &[NodeId]) -> Box<Buckets> {
let secret = SecretKey::from_bytes(&[0; 32]);
let node_id = secret.public();
let mut routing_table = RoutingTable::new(node_id, None);
for id in ids {
routing_table.add_node(NodeInfo {
id: *id,
last_seen: now(),
});
}
routing_table.buckets
}
fn next_n(n: usize) -> impl Fn(usize) -> Vec<usize> {
move |offset| (1..=n).map(|i| offset + i).collect::<Vec<_>>()
}
#[tokio::test(flavor = "multi_thread")]
async fn no_routing_1k() {
let prefix = "no_routing_1k";
let n = 1000;
let seed = 0;
let bootstrap = next_n(0);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let nodes = create_nodes(&ids, bootstrap, Config::default()).await;
let clients = nodes.iter().cloned().collect::<BTreeMap<_, _>>();
for i in 0..100 {
let text = format!("Item {i}");
let key = Id::blake3_hash(text.as_bytes());
for id in expected_ids(&ids, key, 20) {
let (rpc, _) = clients.get(&id).expect("Node not found");
rpc.set(
key,
Value::Blake3Immutable(Blake3Immutable {
timestamp: now(),
data: text.as_bytes().to_vec(),
}),
)
.await
.ok();
}
}
let mut storage_count = vec![0usize; nodes.len()];
for (index, (_, (_, api))) in nodes.iter().enumerate() {
let stats: BTreeMap<Id, BTreeMap<Kind, usize>> = api.get_storage_stats().await.unwrap();
if !stats.is_empty() {
let n = stats
.values()
.map(|kinds| kinds.values().sum::<usize>())
.sum::<usize>();
storage_count[index] = n;
}
}
plot(
&format!("{prefix} - Storage usage per node"),
&storage_count,
);
plot(
&format!("{prefix} -Histogram - Storage usage per node"),
&make_histogram(&storage_count),
);
}
#[tokio::test(flavor = "multi_thread")]
async fn perfect_routing_tables_1k() {
let n = 1000;
let seed = 0;
let bootstrap = next_n(0);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let nodes = create_nodes(&ids, bootstrap, Config::default()).await;
init_routing_tables(&nodes, &ids, Some(seed)).await.ok();
store_random_values("perfect_routing_tables_1k", &nodes, 100)
.await
.ok();
}
#[tokio::test(flavor = "multi_thread")]
async fn perfect_routing_tables_10k() {
let n = 10000;
let seed = 0;
let bootstrap = next_n(0);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let nodes = create_nodes(&ids, bootstrap, Config::default()).await;
println!("init routing tables");
init_routing_tables(&nodes, &ids, Some(seed)).await.ok();
println!("store random values");
store_random_values("perfect_routing_tables_10k", &nodes, 100)
.await
.ok();
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "runs very long and takes ~20GiB"]
async fn perfect_routing_tables_100k() {
let n = 100000;
let seed = 0;
let bootstrap = next_n(0);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let nodes = create_nodes(&ids, bootstrap, Config::default()).await;
println!("init routing tables");
init_routing_tables(&nodes, &ids, Some(seed)).await.ok();
println!("store random values");
store_random_values("perfect_routing_tables_100k", &nodes, 100)
.await
.ok();
}
#[tokio::test(flavor = "multi_thread")]
async fn just_bootstrap_1k() {
let n = 1000;
let seed = 0;
let bootstrap = next_n(20);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let nodes = create_nodes(&ids, bootstrap, Config::default()).await;
store_random_values("just_bootstrap_1k", &nodes, 100)
.await
.ok();
}
async fn random_lookup_test(prefix: &str, n: usize, seed: u64, lookups: usize) {
let bootstrap = next_n(20);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let nodes = create_nodes(&ids, bootstrap, Config::default()).await;
random_lookup_n(&nodes, lookups, seed).await.ok();
store_random_values(prefix, &nodes, 100).await.ok();
}
#[tokio::test(flavor = "multi_thread")]
async fn random_lookup_1k() {
for lookups in 0..10 {
random_lookup_test(&format!("random_lookup_1k_{lookups}"), 1000, 0, lookups).await;
}
}
const DHT_TEST_ALPN: &[u8] = b"iroh/dht/test-0";
type IrohNodes = Vec<(Endpoint, (RpcClient, ApiClient))>;
async fn iroh_create_nodes(
secrets: &[SecretKey],
mut n_bootstrap: usize,
buckets: Option<Box<Buckets>>,
) -> std::result::Result<IrohNodes, BindError> {
let n = secrets.len();
let node_ids = secrets.iter().map(|s| s.public()).collect::<Vec<_>>();
let node_ids = Arc::new(node_ids);
let buckets = Arc::new(buckets);
let discovery = StaticProvider::new();
n_bootstrap = n_bootstrap.min(n - 1);
stream::iter(secrets.iter().zip(node_ids.iter()).enumerate())
.map(|(offfset, (secret, node_id))| {
let buckets = buckets.clone();
let node_ids = node_ids.clone();
let discovery = discovery.clone();
async move {
let endpoint = Endpoint::builder()
.secret_key(secret.clone())
.relay_mode(iroh::RelayMode::Disabled)
.discovery(discovery.clone())
.bind()
.await?;
let addr = endpoint.node_addr().initialized().await;
discovery.add_node_info(addr.clone());
let pool = ConnectionPool::new(
endpoint.clone(),
DHT_TEST_ALPN,
iroh_blobs::util::connection_pool::Options {
max_connections: 32,
idle_timeout: Duration::from_secs(1),
connect_timeout: Duration::from_secs(1),
on_connected: None,
},
);
let pool = IrohPool::new(endpoint.clone(), pool);
let bootstrap = (0..n_bootstrap)
.map(|i| node_ids[(offfset + i + 1) % n])
.collect::<Vec<_>>();
let (rpc, api) = create_node_impl(
*node_id,
pool.clone(),
bootstrap,
(*buckets).clone(),
Default::default(),
);
pool.set_self_client(Some(rpc.downgrade()));
Ok((endpoint, (rpc, api)))
}
})
.buffered_unordered(32)
.collect::<Vec<_>>()
.await
.into_iter()
.collect()
}
fn create_secrets(seed: u64, n: usize) -> Vec<SecretKey> {
let mut rng = rng(seed);
(0..n)
.map(|_| SecretKey::from_bytes(&rng.r#gen::<[u8; 32]>()))
.collect()
}
fn create_node_ids(secrets: &[SecretKey]) -> Vec<NodeId> {
secrets.iter().map(|s| s.public()).collect()
}
fn spawn_routers(iroh_nodes: &IrohNodes) -> Vec<Router> {
iroh_nodes
.iter()
.map(|(endpoint, (rpc, _))| {
let sender = rpc.0.as_local().unwrap();
Router::builder(endpoint.clone())
.accept(DHT_TEST_ALPN, irpc_iroh::IrohProtocol::with_sender(sender))
.spawn()
})
.collect()
}
async fn iroh_perfect_routing_tables(prefix: &str, n: usize) -> TestResult<()> {
let seed = 0;
let bootstrap = 0;
let secrets = create_secrets(seed, n);
println!("Creating {n} nodes");
let iroh_nodes = iroh_create_nodes(&secrets, bootstrap, None).await?;
let nodes = iroh_nodes
.iter()
.map(|(ep, x)| (ep.node_id(), x.clone()))
.collect::<Vec<_>>();
let ids = nodes.iter().map(|(id, _)| *id).collect::<Vec<_>>();
println!("Initializing routing tables");
init_routing_tables(&nodes, &ids, Some(seed)).await.ok();
println!("Spawning {n} routers");
let _routers = spawn_routers(&iroh_nodes);
println!("Storing random values");
store_random_values(prefix, &nodes, 100).await.ok();
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn iroh_perfect_routing_tables_500() -> TestResult<()> {
iroh_perfect_routing_tables("perfect_routing_tables_500", 500).await
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "runs very long and takes a lot of mem"]
async fn iroh_perfect_routing_tables_10k() -> TestResult<()> {
iroh_perfect_routing_tables("perfect_routing_tables_10k", 10000).await
}
#[tokio::test(flavor = "multi_thread")]
async fn random_lookup_strategy() {
let n = 1000;
let seed = 0;
let bootstrap = next_n(20);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let config = Config::default().random_lookup_strategy(RandomLookupStrategy {
interval: Duration::from_secs(1),
blended: false,
});
let nodes = create_nodes(&ids, bootstrap, config).await;
for _i in 0..20 {
tokio::time::sleep(Duration::from_secs(1)).await;
plot_random_lookup_stats("random_lookup_strategy", &nodes, 100)
.await
.ok();
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
#[tokio::test(flavor = "multi_thread")]
async fn self_lookup_strategy() {
let n = 1000;
let seed = 0;
let bootstrap = next_n(20);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let config = Config::default().self_lookup_strategy(SelfLookupStrategy {
interval: Duration::from_secs(1),
});
let nodes = create_nodes(&ids, bootstrap, config).await;
for i in 0..20 {
plot_random_lookup_stats(&format!("self_lookup_strategy-{i}"), &nodes, 100)
.await
.ok();
tokio::time::sleep(Duration::from_secs(1)).await;
println!();
}
}
#[tokio::test(flavor = "multi_thread")]
async fn self_and_random_lookup_strategy() {
let n = 1000;
let seed = 0;
let bootstrap = next_n(20);
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let config = Config::default()
.self_lookup_strategy(SelfLookupStrategy {
interval: Duration::from_secs(1),
})
.random_lookup_strategy(RandomLookupStrategy {
interval: Duration::from_secs(1),
blended: false,
});
let nodes = create_nodes(&ids, bootstrap, config).await;
for _i in 0..20 {
tokio::time::sleep(Duration::from_secs(1)).await;
plot_random_lookup_stats("self_and_random_lookup_strategy", &nodes, 100)
.await
.ok();
println!();
}
}
use std::{fs::File, path::Path};
use gif::{Encoder, Frame, Repeat};
struct Frames {
data: Vec<Vec<bool>>,
stride: usize,
}
impl Frames {
fn new(stride: usize) -> Self {
Frames {
data: Vec::new(),
stride,
}
}
fn height(&self) -> Option<usize> {
let first = self.data.first()?;
assert!(
self.data.iter().all(|f| f.len() == first.len()),
"All frames must have the same length"
);
assert!(self.stride > 0, "Stride must be greater than zero");
assert!(
first.len().is_multiple_of(self.stride),
"Size must be a multiple of stride"
);
Some(first.len() / self.stride)
}
fn side_by_side(frames: &[&Frames], gap: usize) -> TestResult<Frames> {
let first = frames.first().unwrap();
let height = first.height().unwrap();
let n_frames = first.data.len();
assert!(
frames.iter().all(|f| f.stride > 0),
"Stride must be greater than zero"
);
assert!(
frames.iter().all(|f| f.height() == Some(height)),
"All frames must have the same height"
);
assert!(
frames.iter().all(|f| f.data.len() == n_frames),
"All frames must have the same number of frames"
);
let width = frames.iter().map(|f| f.stride).sum::<usize>() + gap * (frames.len() - 1);
let mut res = Frames::new(width);
for n_frame in 0..n_frames {
let mut frame = Vec::new();
let mut iters = frames
.iter()
.map(|f| f.data[n_frame].chunks(f.stride))
.collect::<Vec<_>>();
for _y in 0..height {
for (i, iter) in iters.iter_mut().enumerate() {
let line = iter.next().unwrap();
if i > 0 {
frame.extend(std::iter::repeat_n(false, gap));
}
frame.extend_from_slice(line);
}
}
res.data.push(frame);
}
Ok(res)
}
fn make_gif(&self, delay_ms: u64, target: impl AsRef<Path>) -> TestResult<()> {
if self.data.is_empty() {
return Ok(());
}
let size = self.data[0].len();
assert!(self.stride > 0, "Stride must be greater than zero");
assert!(
size.is_multiple_of(self.stride),
"Height must be a multiple of stride"
);
assert!(
self.data.iter().all(|f| f.len() == size),
"All frames must have the same length"
);
if let Some(parent) = target.as_ref().parent() {
std::fs::create_dir_all(parent)?;
}
let width = self.stride;
let height = size / self.stride;
let mut output = File::create(target)?;
let palette = [0, 0, 0, 255, 255, 255]; let mut encoder = Encoder::new(&mut output, width as u16, height as u16, &palette)?;
encoder.set_repeat(Repeat::Infinite)?;
for frame_data in &self.data {
let mut pixels = Vec::with_capacity(frame_data.len());
for &pixel in frame_data {
pixels.push(if pixel { 1u8 } else { 0 }); }
let mut frame = Frame::from_indexed_pixels(width as u16, height as u16, pixels, None);
frame.delay = (delay_ms / 10) as u16;
encoder.write_frame(&frame)?;
}
Ok(())
}
}
async fn make_frame(ids: &[NodeId], nodes: &Nodes) -> TestResult<Vec<bool>> {
let mut res = Vec::new();
for (_, (_, api)) in nodes.iter() {
let routing_table = api.get_routing_table().await?;
let node_ids = routing_table
.iter()
.flat_map(|peers| peers.iter().map(|x| x.id))
.collect::<HashSet<_>>();
res.extend(ids.iter().map(|id| node_ids.contains(id)));
}
Ok(res)
}
#[tokio::test(flavor = "multi_thread")]
async fn partition_1k() -> TestResult<()> {
let n = 1000;
let k = 900;
let seed = 0;
let bootstrap = |i| {
if i < k {
(1..=20).map(|j| (i + j) % k).collect::<Vec<_>>()
} else {
vec![]
}
};
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let config = Config::persistent()
.self_lookup_strategy(SelfLookupStrategy {
interval: Duration::from_secs(1),
})
.random_lookup_strategy(RandomLookupStrategy {
interval: Duration::from_secs(1),
blended: false,
})
.candidate_lookup_strategy(CandidateLookupStrategy {
max_lookups: 1,
interval: Duration::from_secs(1),
});
let (nodes, _clients) = create_nodes_and_clients(&ids, bootstrap, config).await;
let mut frames = Vec::new();
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(1)).await;
plot_random_lookup_stats(&format!("partition_1k-{i}-"), &nodes, 100)
.await
.ok();
frames.push(make_frame(&ids, &nodes).await?);
println!();
}
let id0 = nodes[0].0;
for node in &nodes[k..] {
let (_, (_, api)) = node;
api.nodes_seen(&[id0]).await.ok();
}
let mut frames = Frames::new(n);
for i in 0..30 {
tokio::time::sleep(Duration::from_secs(1)).await;
plot_random_lookup_stats(&format!("partition_1k-{}-", i + 10), &nodes, 100)
.await
.ok();
let last_id = nodes.last().unwrap().0;
let mut knows_last_id = 0;
for (_, (_, api)) in nodes.iter() {
let routing_table = api.get_routing_table().await?;
knows_last_id += routing_table
.iter()
.map(|peers| peers.iter().filter(|x| x.id == last_id).count())
.sum::<usize>();
}
frames.data.push(make_frame(&ids, &nodes).await?);
println!("Nodes that know about last_id: {knows_last_id}");
}
frames.make_gif(100, "img/partition_1k.gif")?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn remove_1k() -> TestResult<()> {
let n = 1000;
let k = 900;
let seed = 0;
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let config = Config::persistent()
.self_lookup_strategy(SelfLookupStrategy {
interval: Duration::from_secs(1),
})
.random_lookup_strategy(RandomLookupStrategy {
interval: Duration::from_secs(1),
blended: false,
})
.candidate_lookup_strategy(CandidateLookupStrategy {
max_lookups: 1,
interval: Duration::from_secs(1),
});
let (nodes, clients) = create_nodes_and_clients(&ids, next_n(20), config).await;
let mut frames = Frames {
data: Vec::new(),
stride: n,
};
for _i in 0..10 {
tokio::time::sleep(Duration::from_secs(1)).await;
frames.data.push(make_frame(&ids, &nodes).await?);
println!();
}
for id in &ids[k..] {
clients.lock().unwrap().remove(id);
}
for _i in 0..40 {
tokio::time::sleep(Duration::from_secs(1)).await;
frames.data.push(make_frame(&ids, &nodes).await?);
println!();
}
frames.make_gif(100, "img/remove_1k.gif")?;
Ok(())
}
async fn trigger_random_lookups(nodes: &Nodes) {
stream::iter(nodes.iter())
.for_each_concurrent(512, |(_, (_, api))| async move {
api.random_lookup().await;
})
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn random_vs_blended_1k() -> TestResult<()> {
let n = 1000;
let seed = 0;
let n_frames = 50;
let secrets = create_secrets(seed, n);
let ids = create_node_ids(&secrets);
let random = {
let config = Config::persistent().random_lookup_strategy(RandomLookupStrategy {
interval: Duration::MAX,
blended: false,
});
let nodes = create_nodes(&ids, next_n(20), config).await;
let mut frames = Frames {
data: Vec::new(),
stride: n,
};
for _i in 0..n_frames {
frames.data.push(make_frame(&ids, &nodes).await?);
trigger_random_lookups(&nodes).await;
println!();
}
frames
};
let blended = {
let config = Config::persistent()
.random_lookup_strategy(RandomLookupStrategy {
interval: Duration::MAX,
blended: true,
});
let nodes = create_nodes(&ids, next_n(20), config).await;
let mut frames = Frames {
data: Vec::new(),
stride: n,
};
for _i in 0..n_frames {
frames.data.push(make_frame(&ids, &nodes).await?);
trigger_random_lookups(&nodes).await;
println!();
}
frames
};
let perfect = {
let config = Config::persistent()
.random_lookup_strategy(RandomLookupStrategy {
interval: Duration::MAX,
blended: true,
});
let nodes = create_nodes(&ids, next_n(20), config).await;
init_routing_tables(&nodes, &ids, Some(seed)).await.ok();
let mut frames = Frames {
data: Vec::new(),
stride: n,
};
for _i in 0..n_frames {
frames.data.push(make_frame(&ids, &nodes).await?);
println!();
}
frames
};
let frames = Frames::side_by_side(&[&random, &blended, &perfect], 20)?;
frames.make_gif(100, "img/random_vs_blended_1k.gif")?;
Ok(())
}