pub(crate) mod command;
mod approved;
mod bootstrap;
mod comm;
mod event_stream;
mod split_barrier;
mod stage;
#[cfg(test)]
mod tests;
pub use self::event_stream::EventStream;
use self::{
approved::Approved,
comm::{Comm, ConnectionEvent},
command::Command,
split_barrier::SplitBarrier,
stage::Stage,
};
use crate::{
crypto,
error::Result,
event::{Event, NodeElderChange},
location::{DstLocation, SrcLocation},
messages::Message,
node::Node,
peer::Peer,
section::{EldersInfo, SectionProofChain},
TransportConfig, MIN_AGE,
};
use bytes::Bytes;
use ed25519_dalek::{Keypair, PublicKey, Signature, Signer};
use itertools::Itertools;
use sn_messaging::{
client::MsgEnvelope,
infrastructure::{ErrorResponse, Message as InfrastructureMessage},
node::NodeMessage,
MessageType, WireMsg,
};
use std::{net::SocketAddr, sync::Arc};
use tokio::{sync::mpsc, task};
use xor_name::{Prefix, XorName};
/// Configuration consumed by `Routing::new` when constructing a node.
#[derive(Debug)]
pub struct Config {
/// When `true`, `Routing::new` starts a new network with this node as the seed node.
/// When `false`, the node bootstraps via `Comm::bootstrap` instead.
pub first: bool,
/// Keypair to use for the node. When `None`, `Routing::new` generates one with
/// `crypto::gen_keypair`.
pub keypair: Option<Keypair>,
/// Transport configuration, handed to `Comm::new` or `Comm::bootstrap`.
pub transport_config: TransportConfig,
}
impl Default for Config {
    /// Returns the configuration of a non-first node with no preset keypair and the default
    /// transport configuration.
    fn default() -> Self {
        let transport_config = TransportConfig::default();
        Self {
            transport_config,
            keypair: None,
            first: false,
        }
    }
}
/// Handle to a routing node. Its methods delegate to the shared `Stage`.
///
/// Dropping the handle calls `terminate` on the stage (see the `Drop` impl).
pub struct Routing {
// Held in an `Arc` because clones are handed to commands and spawned tasks.
stage: Arc<Stage>,
}
impl Routing {
pub async fn new(config: Config) -> Result<(Self, EventStream)> {
let keypair = config.keypair.unwrap_or_else(crypto::gen_keypair);
let node_name = crypto::name(&keypair.public);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (connection_event_tx, mut connection_event_rx) = mpsc::channel(1);
let (state, comm, backlog) = if config.first {
info!("{} Starting a new network as the seed node.", node_name);
let comm = Comm::new(config.transport_config, connection_event_tx).await?;
let node = Node::new(keypair, comm.our_connection_info()).with_age(MIN_AGE + 1);
let state = Approved::first_node(node, event_tx)?;
let section = state.section();
state.send_event(Event::EldersChanged {
prefix: *section.prefix(),
key: *section.chain().last_key(),
elders: section.elders_info().elders.keys().copied().collect(),
self_status_change: NodeElderChange::Promoted,
});
(state, comm, vec![])
} else {
info!("{} Bootstrapping a new node.", node_name);
let (comm, bootstrap_addr) =
Comm::bootstrap(config.transport_config, connection_event_tx).await?;
let node = Node::new(keypair, comm.our_connection_info()).with_age(MIN_AGE + 1);
let (node, section, backlog) =
bootstrap::initial(node, &comm, &mut connection_event_rx, bootstrap_addr).await?;
let state = Approved::new(node, section, None, event_tx);
(state, comm, backlog)
};
let stage = Arc::new(Stage::new(state, comm));
let event_stream = EventStream::new(event_rx);
for (message, sender) in backlog {
stage
.clone()
.handle_commands(Command::HandleMessage {
message,
sender: Some(sender),
})
.await?;
}
let _ = task::spawn(handle_connection_events(stage.clone(), connection_event_rx));
let routing = Self { stage };
Ok((routing, event_stream))
}
pub async fn set_joins_allowed(&self, joins_allowed: bool) -> Result<()> {
let command = Command::SetJoinsAllowed(joins_allowed);
self.stage.clone().handle_commands(command).await
}
pub async fn age(&self) -> u8 {
self.stage.state.lock().await.node().age
}
pub async fn public_key(&self) -> PublicKey {
self.stage.state.lock().await.node().keypair.public
}
pub async fn sign_as_node(&self, data: &[u8]) -> Signature {
self.stage.state.lock().await.node().keypair.sign(data)
}
pub async fn sign_as_elder(
&self,
data: &[u8],
public_key: &bls::PublicKey,
) -> Result<bls::SignatureShare> {
self.stage
.state
.lock()
.await
.sign_with_section_key_share(data, public_key)
}
pub async fn verify(&self, data: &[u8], signature: &Signature) -> bool {
self.stage
.state
.lock()
.await
.node()
.keypair
.verify(data, signature)
.is_ok()
}
pub async fn name(&self) -> XorName {
self.stage.state.lock().await.node().name()
}
pub fn our_connection_info(&self) -> SocketAddr {
self.stage.comm.our_connection_info()
}
pub async fn our_prefix(&self) -> Prefix {
*self.stage.state.lock().await.section().prefix()
}
pub async fn matches_our_prefix(&self, name: &XorName) -> bool {
self.our_prefix().await.matches(name)
}
pub async fn is_elder(&self) -> bool {
self.stage.state.lock().await.is_elder()
}
pub async fn our_elders(&self) -> Vec<Peer> {
self.stage
.state
.lock()
.await
.section()
.elders_info()
.peers()
.copied()
.collect()
}
pub async fn our_elders_sorted_by_distance_to(&self, name: &XorName) -> Vec<Peer> {
self.our_elders()
.await
.into_iter()
.sorted_by(|lhs, rhs| name.cmp_distance(lhs.name(), rhs.name()))
.collect()
}
pub async fn our_adults(&self) -> Vec<Peer> {
self.stage
.state
.lock()
.await
.section()
.adults()
.copied()
.collect()
}
pub async fn our_adults_sorted_by_distance_to(&self, name: &XorName) -> Vec<Peer> {
self.our_adults()
.await
.into_iter()
.sorted_by(|lhs, rhs| name.cmp_distance(lhs.name(), rhs.name()))
.collect()
}
pub async fn our_section(&self) -> EldersInfo {
self.stage
.state
.lock()
.await
.section()
.elders_info()
.clone()
}
pub async fn neighbour_sections(&self) -> Vec<EldersInfo> {
self.stage
.state
.lock()
.await
.network()
.all()
.cloned()
.collect()
}
pub async fn match_section(
&self,
name: &XorName,
) -> (Option<bls::PublicKey>, Option<EldersInfo>) {
let state = self.stage.state.lock().await;
if state.section().prefix().matches(name) {
let section = state.section();
(
Some(*section.chain().last_key()),
Some(section.elders_info().clone()),
)
} else {
state.network().section_by_name(name)
}
}
pub async fn send_message(
&self,
src: SrcLocation,
dst: DstLocation,
content: Bytes,
) -> Result<()> {
let command = Command::SendUserMessage { src, dst, content };
self.stage.clone().handle_commands(command).await
}
pub async fn send_message_to_client(
&self,
recipient: SocketAddr,
message: MsgEnvelope,
) -> Result<()> {
let command = Command::SendMessage {
recipients: vec![recipient],
delivery_group_size: 1,
message: MessageType::ClientMessage(message),
};
self.stage.clone().handle_commands(command).await
}
pub async fn public_key_set(&self) -> Result<bls::PublicKeySet> {
self.stage.state.lock().await.public_key_set()
}
pub async fn our_history(&self) -> SectionProofChain {
self.stage.state.lock().await.section().chain().clone()
}
pub async fn our_index(&self) -> Result<usize> {
self.stage.state.lock().await.our_index()
}
}
impl Drop for Routing {
    /// Terminates the underlying stage when the `Routing` handle goes away.
    fn drop(&mut self) {
        self.stage.terminate();
    }
}
async fn handle_connection_events(
stage: Arc<Stage>,
mut incoming_conns: mpsc::Receiver<ConnectionEvent>,
) {
while let Some(event) = incoming_conns.recv().await {
match event {
ConnectionEvent::Received((src, bytes)) => {
trace!("New message ({} bytes) received from: {}", bytes.len(), src);
handle_message(stage.clone(), bytes, src).await;
}
ConnectionEvent::Disconnected(addr) => {
trace!("Lost connection to {:?}", addr);
let _ = stage
.clone()
.handle_commands(Command::HandleConnectionLost(addr))
.await;
}
}
}
}
/// Deserialises the wire message in `bytes`, received from `sender`, and dispatches it by type.
///
/// - `Ping` messages are ignored.
/// - Infrastructure and node messages are turned into commands handled on a spawned task.
/// - Client messages whose target section key has a BLS form are first checked with
///   `check_key_status`. If the check fails, an `InfrastructureUpdate` error response is sent
///   back to `sender` and no event is raised. In all other cases an
///   `Event::ClientMessageReceived` is sent.
///
/// Deserialisation failures are logged and the message is dropped.
async fn handle_message(stage: Arc<Stage>, bytes: Bytes, sender: SocketAddr) {
    let message_type = match WireMsg::deserialize(bytes) {
        Err(error) => {
            error!("Failed to deserialize message from {}: {}", sender, error);
            return;
        }
        Ok(message_type) => message_type,
    };

    match message_type {
        MessageType::Ping => {}
        MessageType::InfrastructureMessage(message) => {
            let _ = task::spawn(
                stage.handle_commands(Command::HandleInfrastructureMessage { sender, message }),
            );
        }
        MessageType::NodeMessage(NodeMessage(msg_bytes)) => {
            let message = match Message::from_bytes(Bytes::from(msg_bytes)) {
                Ok(message) => message,
                Err(error) => {
                    error!(
                        "Error occurred when deserialising node message bytes from {}: {}",
                        sender, error
                    );
                    return;
                }
            };
            let _ = task::spawn(stage.handle_commands(Command::HandleMessage {
                message,
                sender: Some(sender),
            }));
        }
        MessageType::ClientMessage(msg_envelope) => {
            // Only a target key that has a BLS form is checked; anything else passes through.
            let target_key = msg_envelope
                .message
                .target_section_pk()
                .and_then(|client_pk| client_pk.bls());

            if let Some(bls_pk) = target_key {
                if let Err(error) = stage.check_key_status(&bls_pk).await {
                    let incoming_msg = msg_envelope.message;
                    let response = ErrorResponse {
                        correlation_id: incoming_msg.id(),
                        error,
                    };
                    let reply = Command::SendMessage {
                        recipients: vec![sender],
                        delivery_group_size: 1,
                        message: MessageType::InfrastructureMessage(
                            InfrastructureMessage::InfrastructureUpdate(response),
                        ),
                    };
                    let _ = task::spawn(stage.handle_commands(reply));
                    return;
                }
            }

            stage
                .send_event(Event::ClientMessageReceived {
                    content: Box::new(msg_envelope),
                    src: sender,
                })
                .await;
        }
    }
}