#![allow(unused)]
use anyhow::{bail, format_err, Error, Result};
use ed25519_dalek::Keypair;
use futures::future;
use itertools::Itertools;
use sn_routing::{
Config, Event, EventStream, NodeElderChange, Routing, SectionAuthorityProviderUtils,
TransportConfig, MIN_AGE,
};
use std::{
collections::{BTreeSet, HashSet},
iter,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Once,
time::Duration,
};
use tracing_subscriber::EnvFilter;
use xor_name::Prefix;
static LOG_INIT: Once = Once::new();
pub async fn create_node(mut config: Config) -> Result<(Routing, EventStream)> {
LOG_INIT.call_once(|| {
tracing_subscriber::fmt()
.pretty()
.with_env_filter(EnvFilter::from_default_env())
.with_target(false)
.with_test_writer()
.init()
});
if config.transport_config.local_ip.is_none() {
config.transport_config.local_ip = Some(Ipv4Addr::LOCALHOST.into());
}
Ok(Routing::new(config).await?)
}
pub fn config_with_contact(contact: SocketAddr) -> Config {
let mut config = Config::default();
config.transport_config.hard_coded_contacts = iter::once(contact).collect();
config
}
pub const TIMEOUT: Duration = Duration::from_secs(60);
#[macro_export]
macro_rules! assert_next_event {
($event_stream:expr, $pattern:pat $(if $cond:expr)?) => {
match tokio::time::timeout($crate::utils::TIMEOUT, $event_stream.next()).await {
Ok(Some($pattern)) $(if $cond)? => {}
Ok(other) => panic!("Expecting {}, got {:?}", stringify!($pattern), other),
Err(_) => panic!("Timeout when expecting {}", stringify!($pattern)),
}
};
}
#[macro_export]
macro_rules! assert_event {
($event_stream:expr, $pattern:pat $(if $cond:expr)?) => {
loop {
match tokio::time::timeout($crate::utils::TIMEOUT, $event_stream.next()).await {
Ok(Some($pattern)) $(if $cond)? => break,
Ok(other) => tracing::trace!("Received {:?}", other),
Err(_) => panic!("Timeout when expecting {}", stringify!($pattern)),
}
}
};
}
pub async fn create_connected_nodes(count: usize) -> Result<Vec<(Routing, EventStream)>> {
let mut nodes = vec![];
let (node, mut event_stream) = create_node(Config {
first: true,
..Default::default()
})
.await?;
assert_next_event!(event_stream, Event::EldersChanged { .. });
let bootstrap_contact = node.our_connection_info();
nodes.push((node, event_stream));
let other_nodes = (1..count).map(|_| create_node(config_with_contact(bootstrap_contact)));
for node in future::try_join_all(other_nodes).await? {
nodes.push(node);
}
let mut joined_count = 1;
while let Some(event) = nodes[0].1.next().await {
if let Event::MemberJoined { name, .. } = event {
joined_count += 1
}
if joined_count == nodes.len() {
break;
}
}
assert_eq!(joined_count, nodes.len());
Ok(nodes)
}
pub async fn verify_invariants_for_node(node: &Routing, elder_size: usize) -> Result<()> {
let our_name = node.name().await;
assert!(node.matches_our_prefix(&our_name).await);
let our_prefix = node.our_prefix().await;
let our_section_elders: BTreeSet<_> = node.our_section().await.names();
if !our_prefix.is_empty() {
assert!(
our_section_elders.len() >= elder_size,
"{}({:b}) Our section is below the minimum size ({}/{})",
our_name,
our_prefix,
our_section_elders.len(),
elder_size,
);
}
if let Some(name) = our_section_elders
.iter()
.find(|name| !our_prefix.matches(name))
{
bail!(
"{}({:b}) A name in our section doesn't match its prefix: {}",
our_name,
our_prefix,
name,
);
}
if !node.is_elder().await {
return Ok(());
}
let other_sections = node.other_sections().await;
if let Some(compatible_prefix) = other_sections
.iter()
.map(|info| &info.prefix)
.find(|prefix| prefix.is_compatible(&our_prefix))
{
bail!(
"{}({:b}) Our prefix is compatible with one of the other prefixes: {:?} (other_sections: {:?})",
our_name,
our_prefix,
compatible_prefix,
other_sections,
);
}
if let Some(info) = other_sections
.iter()
.find(|info| info.elder_count() < elder_size)
{
bail!(
"{}({:b}) An other section {:?} is below the minimum size ({}/{}) (other_sections: {:?})",
our_name,
our_prefix,
info.prefix,
info.elder_count(),
elder_size,
other_sections,
);
}
for info in &other_sections {
if let Some(name) = info.names().iter().find(|&name| !info.prefix.matches(name)) {
bail!(
"{}({:b}) A name in a section doesn't match its prefix: {:?}, {:?}",
our_name,
our_prefix,
name,
info.prefix,
);
}
}
let address_space_covered = Prefix::default().is_covered_by(
iter::once(&our_prefix).chain(other_sections.iter().map(|info| &info.prefix)),
);
if !address_space_covered {
bail!(
"{}({:b}) Known sections don't fully cover the whole address space: {:?}",
our_name,
our_prefix,
iter::once(our_prefix)
.chain(other_sections.iter().map(|info| info.prefix))
.format(", ")
);
}
Ok(())
}