pub(crate) mod command;
mod bootstrap;
mod comm;
mod connectivity_complaints;
mod core;
mod dispatcher;
mod enduser_registry;
mod event_stream;
mod lazy_messaging;
mod split_barrier;
#[cfg(test)]
mod tests;
pub use self::event_stream::EventStream;
use self::{
comm::{Comm, ConnectionEvent},
command::Command,
core::Core,
dispatcher::Dispatcher,
};
use crate::{
crypto,
error::Result,
event::{Event, NodeElderChange},
messages::Message,
node::Node,
peer::Peer,
section::{EldersInfo, SectionChain},
Error, TransportConfig, MIN_ADULT_AGE,
};
use bytes::Bytes;
use ed25519_dalek::{Keypair, PublicKey, Signature, Signer, KEYPAIR_LENGTH};
use itertools::Itertools;
use sn_messaging::{
client::Message as ClientMessage,
node::NodeMessage,
section_info::{Error as TargetSectionError, Message as SectionInfoMsg},
DstLocation, EndUser, Itinerary, MessageType, WireMsg,
};
use std::{net::SocketAddr, sync::Arc};
use tokio::{sync::mpsc, task};
use xor_name::{Prefix, XorName};
#[derive(Debug)]
pub struct Config {
pub first: bool,
pub keypair: Option<Keypair>,
pub transport_config: TransportConfig,
}
impl Default for Config {
fn default() -> Self {
Self {
first: false,
keypair: None,
transport_config: TransportConfig::default(),
}
}
}
/// Public handle to a routing node.
///
/// All methods delegate to the wrapped [`Dispatcher`]: queries lock its `core` state and read
/// from it, while actions are submitted to it as `Command`s. Dropping the handle calls
/// `Dispatcher::terminate`.
pub struct Routing {
    // Shared with the background task spawned in `Routing::new` that processes connection
    // events, hence the `Arc`.
    dispatcher: Arc<Dispatcher>,
}
impl Routing {
pub async fn new(config: Config) -> Result<(Self, EventStream)> {
let keypair = config.keypair.unwrap_or_else(|| {
crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE)
});
let node_name = crypto::name(&keypair.public);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (connection_event_tx, mut connection_event_rx) = mpsc::channel(1);
let (state, comm, backlog) = if config.first {
let keypair = crypto::gen_keypair(&Prefix::default().range_inclusive(), 255);
let node_name = crypto::name(&keypair.public);
info!("{} Starting a new network as the genesis node.", node_name);
let comm = Comm::new(config.transport_config, connection_event_tx).await?;
let node = Node::new(keypair, comm.our_connection_info());
let state = Core::first_node(node, event_tx)?;
let section = state.section();
state.send_event(Event::EldersChanged {
prefix: *section.prefix(),
key: *section.chain().last_key(),
sibling_key: None,
elders: section.elders_info().elders.keys().copied().collect(),
self_status_change: NodeElderChange::Promoted,
});
(state, comm, vec![])
} else {
info!("{} Bootstrapping a new node.", node_name);
let (comm, bootstrap_addr) =
Comm::bootstrap(config.transport_config, connection_event_tx).await?;
let node = Node::new(keypair, comm.our_connection_info());
let (node, section, backlog) =
bootstrap::initial(node, &comm, &mut connection_event_rx, bootstrap_addr).await?;
let state = Core::new(node, section, None, event_tx);
(state, comm, backlog)
};
let dispatcher = Arc::new(Dispatcher::new(state, comm));
let event_stream = EventStream::new(event_rx);
for (message, sender) in backlog {
dispatcher
.clone()
.handle_commands(Command::HandleMessage {
message,
sender: Some(sender),
})
.await?;
}
let _ = task::spawn(handle_connection_events(
dispatcher.clone(),
connection_event_rx,
));
let routing = Self { dispatcher };
Ok((routing, event_stream))
}
pub async fn set_joins_allowed(&self, joins_allowed: bool) -> Result<()> {
let command = Command::SetJoinsAllowed(joins_allowed);
self.dispatcher.clone().handle_commands(command).await
}
pub async fn propose_offline(&self, name: XorName) -> Result<()> {
if !self.is_elder().await {
return Err(Error::InvalidState);
}
let command = Command::ProposeOffline(name);
self.dispatcher.clone().handle_commands(command).await
}
pub async fn age(&self) -> u8 {
self.dispatcher.core.lock().await.node().age()
}
pub async fn public_key(&self) -> PublicKey {
self.dispatcher.core.lock().await.node().keypair.public
}
pub async fn keypair_as_bytes(&self) -> [u8; KEYPAIR_LENGTH] {
self.dispatcher.core.lock().await.node().keypair.to_bytes()
}
pub async fn sign_as_node(&self, data: &[u8]) -> Signature {
self.dispatcher.core.lock().await.node().keypair.sign(data)
}
pub async fn sign_as_elder(
&self,
data: &[u8],
public_key: &bls::PublicKey,
) -> Result<bls::SignatureShare> {
self.dispatcher
.core
.lock()
.await
.sign_with_section_key_share(data, public_key)
}
pub async fn verify(&self, data: &[u8], signature: &Signature) -> bool {
self.dispatcher
.core
.lock()
.await
.node()
.keypair
.verify(data, signature)
.is_ok()
}
pub async fn name(&self) -> XorName {
self.dispatcher.core.lock().await.node().name()
}
pub fn our_connection_info(&self) -> SocketAddr {
self.dispatcher.comm.our_connection_info()
}
pub async fn section_chain(&self) -> SectionChain {
self.dispatcher.core.lock().await.section_chain().clone()
}
pub async fn our_prefix(&self) -> Prefix {
*self.dispatcher.core.lock().await.section().prefix()
}
pub async fn matches_our_prefix(&self, name: &XorName) -> bool {
self.our_prefix().await.matches(name)
}
pub async fn is_elder(&self) -> bool {
self.dispatcher.core.lock().await.is_elder()
}
pub async fn our_elders(&self) -> Vec<Peer> {
self.dispatcher
.core
.lock()
.await
.section()
.elders_info()
.peers()
.copied()
.collect()
}
pub async fn our_elders_sorted_by_distance_to(&self, name: &XorName) -> Vec<Peer> {
self.our_elders()
.await
.into_iter()
.sorted_by(|lhs, rhs| name.cmp_distance(lhs.name(), rhs.name()))
.collect()
}
pub async fn our_adults(&self) -> Vec<Peer> {
self.dispatcher
.core
.lock()
.await
.section()
.adults()
.copied()
.collect()
}
pub async fn our_adults_sorted_by_distance_to(&self, name: &XorName) -> Vec<Peer> {
self.our_adults()
.await
.into_iter()
.sorted_by(|lhs, rhs| name.cmp_distance(lhs.name(), rhs.name()))
.collect()
}
pub async fn our_section(&self) -> EldersInfo {
self.dispatcher
.core
.lock()
.await
.section()
.elders_info()
.clone()
}
pub async fn other_sections(&self) -> Vec<EldersInfo> {
self.dispatcher
.core
.lock()
.await
.network()
.all()
.cloned()
.collect()
}
pub async fn section_key(&self, prefix: &Prefix) -> Option<bls::PublicKey> {
self.dispatcher
.core
.lock()
.await
.section_key(prefix)
.copied()
}
pub async fn matching_section(
&self,
name: &XorName,
) -> (Option<bls::PublicKey>, Option<EldersInfo>) {
let state = self.dispatcher.core.lock().await;
let (key, elders_info) = state.matching_section(name);
(key.copied(), elders_info.cloned())
}
pub async fn send_message(
&self,
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<bls::PublicKey>,
) -> Result<()> {
if let DstLocation::EndUser(EndUser::Client {
socket_id,
public_key,
}) = itinerary.dst
{
let name = XorName::from(public_key);
if self.our_prefix().await.matches(&name) {
let socket_addr = self
.dispatcher
.core
.lock()
.await
.get_socket_addr(socket_id)
.copied();
if let Some(socket_addr) = socket_addr {
debug!(
"Sending client msg of {:?} to {:?}",
public_key, socket_addr
);
return self
.send_message_to_client(socket_addr, ClientMessage::from(content)?)
.await;
} else {
debug!(
"Could not find socketaddr corresponding to socket_id {:?} and public_key {:?}",
socket_id, public_key
);
debug!("Sending user message instead.. (Command::SendUserMessage)");
}
} else {
debug!("Relaying message with sending user message (Command::SendUserMessage)");
}
}
let command = Command::SendUserMessage {
itinerary,
content,
additional_proof_chain_key,
};
self.dispatcher.clone().handle_commands(command).await
}
async fn send_message_to_client(
&self,
recipient: SocketAddr,
message: ClientMessage,
) -> Result<()> {
let command = Command::SendMessage {
recipients: vec![recipient],
delivery_group_size: 1,
message: MessageType::ClientMessage(message),
};
self.dispatcher.clone().handle_commands(command).await
}
pub async fn public_key_set(&self) -> Result<bls::PublicKeySet> {
self.dispatcher.core.lock().await.public_key_set()
}
pub async fn our_history(&self) -> SectionChain {
self.dispatcher.core.lock().await.section().chain().clone()
}
pub async fn our_index(&self) -> Result<usize> {
self.dispatcher.core.lock().await.our_index()
}
}
impl Drop for Routing {
    /// Asks the dispatcher to terminate once the routing handle goes away.
    fn drop(&mut self) {
        let dispatcher = &self.dispatcher;
        dispatcher.terminate();
    }
}
async fn handle_connection_events(
dispatcher: Arc<Dispatcher>,
mut incoming_conns: mpsc::Receiver<ConnectionEvent>,
) {
while let Some(event) = incoming_conns.recv().await {
match event {
ConnectionEvent::Received((src, bytes)) => {
trace!("New message ({} bytes) received from: {}", bytes.len(), src);
handle_message(dispatcher.clone(), bytes, src).await;
}
ConnectionEvent::Disconnected(addr) => {
trace!("Lost connection to {:?}", addr);
let _ = dispatcher
.clone()
.handle_commands(Command::HandleConnectionLost(addr))
.await;
}
}
}
}
async fn handle_message(dispatcher: Arc<Dispatcher>, bytes: Bytes, sender: SocketAddr) {
let span = {
let state = dispatcher.core.lock().await;
trace_span!("handle_message", name = %state.node().name(), %sender)
};
let _span_guard = span.enter();
let message_type = match WireMsg::deserialize(bytes) {
Ok(message_type) => message_type,
Err(error) => {
error!("Failed to deserialize message: {}", error);
return;
}
};
match message_type {
MessageType::Ping => {
}
MessageType::SectionInfo(message) => {
let command = Command::HandleSectionInfoMsg { sender, message };
let _ = task::spawn(dispatcher.handle_commands(command));
}
MessageType::NodeMessage(NodeMessage(msg_bytes)) => {
match Message::from_bytes(Bytes::from(msg_bytes)) {
Ok(message) => {
let command = Command::HandleMessage {
message,
sender: Some(sender),
};
let _ = task::spawn(dispatcher.handle_commands(command));
}
Err(error) => {
error!("Failed to deserialize node message: {}", error);
}
}
}
MessageType::ClientMessage(message) => {
let end_user = dispatcher
.core
.lock()
.await
.get_enduser_by_addr(&sender)
.copied();
let end_user = match end_user {
Some(end_user) => end_user,
None => {
let command = Command::SendMessage {
recipients: vec![sender],
delivery_group_size: 1,
message: MessageType::SectionInfo(SectionInfoMsg::RegisterEndUserError(
TargetSectionError::InvalidBootstrap(format!(
"No enduser found for {} and msg {:?}",
sender, message
)),
)),
};
let _ = task::spawn(dispatcher.handle_commands(command));
return;
}
};
let event = Event::ClientMessageReceived {
msg: Box::new(message),
user: end_user,
};
dispatcher.send_event(event).await;
}
}
}