use futures::future::join_all;
use hex_fmt::HexFmt;
use sn_routing::{Config, Event, EventStream, Routing, TransportConfig};
use std::{
collections::HashSet,
convert::TryInto,
iter,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use structopt::StructOpt;
use tokio::task::JoinHandle;
use tracing::info;
use tracing_subscriber::EnvFilter;
#[derive(Debug, StructOpt)]
struct Options {
#[structopt(
short,
long,
name = "bootstrap-contact",
value_name = "SOCKET_ADDRESS",
required_unless = "first"
)]
bootstrap_contacts: Vec<SocketAddr>,
#[structopt(short, long, conflicts_with = "bootstrap-contact")]
first: bool,
#[structopt(short, long, value_name = "IP")]
ip: Option<IpAddr>,
#[structopt(short, long, value_name = "PORT")]
port: Option<u16>,
#[structopt(
short,
long,
default_value,
value_name = "COUNT",
hide_default_value = true
)]
count: usize,
#[structopt(short, parse(from_occurrences))]
verbosity: u8,
}
#[tokio::main]
async fn main() {
let opts = Options::from_args();
init_log(opts.verbosity);
let handles: Vec<_> = if opts.count <= 1 {
let handle =
start_single_node(opts.first, opts.bootstrap_contacts, opts.ip, opts.port).await;
iter::once(handle).collect()
} else {
start_multiple_nodes(
opts.count,
opts.first,
opts.bootstrap_contacts,
opts.ip,
opts.port,
)
.await
};
let _ = join_all(handles).await;
}
async fn start_single_node(
first: bool,
contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
port: Option<u16>,
) -> JoinHandle<()> {
let (_contact, handle) = start_node(0, first, contacts.into_iter().collect(), ip, port).await;
handle
}
async fn start_multiple_nodes(
count: usize,
first: bool,
mut contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
base_port: Option<u16>,
) -> Vec<JoinHandle<()>> {
let mut handles = Vec::new();
let first_index = if first {
let (first_contact, first_handle) =
start_node(0, true, Vec::default(), ip, base_port).await;
contacts.push(first_contact);
handles.push(first_handle);
1
} else {
0
};
for index in first_index..count {
let (_contact_info, handle) =
start_node(index, false, contacts.clone(), ip, base_port).await;
handles.push(handle);
}
handles
}
async fn start_node(
index: usize,
first: bool,
contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
base_port: Option<u16>,
) -> (SocketAddr, JoinHandle<()>) {
let ip = ip.unwrap_or_else(|| Ipv4Addr::LOCALHOST.into());
let local_port = base_port.map(|base_port| {
index
.try_into()
.ok()
.and_then(|offset| base_port.checked_add(offset))
.expect("port out of range")
});
let contacts: HashSet<_> = contacts.into_iter().collect();
let transport_config = TransportConfig {
hard_coded_contacts: contacts,
local_ip: Some(ip),
local_port,
..Default::default()
};
info!("Node #{} starting...", index);
let config = Config {
first,
transport_config,
..Default::default()
};
let (node, event_stream) = Routing::new(config)
.await
.expect("Failed to instantiate a Node");
let contact_info = node.our_connection_info();
info!(
"Node #{} connected - name: {}, contact: {}",
index,
node.name().await,
contact_info
);
let handle = run_node(index, node, event_stream);
(contact_info, handle)
}
fn run_node(index: usize, mut node: Routing, mut event_stream: EventStream) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(event) = event_stream.next().await {
if !handle_event(index, &mut node, event).await {
break;
}
}
})
}
async fn handle_event(index: usize, node: &mut Routing, event: Event) -> bool {
match event {
Event::MemberJoined {
name,
previous_name,
age,
} => {
info!(
"Node #{} member joined - name: {}, previous_name: {:?}, age: {}",
index, name, previous_name, age
);
}
Event::MemberLeft { name, age } => {
info!("Node #{} member left - name: {}, age: {}", index, name, age);
}
Event::SectionSplit {
elders,
sibling_elders,
self_status_change,
} => {
info!(
"Node #{} section split - elders: {:?}, sibling elders: {:?}, node elder status change: {:?}",
index, elders, sibling_elders, self_status_change
);
}
Event::EldersChanged {
elders,
self_status_change,
} => {
info!(
"Node #{} elders changed - elders: {:?}, node elder status change: {:?}",
index, elders, self_status_change
);
}
Event::MessageReceived {
content, src, dst, ..
} => info!(
"Node #{} received message - src: {:?}, dst: {:?}, content: {}",
index,
src,
dst,
HexFmt(&content)
),
Event::RelocationStarted { previous_name } => info!(
"Node #{} relocation started - previous_name: {}",
index, previous_name
),
Event::Relocated { previous_name, .. } => {
let new_name = node.name().await;
info!(
"Node #{} relocated - old name: {}, new name: {}",
index, previous_name, new_name,
);
}
Event::RestartRequired => {
info!("Node #{} requires restart", index);
return false;
}
Event::ClientMsgReceived { msg, user, .. } => info!(
"Node #{} received message from user: {:?}, msg: {:?}",
index, user, msg
),
Event::ClientLost(addr) => info!("Node #{} received ClientLost({:?})", index, addr),
Event::AdultsChanged {
remaining,
added,
removed,
} => info!(
"Node #{} adults changed - remaining: {:?}, added: {:?}, removed: {:?}",
index, remaining, added, removed
),
}
true
}
fn init_log(verbosity: u8) {
let filter = match verbosity {
0 => EnvFilter::new("minimal=info,sn_routing=warn"),
1 => EnvFilter::new("minimal,sn_routing=info"),
2 => EnvFilter::new("minimal,sn_routing=debug"),
3 => EnvFilter::new("minimal,sn_routing=trace"),
_ => EnvFilter::new("trace"),
};
tracing_subscriber::fmt().with_env_filter(filter).init()
}