#![recursion_limit = "256"]
use bls::PublicKey;
use eyre::{eyre, Context, Error, Result};
use futures::{
future,
stream::{self, StreamExt},
Stream,
};
use itertools::Itertools;
use rand::{
distributions::{Distribution, WeightedIndex},
Rng,
};
use std::{
collections::{BTreeMap, BTreeSet},
fs::File,
io::BufWriter,
net::SocketAddr,
time::{Duration, Instant},
};
use structopt::StructOpt;
use tokio::{
sync::mpsc::{self, Sender},
time, };
use tokio_util::time::delay_queue::DelayQueue;
use tracing_subscriber::EnvFilter;
use xor_name::{Prefix, XorName};
use yansi::{Color, Style};
use safe_network::node::{
create_test_max_capacity_and_root_storage, Config, Event as RoutingEvent, NodeApi,
NodeElderChange,
};
use sn_interface::messaging::{
data::Error::FailedToWriteFile, system::SystemMsg, DstLocation, MsgId,
};
use sn_interface::types::Cache;
/// Minimum time that must have elapsed since the last status printout before
/// `Network::try_print_status` prints again.
const MIN_PRINT_DELAY: Duration = Duration::from_millis(500);
/// Expiry duration handed to every per-section probe cache created in `ProbeTracker::send`.
const PROBE_WINDOW: Duration = Duration::from_secs(60);
// Command-line options, parsed with `structopt` in `main`.
//
// NOTE: plain `//` comments are used here on purpose. `structopt` turns `///` doc comments into
// `--help` text, so adding them would change what the program prints.
#[derive(Debug, StructOpt)]
struct Options {
// Optional log destination. `main` treats the value `-` specially (the subscriber is initialised
// without a custom writer); any other value is used as the path of a log file to create.
#[structopt(short, long, name = "PATH")]
log: Option<String>,
// Probe rounds per second: `main` sends probes every `1.0 / probe_frequency` seconds.
#[structopt(short, long, default_value = "1")]
probe_frequency: f64,
// Positional churn schedule. `ChurnSchedule::parse` reads it in groups of three values:
// period duration in seconds, number of joins, number of drops.
schedule: Vec<String>,
}
/// Entry point: validates the command line, optionally sets up logging, creates the first node
/// and then multiplexes three sources in a loop:
///
/// * events reported by the nodes (applied via `Network::handle_event`),
/// * scheduled churn events (node joins and drops),
/// * the periodic probe timer.
///
/// # Errors
///
/// Fails when no churn schedule is given, when the probe frequency is not a finite number greater
/// than zero, when the log file cannot be created, when the schedule cannot be parsed, or when
/// handling an event / sending probes fails.
#[tokio::main]
async fn main() -> Result<()> {
    let opts = Options::from_args();

    if opts.schedule.is_empty() {
        return Err(eyre!(
            "Must specify churn schedule (run with --help for more info)."
        ));
    }

    // `NaN <= 0.0` is false, so NaN has to be rejected explicitly. Infinity is rejected as well:
    // it would yield a zero probe interval, which `time::interval` does not accept.
    if !opts.probe_frequency.is_finite() || opts.probe_frequency <= 0.0 {
        return Err(eyre!(
            "Probe frequency must be a finite number greater than zero."
        ));
    }

    // Keep the guard alive for the whole run: dropping it would stop the non-blocking writer.
    let _log_guard = if let Some(path) = opts.log {
        let builder = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env());
        if path == "-" {
            builder.init();
            None
        } else {
            let file = File::create(path)?;
            let file = BufWriter::new(file);
            let (writer, guard) = tracing_appender::non_blocking(file);
            builder.with_writer(writer).init();
            Some(guard)
        }
    } else {
        None
    };

    let schedule = ChurnSchedule::parse(&opts.schedule)?;

    let (event_tx, mut event_rx) = mpsc::channel(20);
    let mut network = Network::new();

    // Create the first node before any churn is applied.
    network.create_node(event_tx.clone()).await;

    let mut churn_events = schedule.events();

    let probe_interval = Duration::from_secs_f64(1.0 / opts.probe_frequency);
    let mut probes = time::interval(probe_interval);

    loop {
        tokio::select! {
            event = event_rx.recv() => {
                if let Some(event) = event {
                    network.handle_event(event).await?
                } else {
                    break
                }
            }
            event = churn_events.next() => {
                match event {
                    Some(ChurnEvent::Join) => {
                        network.create_node(event_tx.clone()).await
                    }
                    Some(ChurnEvent::Drop) => {
                        network.remove_random_node()
                    }
                    // `ChurnSchedule::events` chains a never-ending pending stream, so the
                    // churn stream never terminates.
                    None => unreachable!()
                }
            }
            _ = probes.tick() => network.send_probes().await?,
        }
    }

    Ok(())
}
/// Messages sent by `add_node` over the event channel to the main loop.
/// Every variant carries the id the node was registered under in `Network::nodes`.
#[allow(clippy::large_enum_variant)]
enum Event {
/// `NodeApi::new` succeeded; carries the handle of the new node.
JoinSuccess { id: u64, node: NodeApi },
/// `NodeApi::new` failed with the given error.
JoinFailure { id: u64, error: Error },
/// An event forwarded from the node's own event stream.
Routing { id: u64, event: RoutingEvent },
}
/// State tracked for every node the simulation has created.
#[allow(clippy::large_enum_variant)]
enum Node {
/// A join attempt was started but no `JoinSuccess`/`JoinFailure` has been handled yet.
Joining,
/// The node joined; the remaining fields are a local snapshot kept up to date by
/// `Network::handle_event`.
Joined {
/// Handle used to query the node and to send probes from it.
node: NodeApi,
/// Name last reported by the node (refreshed on `Relocated`).
name: XorName,
/// Age last reported by the node (refreshed on `Relocated`).
age: u8,
/// Section prefix, updated from elder-change / split events.
prefix: Prefix,
/// Set on `RelocationStarted`, cleared on `Relocated`.
is_relocating: bool,
/// `Some` while the node is recorded as an elder, `None` otherwise.
elder: Option<ElderState>,
},
}
/// Elder information recorded for a node in `Network::handle_event`.
#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct ElderState {
/// Key taken from the `elders.key` field of the elder-change event.
key: bls::PublicKey,
/// Number of elders reported by the event (`remaining` plus `added`).
num_elders: usize,
}
/// The simulated network: all nodes created so far plus bookkeeping for statistics and output.
struct Network {
/// Nodes keyed by the id handed out by `new_node_id`.
nodes: BTreeMap<u64, Node>,
/// Id that the next created node will receive.
next_id: u64,
/// When this `Network` was created; used for the "duration" line of the status printout.
start_time: Instant,
/// When the status was last printed; used to rate-limit printouts.
print_time: Instant,
/// Tracks sent and received probe messages per section prefix.
probe_tracker: ProbeTracker,
/// Counters shown in the status printout.
stats: Stats,
/// Terminal styles used by the status printout.
theme: Theme,
}
impl Network {
/// Creates an empty network with zeroed statistics.
///
/// `print_time` is back-dated by `MIN_PRINT_DELAY` so that the first status printout is not
/// held back by the rate limit.
fn new() -> Self {
    let start_time = Instant::now();
    // `Instant - Duration` panics when the result cannot be represented (possible on
    // platforms whose monotonic clock starts near zero). Fall back to "now" in that case;
    // the only effect is that the first printout is subject to the normal delay.
    let print_time = start_time
        .checked_sub(MIN_PRINT_DELAY)
        .unwrap_or(start_time);

    Self {
        nodes: BTreeMap::new(),
        next_id: 0,
        start_time,
        print_time,
        probe_tracker: ProbeTracker::default(),
        stats: Stats::default(),
        theme: Theme::default(),
    }
}
async fn create_node(&mut self, event_tx: Sender<Event>) {
let bootstrap_nodes = self.get_bootstrap_nodes().await;
let id = self.new_node_id();
let _prev = self.nodes.insert(id, Node::Joining);
self.stats.join_attempts += 1;
let config = Config {
first: bootstrap_nodes.is_empty(),
hard_coded_contacts: bootstrap_nodes,
..Default::default()
};
let _ = add_node(id, config, event_tx).await;
self.try_print_status();
}
/// Drops one randomly selected joined node, if there is any.
///
/// Each joined node is weighted by `1 / 2^age`, so a node is twice as likely to be picked as
/// a node that is one age unit older. Nodes that are still joining are never picked.
fn remove_random_node(&mut self) {
    let mut candidates = Vec::new();
    for (id, node) in &self.nodes {
        if let Node::Joined { age, .. } = node {
            candidates.push((*id, 1.0 / 2f64.powf(*age as f64)));
        }
    }

    // Building the distribution fails when there is nothing to choose from; nothing to do then.
    let picker = match WeightedIndex::new(candidates.iter().map(|(_, weight)| *weight)) {
        Ok(picker) => picker,
        Err(_) => return,
    };

    let chosen = picker.sample(&mut rand::thread_rng());
    let (victim_id, _) = candidates[chosen];

    let removed = match self.nodes.remove(&victim_id) {
        Some(node) => node,
        None => return,
    };

    self.stats.drops += 1;
    // NOTE(review): a node dropped in the middle of a relocation is counted as a relocation
    // success here - confirm that this is the intended accounting.
    if matches!(
        removed,
        Node::Joined {
            is_relocating: true,
            ..
        }
    ) {
        self.stats.relocation_successes += 1;
    }
    self.try_print_status();
}
/// Applies one `Event` to the tracked node state and statistics, then attempts a rate-limited
/// status printout.
///
/// The only error produced by this function itself is for a received message whose destination
/// is `DstLocation::EndUser`.
async fn handle_event(&mut self, event: Event) -> Result<()> {
match event {
// Join finished: snapshot the node's name, age and prefix and store it as `Joined`
// (replacing whatever entry was stored under that id).
Event::JoinSuccess { id, node } => {
let name = node.name().await;
let age = node.age().await;
let prefix = node.our_prefix().await;
let _prev = self.nodes.insert(
id,
Node::Joined {
node,
name,
age,
prefix,
is_relocating: false,
elder: None,
},
);
self.stats.join_successes += 1;
}
// Join failed: report it and forget the node.
Event::JoinFailure { id, error } => {
println!("{}: {}", self.theme.error.paint("join failure"), error);
let _prev = self.nodes.remove(&id);
self.stats.join_failures += 1;
}
Event::Routing { id, event } => match event {
// Both events carry the same fields of interest and are handled identically.
RoutingEvent::EldersChanged {
elders,
self_status_change,
..
}
| RoutingEvent::SectionSplit {
elders,
self_status_change,
..
} => {
if let Some(Node::Joined {
name,
prefix,
elder,
..
}) = self.nodes.get_mut(&id)
{
// Remember the previous prefix before overwriting it; it is needed below.
let old_prefix = *prefix;
*prefix = elders.prefix;
// The node is an elder if the event lists it as added or remaining. The elder state
// is cleared only when the node is listed as removed AND the prefix did not change;
// in every other case the previous elder state is left untouched.
if elders.added.contains(name) || elders.remaining.contains(name) {
*elder = Some(ElderState {
key: elders.key,
num_elders: elders.remaining.len() + elders.added.len(),
});
} else if elders.removed.contains(name) && old_prefix == elders.prefix {
*elder = None;
}
}
// Promotion/demotion counters are updated even if the node is not tracked as `Joined`.
match self_status_change {
NodeElderChange::Promoted => self.stats.promotions += 1,
NodeElderChange::Demoted => self.stats.demotions += 1,
NodeElderChange::None => (),
};
}
// A relocation attempt is only counted for nodes tracked as `Joined`.
RoutingEvent::RelocationStarted { .. } => {
if let Some(Node::Joined { is_relocating, .. }) = self.nodes.get_mut(&id) {
*is_relocating = true;
self.stats.relocation_attempts += 1;
}
}
// Relocation finished: refresh the cached name and age from the node itself.
RoutingEvent::Relocated { .. } => {
if let Some(Node::Joined {
node,
name,
age,
is_relocating,
..
}) = self.nodes.get_mut(&id)
{
*name = node.name().await;
*age = node.age().await;
*is_relocating = false;
self.stats.relocation_successes += 1;
}
}
// A message arrived at a node: let the probe tracker check whether it matches a probe.
RoutingEvent::MessageReceived { dst, .. } => {
let (dst, dst_section_pk) = match dst {
DstLocation::Section { name, section_pk } => (name, section_pk),
DstLocation::Node { name, section_pk } => (name, section_pk),
DstLocation::EndUser(_) => {
return Err(eyre!("unexpected probe message dst: {:?}", dst))
}
};
self.probe_tracker.receive(&dst, &dst_section_pk).await;
}
// All other routing events are ignored.
_ => {
}
},
}
self.try_print_status();
Ok(())
}
/// Hands out the next unused node id and advances the counter.
///
/// # Panics
///
/// Panics with "too many nodes" if advancing the counter would overflow `u64`.
fn new_node_id(&mut self) -> u64 {
    let following = self.next_id.checked_add(1).expect("too many nodes");
    // Store the successor and return the value that was current until now.
    std::mem::replace(&mut self.next_id, following)
}
/// Returns the connection info of up to `COUNT` joined nodes, preferring the oldest ones.
///
/// The result is empty when no node has joined yet.
async fn get_bootstrap_nodes(&self) -> BTreeSet<SocketAddr> {
    // Number of bootstrap contacts to return.
    const COUNT: usize = 1;

    let joined = self.nodes.values().filter_map(|entry| {
        if let Node::Joined { node, age, .. } = entry {
            Some((node, age))
        } else {
            None
        }
    });

    // Highest age first.
    let oldest_first = joined.sorted_by(|(_, lhs_age), (_, rhs_age)| rhs_age.cmp(lhs_age));

    let mut addrs = BTreeSet::new();
    for (node, _) in oldest_first.take(COUNT) {
        let _prev = addrs.insert(node.our_connection_info().await);
    }
    addrs
}
/// Sends one probe message from every joined node and records the probes that went out.
///
/// All nodes sharing a prefix probe the same destination, which is a random name placed
/// inside that prefix. Afterwards empty per-section probe caches are pruned and a status
/// printout is attempted.
///
/// # Errors
///
/// Fails if a node has no matching section for its own name or if sending a probe fails.
async fn send_probes(&mut self) -> Result<()> {
    // One destination per prefix, chosen lazily the first time the prefix is seen.
    let mut dst_by_prefix = BTreeMap::new();

    for entry in self.nodes.values() {
        let (node, prefix) = match entry {
            Node::Joined { node, prefix, .. } => (node, prefix),
            Node::Joining => continue,
        };

        let dst = *dst_by_prefix
            .entry(prefix)
            .or_insert_with(|| prefix.substituted_in(xor_name::rand::random()));

        // The key of the sender's own section is what the tracker stores with the probe.
        let own_name = node.name().await;
        let own_section = node
            .matching_section(&own_name)
            .await
            .context("No pk found for our node's section")?;
        let own_section_pk = own_section.section_key();

        let was_sent = self.try_send_probe(node, dst).await?;
        if was_sent {
            self.probe_tracker
                .send(*prefix, dst, own_section_pk)
                .await;
        }
    }

    self.probe_tracker.prune();
    self.try_print_status();

    Ok(())
}
async fn try_send_probe(&self, node: &NodeApi, dst: XorName) -> Result<bool> {
let public_key_set = if let Ok(public_key_set) = node.public_key_set().await {
public_key_set
} else {
return Ok(false);
};
let node_msg = SystemMsg::NodeMsgError {
error: FailedToWriteFile,
correlation_id: MsgId::new(),
};
let dst_location = DstLocation::Section {
name: dst,
section_pk: public_key_set.public_key(),
};
let wire_msg = node.sign_single_src_msg(node_msg, dst_location).await?;
match node.send_msg_to_nodes(wire_msg).await {
Ok(()) => Ok(true),
Err(error) => {
Err(error).context(format!("failed to send probe by {}", node.name().await))
}
}
}
/// Prints the status unless the previous printout happened within the last `MIN_PRINT_DELAY`.
fn try_print_status(&mut self) {
    if self.print_time.elapsed() <= MIN_PRINT_DELAY {
        // Printed too recently; skip this one.
        return;
    }

    self.print_time = Instant::now();
    self.print_status();
}
/// Prints a multi-line summary of the network to stdout: run duration, node counts, age
/// distribution, per-section member counts, per-section probe health and the churn,
/// relocation and elder statistics.
fn print_status(&self) {
let mut num_active = 0;
let mut num_joining = 0;
let mut num_relocating = 0;
let mut num_elders = 0;
// One counter per possible `u8` age value, indexed by age.
let mut ages = vec![0; 256];
// Number of joined nodes per section prefix.
let mut sections = BTreeMap::new();
for node in self.nodes.values() {
match node {
Node::Joining => {
num_joining += 1;
}
Node::Joined {
age,
prefix,
is_relocating,
elder,
..
} => {
// A joined node counts either as relocating or as active, never both.
if *is_relocating {
num_relocating += 1;
} else {
num_active += 1;
}
if elder.is_some() {
num_elders += 1;
}
ages[*age as usize] += 1;
*sections.entry(prefix).or_insert(0) += 1;
}
}
}
println!();
println!(
"{:18} {}",
self.theme.label.paint("duration:"),
self.theme.value.paint(format_args!(
"{:.1}s",
self.start_time.elapsed().as_secs_f64()
))
);
println!(
"{:18} total: {}, active: {}, joining: {}, relocating: {}",
self.theme.label.paint("nodes:"),
self.theme.value.paint(self.nodes.len()),
self.theme.value.paint(num_active),
self.theme.value.paint(num_joining),
self.theme.value.paint(num_relocating),
);
// Only ages that at least one node currently has are listed.
println!(
"{:18} elders: {}, {}",
self.theme.label.paint("age distribution:"),
self.theme.value.paint(num_elders),
ages.into_iter()
.enumerate()
.filter(|(_, count)| *count > 0)
.format_with(", ", |(age, count), f| f(&format_args!(
"{}: {}",
age,
self.theme.value.paint(count)
)))
);
println!(
"{:18} {}",
self.theme.label.paint("section members:"),
sections
.iter()
.format_with(", ", |(prefix, count), f| f(&format_args!(
"({:b}): {}",
prefix,
self.theme.value.paint(count)
)))
);
// Health is the percentage of tracked probes marked successful; prefixes that no joined node
// currently belongs to are left out.
println!(
"{:18} {}",
self.theme.label.paint("section health:"),
self.probe_tracker
.status()
.filter(|(prefix, ..)| sections.contains_key(prefix))
.format_with(", ", |(prefix, delivered, sent), f| {
let percent = percent(delivered, sent);
f(&format_args!(
"({:b}): {}",
prefix,
self.theme
.health(percent)
.paint(format_args!("{:.0}%", percent)),
))
})
);
// Join failures are highlighted with the error style as soon as there is at least one.
let failure_style = if self.stats.join_failures == 0 {
self.theme.value
} else {
self.theme.error
};
println!(
"{:18} join attempts: {} (successes: {}, failures: {}), drops: {}",
self.theme.label.paint("churn:"),
self.theme.value.paint(self.stats.join_attempts),
self.theme.value.paint(format_args!(
"{} ({:.0}%)",
self.stats.join_successes,
percent(self.stats.join_successes, self.stats.join_attempts)
)),
failure_style.paint(format_args!(
"{} ({:.0}%)",
self.stats.join_failures,
percent(self.stats.join_failures, self.stats.join_attempts)
)),
self.theme.value.paint(self.stats.drops)
);
println!(
"{:18} attempts: {}, successes: {}",
self.theme.label.paint("relocations:"),
self.theme.value.paint(self.stats.relocation_attempts),
self.theme.value.paint(format_args!(
"{} ({:.0}%)",
self.stats.relocation_successes,
percent(
self.stats.relocation_successes,
self.stats.relocation_attempts
)
))
);
println!(
"{:18} promotions: {}, demotions: {}",
self.theme.label.paint("elder churn:"),
self.theme.value.paint(self.stats.promotions),
self.theme.value.paint(self.stats.demotions)
);
println!();
}
}
/// Creates one node and forwards everything that happens to it through `event_tx`.
///
/// Sends `Event::JoinSuccess` or `Event::JoinFailure` once, followed (on success) by an
/// `Event::Routing` for every event the node emits. Returns when the node's event stream ends
/// or the receiving side of `event_tx` is gone.
///
/// Every failure, including a failure to prepare the node's storage, is reported as
/// `Event::JoinFailure` rather than through the return value, so that the receiver can stop
/// tracking the node as joining.
async fn add_node(id: u64, mut config: Config, event_tx: Sender<Event>) -> Result<()> {
    let (max_capacity, root_dir) = match create_test_max_capacity_and_root_storage() {
        Ok(storage) => storage,
        Err(error) => {
            // Returning the error would lose it for callers that do not inspect the result,
            // and the node would stay registered as joining forever.
            let _res = event_tx
                .send(Event::JoinFailure {
                    id,
                    error: error.into(),
                })
                .await;
            return Ok(());
        }
    };
    config.max_capacity = Some(max_capacity);
    config.root_dir = Some(root_dir);

    let joining_timeout = Duration::from_secs(3 * 60);

    let (node, mut events) = match NodeApi::new(&config, joining_timeout).await {
        Ok(output) => output,
        Err(error) => {
            let _res = event_tx
                .send(Event::JoinFailure {
                    id,
                    error: error.into(),
                })
                .await;
            return Ok(());
        }
    };

    // A failed send means the receiver is gone; the forwarding loop below notices that too.
    let _res = event_tx.send(Event::JoinSuccess { id, node }).await;

    while let Some(event) = events.next().await {
        if event_tx.send(Event::Routing { id, event }).await.is_err() {
            break;
        }
    }

    Ok(())
}
/// A churn schedule: a list of periods that `events` lays out one after another.
#[derive(Debug)]
struct ChurnSchedule(Vec<ChurnPeriod>);
impl ChurnSchedule {
fn parse(input: &[String]) -> Result<Self> {
let vec: Result<Vec<_>> = input
.chunks(3)
.map(|chunk| {
let duration: f64 = chunk.get(0).map(|n| n.parse()).unwrap_or(Ok(0.0))?;
let duration = Duration::from_secs_f64(duration);
let joins = chunk.get(1).map(|n| n.parse()).unwrap_or(Ok(0))?;
let drops = chunk.get(2).map(|n| n.parse()).unwrap_or(Ok(0))?;
Ok(ChurnPeriod {
duration,
joins,
drops,
})
})
.collect();
Ok(Self(vec?))
}
/// Turns the schedule into a stream of churn events.
///
/// Periods follow one another starting at zero delay. Within a period every join and every
/// drop gets its own random delay inside the period, or the period start if the period has
/// zero length. The returned stream never ends: once all events were delivered it stays
/// pending.
fn events(&self) -> impl Stream<Item = ChurnEvent> + Unpin {
    let mut rng = rand::thread_rng();
    let mut queue = DelayQueue::new();
    let mut period_start = Duration::default();

    for period in &self.0 {
        let period_end = period_start + period.duration;

        // All joins of the period first, then all drops.
        let joins = std::iter::repeat_with(|| ChurnEvent::Join).take(period.joins);
        let drops = std::iter::repeat_with(|| ChurnEvent::Drop).take(period.drops);

        for event in joins.chain(drops) {
            let delay = if period_start < period_end {
                rng.gen_range(period_start, period_end)
            } else {
                period_start
            };
            queue.insert(event, delay);
        }

        period_start = period_end;
    }

    // Drop timer errors, unwrap the expired entries and never signal the end of the stream.
    let expired_events = queue
        .filter_map(|result| future::ready(result.ok()))
        .map(|expired| expired.into_inner());
    expired_events.chain(stream::pending())
}
}
/// One period of a churn schedule.
#[derive(Debug)]
struct ChurnPeriod {
/// Length of the period.
duration: Duration,
/// Number of join events scheduled within the period.
joins: usize,
/// Number of drop events scheduled within the period.
drops: usize,
}
/// A scheduled change to the network, produced by `ChurnSchedule::events`.
enum ChurnEvent {
/// Create a new node.
Join,
/// Remove a randomly chosen node.
Drop,
}
/// Counters accumulated over the whole run and shown in the status printout.
#[derive(Default)]
struct Stats {
/// Nodes created via `Network::create_node`.
join_attempts: usize,
/// `Event::JoinFailure` events handled.
join_failures: usize,
/// `Event::JoinSuccess` events handled.
join_successes: usize,
/// Nodes removed via `Network::remove_random_node`.
drops: usize,
/// `RelocationStarted` events handled for joined nodes.
relocation_attempts: usize,
/// `Relocated` events handled for joined nodes, plus nodes dropped while relocating.
relocation_successes: usize,
/// Elder-change events reporting a promotion.
promotions: usize,
/// Elder-change events reporting a demotion.
demotions: usize,
}
/// State of a single tracked probe.
#[derive(Clone)]
enum ProbeState {
/// The probe was sent; holds the public key passed to `ProbeTracker::send` at that time.
Pending(PublicKey),
/// A message for the probe's destination was received with a matching key.
Success,
}
/// Tracks which probes were sent and which of them were received.
#[derive(Default)]
struct ProbeTracker {
/// One expiring cache per source prefix, mapping a probe's destination name to its state.
sections: BTreeMap<Prefix, Cache<XorName, ProbeState>>,
}
use futures::executor::block_on as block;
impl ProbeTracker {
/// Records that a probe was sent from section `src` to `dst`.
///
/// The probe is stored as pending together with `current_public_key`. A destination that is
/// already tracked for `src` is left untouched.
async fn send(&mut self, src: Prefix, dst: XorName, current_public_key: PublicKey) {
    // Create the section's cache on first use.
    let section_cache = self
        .sections
        .entry(src)
        .or_insert_with(|| Cache::with_expiry_duration(PROBE_WINDOW));

    let already_tracked = section_cache.get(&dst).await.is_some();
    if already_tracked {
        return;
    }

    section_cache
        .set(dst, ProbeState::Pending(current_public_key), None)
        .await;
}
/// Records that a message addressed to `dst` was received.
///
/// The first section cache that tracks `dst` is consulted. If its probe is still pending and
/// was recorded with the same key as `received_public_key`, it is marked as successful.
/// Unknown destinations, already successful probes and key mismatches are ignored.
async fn receive(&mut self, dst: &XorName, received_public_key: &PublicKey) {
    // The cache lookups are awaited rather than driven with a blocking executor: this is an
    // async fn, and blocking inside it would stall the thread it is being polled on.
    for cache in self.sections.values() {
        let probe_state = match cache.get(dst).await {
            Some(state) => state,
            None => continue,
        };

        if let ProbeState::Pending(pk) = &probe_state {
            if received_public_key == pk {
                cache.set(*dst, ProbeState::Success, None).await;
            }
        }

        // Only the first cache that knows the destination is considered.
        return;
    }
}
/// Yields, for every tracked source prefix, the number of successful probes followed by the
/// total number of probes currently held in that prefix's cache.
fn status(&self) -> impl Iterator<Item = (&Prefix, usize, usize)> {
    self.sections.iter().map(|(prefix, section)| {
        let delivered = block(
            section.count(|(_, state)| matches!(state.object, ProbeState::Success)),
        );
        let tracked = block(section.len());
        (prefix, delivered, tracked)
    })
}
/// Forgets every source prefix whose probe cache is empty.
fn prune(&mut self) {
    self.sections
        .retain(|_, section| !block(section.is_empty()));
}
}
/// Terminal styles used by the status printout.
struct Theme {
/// Style of the label at the start of each status line.
label: Style,
/// Style of regular values.
value: Style,
/// Style of error output, such as join failures.
error: Style,
}
impl Theme {
    /// Picks the style for a health percentage: green above 80, yellow above 50, red otherwise.
    /// The style is always bold.
    fn health(&self, health: f64) -> Style {
        let color = if health > 80.0 {
            Color::Green
        } else if health > 50.0 {
            Color::Yellow
        } else {
            Color::Red
        };
        Style::new(color).bold()
    }
}
impl Default for Theme {
fn default() -> Self {
Self {
label: Style::new(Color::Magenta).bold(),
value: Style::new(Color::Blue).bold(),
error: Style::new(Color::Red).bold(),
}
}
}
/// Returns `num` as a percentage of `den`, or `0.0` when `den` is zero.
fn percent(num: usize, den: usize) -> f64 {
    if den == 0 {
        // Nothing to relate to; avoid dividing by zero.
        return 0.0;
    }

    100.0 * num as f64 / den as f64
}