use eyre::Result;
use futures::future::join_all;
use safe_network::node::routing::{
create_test_max_capacity_and_root_storage, Config, Event, EventStream, Routing,
};
use safe_network::UsedSpace;
use std::{
convert::TryInto,
iter,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use structopt::StructOpt;
use tokio::task::JoinHandle;
use tracing::info;
use tracing_subscriber::EnvFilter;
#[derive(Debug, StructOpt)]
struct Options {
#[structopt(
short,
long,
name = "bootstrap-contact",
value_name = "SOCKET_ADDRESS",
required_unless = "first"
)]
bootstrap_contacts: Vec<SocketAddr>,
#[structopt(short, long, conflicts_with = "bootstrap-contact")]
first: bool,
#[structopt(short, long, value_name = "IP")]
ip: Option<IpAddr>,
#[structopt(short, long, value_name = "PORT")]
port: Option<u16>,
#[structopt(
short,
long,
default_value,
value_name = "COUNT",
hide_default_value = true
)]
count: usize,
#[structopt(short, parse(from_occurrences))]
verbosity: u8,
}
#[tokio::main]
async fn main() -> Result<()> {
let opts = Options::from_args();
init_log(opts.verbosity);
let handles: Vec<_> = if opts.count <= 1 {
let handle =
start_single_node(opts.first, opts.bootstrap_contacts, opts.ip, opts.port).await?;
iter::once(handle).collect()
} else {
start_multiple_nodes(
opts.count,
opts.first,
opts.bootstrap_contacts,
opts.ip,
opts.port,
)
.await?
};
let _res = join_all(handles).await;
Ok(())
}
async fn start_single_node(
first: bool,
contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
port: Option<u16>,
) -> Result<JoinHandle<()>> {
let (_contact, handle) = start_node(0, first, contacts, ip, port).await?;
Ok(handle)
}
async fn start_multiple_nodes(
count: usize,
first: bool,
mut contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
base_port: Option<u16>,
) -> Result<Vec<JoinHandle<()>>> {
let mut handles = Vec::new();
let first_index = if first {
let (first_contact, first_handle) = start_node(0, true, vec![], ip, base_port).await?;
contacts.push(first_contact);
handles.push(first_handle);
1
} else {
0
};
for index in first_index..count {
let (_contact_info, handle) =
start_node(index, false, contacts.clone(), ip, base_port).await?;
handles.push(handle);
}
Ok(handles)
}
async fn start_node(
index: usize,
first: bool,
bootstrap_nodes: Vec<SocketAddr>,
ip: Option<IpAddr>,
base_port: Option<u16>,
) -> Result<(SocketAddr, JoinHandle<()>)> {
let ip = ip.unwrap_or_else(|| Ipv4Addr::LOCALHOST.into());
let local_port = base_port
.map(|base_port| {
index
.try_into()
.ok()
.and_then(|offset| base_port.checked_add(offset))
.expect("port out of range")
})
.unwrap_or_else(rand::random);
info!("Node #{} starting...", index);
let config = Config {
first,
local_addr: SocketAddr::new(ip, local_port),
bootstrap_nodes: bootstrap_nodes.into_iter().collect(),
..Default::default()
};
let (max_capacity, root) = create_test_max_capacity_and_root_storage()?;
let (node, event_stream) = Routing::new(config, UsedSpace::new(max_capacity), root)
.await
.expect("Failed to instantiate a Node");
let contact_info = node.our_connection_info().await;
info!(
"Node #{} connected - name: {}, contact: {}",
index,
node.name().await,
contact_info
);
let handle = run_node(index, node, event_stream);
Ok((contact_info, handle))
}
fn run_node(index: usize, mut node: Routing, mut event_stream: EventStream) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(event) = event_stream.next().await {
if !handle_event(index, &mut node, event).await {
break;
}
}
})
}
async fn handle_event(index: usize, node: &mut Routing, event: Event) -> bool {
match event {
Event::MemberJoined {
name,
previous_name,
age,
} => {
info!(
"Node #{} member joined - name: {}, previous_name: {:?}, age: {}",
index, name, previous_name, age
);
}
Event::MemberLeft { name, age } => {
info!("Node #{} member left - name: {}, age: {}", index, name, age);
}
Event::SectionSplit {
elders,
self_status_change,
} => {
info!(
"Node #{} section split - elders: {:?}, node elder status change: {:?}",
index, elders, self_status_change
);
}
Event::EldersChanged {
elders,
self_status_change,
} => {
info!(
"Node #{} elders changed - elders: {:?}, node elder status change: {:?}",
index, elders, self_status_change
);
}
Event::MessageReceived { msg, src, dst, .. } => info!(
"Node #{} received message - src: {:?}, dst: {:?}, content: {:?}",
index, src, dst, msg
),
Event::RelocationStarted { previous_name } => info!(
"Node #{} relocation started - previous_name: {}",
index, previous_name
),
Event::Relocated { previous_name, .. } => {
let new_name = node.name().await;
info!(
"Node #{} relocated - old name: {}, new name: {}",
index, previous_name, new_name,
);
}
Event::ServiceMsgReceived { msg, user, .. } => info!(
"Node #{} received message from user: {:?}, msg: {:?}",
index, user, msg
),
Event::AdultsChanged {
remaining,
added,
removed,
} => info!(
"Node #{} adults changed - remaining: {:?}, added: {:?}, removed: {:?}",
index, remaining, added, removed
),
}
true
}
fn init_log(verbosity: u8) {
const BIN_NAME: &str = module_path!();
const CRATE_NAME: &str = "safe_network";
let filter = match verbosity {
0 => EnvFilter::new(format!("{}=info,{}=warn", BIN_NAME, CRATE_NAME)),
1 => EnvFilter::new(format!("{},{}=info", BIN_NAME, CRATE_NAME)),
2 => EnvFilter::new(format!("{},{}=debug", BIN_NAME, CRATE_NAME)),
3 => EnvFilter::new(format!("{},{}=trace", BIN_NAME, CRATE_NAME)),
_ => EnvFilter::new("trace"),
};
tracing_subscriber::fmt().with_env_filter(filter).init()
}