use super::{send_message, verify_message};
use crate::{
ed25519,
error::{Error, Result},
messages::RoutingMsgUtils,
node::Node,
peer::PeerUtils,
routing::comm::{Comm, ConnectionEvent},
section::{SectionAuthorityProviderUtils, SectionUtils},
FIRST_SECTION_MAX_AGE, FIRST_SECTION_MIN_AGE, MIN_ADULT_AGE,
};
use futures::future;
use rand::seq::IteratorRandom;
use resource_proof::ResourceProof;
use sn_messaging::{
node::{
JoinRejectionReason, JoinRequest, JoinResponse, ResourceProofResponse, RoutingMsg, Section,
Variant,
},
DestInfo, DstLocation, MessageType, WireMsg,
};
use std::{
collections::{HashSet, VecDeque},
net::SocketAddr,
};
use tokio::sync::mpsc;
use tracing::Instrument;
use xor_name::{Prefix, XorName};
/// Maximum number of routing messages kept in the join backlog. Messages that are not join
/// responses are backlogged while joining; once this many are stored, the oldest entries are
/// dropped to make room for new ones.
const BACKLOG_CAPACITY: usize = 100;
/// Joins the network, using the node at `bootstrap_addr` as the first contact.
///
/// Runs the join state machine (`Join::run`) concurrently with `send_messages`, which forwards
/// every queued outgoing message through `comm`. Incoming connection events are read from
/// `incoming_conns`.
///
/// On success returns the `Node` (its identity may have been regenerated while joining), the
/// `Section` built from the approval, and the backlog of routing messages that arrived during
/// the join but were not join responses.
pub(crate) async fn join(
    node: Node,
    comm: &Comm,
    incoming_conns: &mut mpsc::Receiver<ConnectionEvent>,
    bootstrap_addr: SocketAddr,
) -> Result<(Node, Section, Vec<(RoutingMsg, SocketAddr, DestInfo)>)> {
    // The span must be built before `node` is moved into the state machine.
    let span = trace_span!("bootstrap", name = %node.name());

    // Outgoing messages travel from the state machine to the forwarder over this channel.
    let (send_tx, send_rx) = mpsc::channel(1);
    let joining = Join::new(node, send_tx, incoming_conns).run(bootstrap_addr);
    let forwarding = send_messages(send_rx, comm);

    // Only the join outcome is reported; the forwarder always finishes with `Ok(())`.
    let (outcome, _forwarder_result) = future::join(joining, forwarding)
        .instrument(span)
        .await;
    outcome
}
/// State of an in-progress attempt to join the network.
struct Join<'a> {
/// Sending half of the channel on which outgoing messages and their recipients are queued.
send_tx: mpsc::Sender<(MessageType, Vec<(XorName, SocketAddr)>)>,
/// Source of incoming connection events (received bytes, disconnections).
recv_rx: &'a mut mpsc::Receiver<ConnectionEvent>,
/// Our own node identity; it can be replaced with a fresh keypair while joining.
node: Node,
/// Routing messages received while joining that were not join responses, bounded by
/// `BACKLOG_CAPACITY` and handed back to the caller once the join is approved.
backlog: VecDeque<(RoutingMsg, SocketAddr, DestInfo)>,
}
impl<'a> Join<'a> {
/// Creates the join state for `node`, queueing outgoing messages on `send_tx` and reading
/// incoming connection events from `recv_rx`. The backlog starts empty with room for
/// `BACKLOG_CAPACITY` messages.
fn new(
    node: Node,
    send_tx: mpsc::Sender<(MessageType, Vec<(XorName, SocketAddr)>)>,
    recv_rx: &'a mut mpsc::Receiver<ConnectionEvent>,
) -> Self {
    let backlog = VecDeque::with_capacity(BACKLOG_CAPACITY);
    Self {
        node,
        backlog,
        send_tx,
        recv_rx,
    }
}
/// Starts the join process with the node at `bootstrap_addr` as the only initial recipient.
///
/// No section key is known at this point, so a freshly generated random BLS public key is
/// used for the first request, and the bootstrap node is addressed by our own name.
async fn run(
    self,
    bootstrap_addr: SocketAddr,
) -> Result<(Node, Section, Vec<(RoutingMsg, SocketAddr, DestInfo)>)> {
    let placeholder_key = bls::SecretKey::random().public_key();
    let first_contact = (self.node.name(), bootstrap_addr);
    self.join(placeholder_key, vec![first_contact]).await
}
/// Drives the join protocol until this node is approved or rejected.
///
/// Sends an initial `JoinRequest` for `section_key` to `recipients`, then reacts to every
/// validated `JoinResponse`:
/// - `Rejected`: returns the corresponding error.
/// - `Approval`: returns our node, the new `Section` and the backlogged messages.
/// - `Retry`: if the section key differs from the one we used and the prefix matches our
///   name, re-sends the request to the section's elders with the new key, regenerating our
///   node identity first when its age does not fit.
/// - `Redirect`: like `Retry`, but only contacts elders we have not sent a request to yet
///   and never regenerates the node identity.
/// - `ResourceChallenge`: solves the resource proof and sends the solution back to the
///   challenger.
async fn join(
    mut self,
    mut section_key: bls::PublicKey,
    mut recipients: Vec<(XorName, SocketAddr)>,
) -> Result<(Node, Section, Vec<(RoutingMsg, SocketAddr, DestInfo)>)> {
    self.send_plain_join_request(section_key, &recipients)
        .await?;

    // Addresses of every recipient we have targeted so far; used to filter redirects.
    let mut contacted = HashSet::<SocketAddr>::new();

    loop {
        contacted.extend(recipients.iter().map(|(_, addr)| addr));

        let (response, sender, dest_info) = self.receive_join_response().await?;
        match response {
            JoinResponse::Rejected(JoinRejectionReason::NodeNotReachable(addr)) => {
                error!(
                    "Node cannot join the network since it is not externally reachable: {}",
                    addr
                );
                return Err(Error::NodeNotReachable(addr));
            }
            JoinResponse::Rejected(JoinRejectionReason::JoinsDisallowed) => {
                error!("Network is set to not taking any new joining node, try join later.");
                return Err(Error::TryJoinLater);
            }
            JoinResponse::Approval {
                section_auth,
                genesis_key,
                section_chain,
                ..
            } => {
                let section = Section::new(genesis_key, section_chain, section_auth)?;
                return Ok((self.node, section, Vec::from(self.backlog)));
            }
            JoinResponse::Retry(section_auth) => {
                // A retry carrying the key we already used tells us nothing new.
                if section_auth.section_key() == section_key {
                    debug!("Ignoring JoinResponse::Retry with invalid section authority provider key");
                    continue;
                }

                let elders: Vec<(XorName, SocketAddr)> = section_auth
                    .elders
                    .iter()
                    .map(|(name, addr)| (*name, *addr))
                    .collect();
                let prefix = section_auth.prefix;

                // Joining the first section (empty prefix) with an age below the minimum:
                // pick a random age in the first-section range and take a new identity.
                if prefix.is_empty() && self.node.age() < FIRST_SECTION_MIN_AGE {
                    let age: u8 = (FIRST_SECTION_MIN_AGE..FIRST_SECTION_MAX_AGE)
                        .choose(&mut rand::thread_rng())
                        .unwrap_or(FIRST_SECTION_MAX_AGE);
                    self.replace_node_identity(&Prefix::default(), age);
                }

                if !prefix.matches(&self.node.name()) {
                    warn!(
                        "Newer Join response not for our prefix {:?} from {:?}",
                        section_auth, sender,
                    );
                    continue;
                }

                // Outside the first section the node has to join with `MIN_ADULT_AGE`.
                if !prefix.is_empty() && self.node.age() != MIN_ADULT_AGE {
                    self.replace_node_identity(&prefix, MIN_ADULT_AGE);
                }

                info!(
                    "Newer Join response for our prefix {:?} from {:?}",
                    section_auth, sender
                );
                section_key = section_auth.section_key();
                recipients = elders;
                self.send_plain_join_request(section_key, &recipients)
                    .await?;
            }
            JoinResponse::Redirect(section_auth) => {
                if section_auth.section_key() == section_key {
                    continue;
                }

                // Only elders we have not targeted before are worth contacting.
                let fresh_elders: Vec<(XorName, SocketAddr)> = section_auth
                    .elders
                    .iter()
                    .filter(|(_, addr)| !contacted.contains(addr))
                    .map(|(name, addr)| (*name, *addr))
                    .collect();

                if fresh_elders.is_empty() {
                    debug!("Joining redirected to the same set of peers we already contacted - ignoring response");
                    continue;
                }
                info!(
                    "Joining redirected to another set of peers: {:?}",
                    fresh_elders,
                );

                if !section_auth.prefix.matches(&self.node.name()) {
                    warn!(
                        "Newer Join response not for our prefix {:?} from {:?}",
                        section_auth, sender,
                    );
                    continue;
                }

                info!(
                    "Newer Join response for our prefix {:?} from {:?}",
                    section_auth, sender
                );
                section_key = section_auth.section_key();
                recipients = fresh_elders;
                self.send_plain_join_request(section_key, &recipients)
                    .await?;
            }
            JoinResponse::ResourceChallenge {
                data_size,
                difficulty,
                nonce,
                nonce_signature,
            } => {
                let proof = ResourceProof::new(data_size, difficulty);
                let data = proof.create_proof_data(&nonce);
                let mut prover = proof.create_prover(data.clone());
                let solution = prover.solve();

                let answer = JoinRequest {
                    section_key,
                    resource_proof_response: Some(ResourceProofResponse {
                        solution,
                        data,
                        nonce,
                        nonce_signature,
                    }),
                };
                // The answer goes back only to the peer that issued the challenge.
                let challenger = &[(dest_info.dest, sender)];
                self.send_join_requests(answer, challenger, section_key)
                    .await?;
            }
        }
    }
}

/// Sends a `JoinRequest` for `section_key` that carries no resource proof to `recipients`.
async fn send_plain_join_request(
    &mut self,
    section_key: bls::PublicKey,
    recipients: &[(XorName, SocketAddr)],
) -> Result<()> {
    let join_request = JoinRequest {
        section_key,
        resource_proof_response: None,
    };
    self.send_join_requests(join_request, recipients, section_key)
        .await
}

/// Replaces our node with one built from a keypair generated by `ed25519::gen_keypair` for
/// `prefix`'s inclusive name range and `age`, keeping the current address.
fn replace_node_identity(&mut self, prefix: &Prefix, age: u8) {
    let new_keypair = ed25519::gen_keypair(&prefix.range_inclusive(), age);
    let new_name = ed25519::name(&new_keypair.public);
    info!("Setting Node name to {}", new_name);
    self.node = Node::new(new_keypair, self.node.addr);
}
async fn send_join_requests(
&mut self,
join_request: JoinRequest,
recipients: &[(XorName, SocketAddr)],
section_key: bls::PublicKey,
) -> Result<()> {
info!("Sending {:?} to {:?}", join_request, recipients);
let variant = Variant::JoinRequest(Box::new(join_request));
let message = RoutingMsg::single_src(
&self.node,
DstLocation::DirectAndUnrouted,
variant,
section_key,
)?;
let _ = self
.send_tx
.send((
MessageType::Routing {
msg: message,
dest_info: DestInfo {
dest: recipients[0].0,
dest_section_pk: section_key,
},
},
recipients.to_vec(),
))
.await;
Ok(())
}
async fn receive_join_response(&mut self) -> Result<(JoinResponse, SocketAddr, DestInfo)> {
let destination = self.node.name();
while let Some(event) = self.recv_rx.recv().await {
let (routing_msg, dest_info, join_response, sender) = match event {
ConnectionEvent::Received((sender, bytes)) => match WireMsg::deserialize(bytes) {
Ok(MessageType::Node { .. })
| Ok(MessageType::Client { .. })
| Ok(MessageType::SectionInfo { .. }) => continue,
Ok(MessageType::Routing { msg, dest_info }) => {
if let Variant::JoinResponse(resp) = &msg.variant {
let join_response = resp.clone();
(msg, dest_info, *join_response, sender)
} else {
self.backlog_message(msg, sender, dest_info);
continue;
}
}
Err(error) => {
debug!("Failed to deserialize message: {}", error);
continue;
}
},
ConnectionEvent::Disconnected(_) => continue,
};
match join_response {
JoinResponse::Rejected(JoinRejectionReason::NodeNotReachable(_))
| JoinResponse::Rejected(JoinRejectionReason::JoinsDisallowed) => {
return Ok((join_response, sender, dest_info));
}
JoinResponse::Retry(ref section_auth) => {
if !section_auth.prefix.matches(&destination) {
error!(
"Invalid JoinResponse::Retry bad prefix: {:?}",
join_response
);
continue;
}
if section_auth.elders.is_empty() {
error!(
"Invalid JoinResponse::Retry, empty list of Elders: {:?}",
join_response
);
continue;
}
if !verify_message(&routing_msg, None) {
continue;
}
return Ok((join_response, sender, dest_info));
}
JoinResponse::Redirect(ref section_auth) => {
if section_auth.elders.is_empty() {
error!(
"Invalid JoinResponse::Redirect, empty list of Elders: {:?}",
join_response
);
continue;
}
if !verify_message(&routing_msg, None) {
continue;
}
return Ok((join_response, sender, dest_info));
}
JoinResponse::ResourceChallenge { .. } => {
if !verify_message(&routing_msg, None) {
continue;
}
return Ok((join_response, sender, dest_info));
}
JoinResponse::Approval {
ref section_auth,
ref node_state,
..
} => {
if node_state.value.peer.name() != &self.node.name() {
trace!("Ignore NodeApproval not for us");
continue;
}
if !verify_message(&routing_msg, None) {
continue;
}
trace!(
"This node has been approved to join the network at {:?}!",
section_auth.value.prefix,
);
return Ok((join_response, sender, dest_info));
}
}
}
error!("RoutingMsg sender unexpectedly closed");
Err(Error::InvalidState)
}
/// Appends a message to the backlog, first discarding the oldest entries so that the backlog
/// never grows beyond `BACKLOG_CAPACITY`.
fn backlog_message(&mut self, message: RoutingMsg, sender: SocketAddr, dest_info: DestInfo) {
    // Number of old entries to evict so that one free slot is left for the new message.
    let excess = (self.backlog.len() + 1).saturating_sub(BACKLOG_CAPACITY);
    for _ in 0..excess {
        let _ = self.backlog.pop_front();
    }

    let entry = (message, sender, dest_info);
    self.backlog.push_back(entry)
}
}
/// Forwards every `(message, recipients)` pair received on `rx` to `send_message` using
/// `comm`, one at a time, until all senders of the channel are gone. Always returns `Ok(())`.
async fn send_messages(
    mut rx: mpsc::Receiver<(MessageType, Vec<(XorName, SocketAddr)>)>,
    comm: &Comm,
) -> Result<()> {
    loop {
        match rx.recv().await {
            Some((message, recipients)) => {
                send_message(comm, message, recipients).await;
            }
            // `None` means the channel is closed and drained: nothing more to forward.
            None => return Ok(()),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
dkg::test_utils::*,
error::Error as RoutingError,
messages::RoutingMsgUtils,
section::test_utils::*,
section::{NodeStateUtils, SectionAuthorityProviderUtils},
ELDER_SIZE, MIN_ADULT_AGE, MIN_AGE,
};
use anyhow::{anyhow, Error, Result};
use assert_matches::assert_matches;
use futures::{
future::{self, Either},
pin_mut,
};
use secured_linked_list::SecuredLinkedList;
use sn_messaging::{node::NodeState, SectionAuthorityProvider};
use std::collections::BTreeMap;
use tokio::task;
/// Walks through a successful join: the initial request goes to the bootstrap node only, a
/// `Retry` makes the node re-send its request to all section elders with the section key, and
/// an `Approval` completes the join with the node's age unchanged.
#[tokio::test]
async fn join_as_adult() -> Result<()> {
// `send_rx` observes what the joining node sends; `recv_tx` injects responses to it.
let (send_tx, mut send_rx) = mpsc::channel(1);
let (recv_tx, mut recv_rx) = mpsc::channel(1);
let (section_auth, mut nodes, sk_set) =
gen_section_authority_provider(Prefix::default(), ELDER_SIZE);
let bootstrap_node = nodes.remove(0);
let bootstrap_addr = bootstrap_node.addr;
let sk = sk_set.secret_key();
let pk = sk.public_key();
let node_age = MIN_AGE + 2;
let node = Node::new(
ed25519::gen_keypair(&Prefix::default().range_inclusive(), node_age),
gen_addr(),
);
let peer = node.peer();
let state = Join::new(node, send_tx, &mut recv_rx);
// The joining side of the exchange.
let bootstrap = async move { state.run(bootstrap_addr).await.map_err(Error::from) };
// The network side of the exchange, played by the test.
let others = async {
// Step 1: the first JoinRequest must go to the bootstrap address only, be addressed to the
// joining node's own name and carry no resource proof.
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("JoinRequest was not received"))?;
let bootstrap_addrs: Vec<SocketAddr> =
recipients.iter().map(|(_name, addr)| *addr).collect();
assert_eq!(bootstrap_addrs, [bootstrap_addr]);
let (message, dest_info) = assert_matches!(message, MessageType::Routing { msg, dest_info } =>
(msg, dest_info));
assert_eq!(dest_info.dest, *peer.name());
assert_matches!(message.variant, Variant::JoinRequest(request) => {
assert!(request.resource_proof_response.is_none());
});
// Step 2: answer with a Retry that points at the real section.
send_response(
&recv_tx,
Variant::JoinResponse(Box::new(JoinResponse::Retry(section_auth.clone()))),
&bootstrap_node,
section_auth.section_key(),
*peer.name(),
)?;
// Step 3: the follow-up JoinRequest must target every elder and use the section's key.
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("JoinRequest was not received"))?;
let (message, dest_info) = assert_matches!(message, MessageType::Routing { msg, dest_info } =>
(msg, dest_info));
assert_eq!(dest_info.dest_section_pk, pk);
itertools::assert_equal(
recipients,
section_auth
.elders()
.iter()
.map(|(name, addr)| (*name, *addr))
.collect::<Vec<_>>(),
);
assert_matches!(message.variant, Variant::JoinRequest(request) => {
assert_eq!(request.section_key, pk);
});
// Step 4: approve the node with a section-signed authority provider and node state.
let section_auth = section_signed(sk, section_auth.clone())?;
let node_state = section_signed(sk, NodeState::joined(peer))?;
let proof_chain = SecuredLinkedList::new(pk);
send_response(
&recv_tx,
Variant::JoinResponse(Box::new(JoinResponse::Approval {
genesis_key: pk,
section_auth: section_auth.clone(),
node_state,
section_chain: proof_chain,
})),
&bootstrap_node,
section_auth.value.section_key(),
*peer.name(),
)?;
Ok(())
};
// Both sides must succeed; afterwards check the section we joined and that our age is kept.
let ((node, section, _backlog), _) = future::try_join(bootstrap, others).await?;
assert_eq!(*section.authority_provider(), section_auth);
assert_eq!(*section.chain().last_key(), pk);
assert_eq!(node.age(), node_age);
Ok(())
}
/// Checks that a `Redirect` naming a new set of elders makes the joining node send its next
/// `JoinRequest` to exactly those addresses, using the key of the redirect's section.
#[tokio::test]
async fn join_receive_redirect_response() -> Result<()> {
// `send_rx` observes what the joining node sends; `recv_tx` injects responses to it.
let (send_tx, mut send_rx) = mpsc::channel(1);
let (recv_tx, mut recv_rx) = mpsc::channel(1);
let (section_auth, mut nodes, sk_set) =
gen_section_authority_provider(Prefix::default(), ELDER_SIZE);
let bootstrap_node = nodes.remove(0);
let pk_set = sk_set.public_keys();
let node = Node::new(
ed25519::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE),
gen_addr(),
);
let name = node.name();
let state = Join::new(node, send_tx, &mut recv_rx);
let bootstrap_task = state.run(bootstrap_node.addr);
let test_task = async move {
// The initial JoinRequest goes to the bootstrap node only.
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("JoinRequest was not received"))?;
assert_eq!(
recipients
.into_iter()
.map(|peer| peer.1)
.collect::<Vec<_>>(),
vec![bootstrap_node.addr]
);
assert_matches!(message, MessageType::Routing { msg, .. } =>
assert_matches!(msg.variant, Variant::JoinRequest{..}));
// Redirect the node to a brand-new set of randomly named elders.
let new_bootstrap_addrs: BTreeMap<_, _> = (0..ELDER_SIZE)
.map(|_| (XorName::random(), gen_addr()))
.collect();
send_response(
&recv_tx,
Variant::JoinResponse(Box::new(JoinResponse::Redirect(SectionAuthorityProvider {
prefix: Prefix::default(),
public_key_set: pk_set.clone(),
elders: new_bootstrap_addrs.clone(),
}))),
&bootstrap_node,
section_auth.section_key(),
name,
)?;
// Give the joining task a chance to process the response.
task::yield_now().await;
// The next JoinRequest must be sent to the redirected addresses, in map order.
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("JoinRequest was not received"))?;
assert_eq!(
recipients
.into_iter()
.map(|peer| peer.1)
.collect::<Vec<_>>(),
new_bootstrap_addrs
.iter()
.map(|(_, addr)| *addr)
.collect::<Vec<_>>()
);
let (message, dest_info) = assert_matches!(message, MessageType::Routing { msg, dest_info } =>
(msg, dest_info));
assert_eq!(dest_info.dest_section_pk, pk_set.public_key());
assert_matches!(message.variant, Variant::JoinRequest(req) => {
assert_eq!(req.section_key, pk_set.public_key());
});
Ok(())
};
pin_mut!(bootstrap_task);
pin_mut!(test_task);
// The join is never approved in this test, so the test task is expected to finish first.
match future::select(bootstrap_task, test_task).await {
Either::Left(_) => unreachable!(),
Either::Right((output, _)) => output,
}
}
/// Sends a `Redirect` with no elders followed by a `Redirect` with elders, and expects the
/// joining node to send a further `JoinRequest` after them.
#[tokio::test]
async fn join_invalid_redirect_response() -> Result<()> {
// `send_rx` observes what the joining node sends; `recv_tx` injects responses to it.
let (send_tx, mut send_rx) = mpsc::channel(1);
let (recv_tx, mut recv_rx) = mpsc::channel(1);
let (section_auth, mut nodes, sk_set) =
gen_section_authority_provider(Prefix::default(), ELDER_SIZE);
let bootstrap_node = nodes.remove(0);
let pk_set = sk_set.public_keys();
let node = Node::new(
ed25519::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE),
gen_addr(),
);
let node_name = node.name();
let state = Join::new(node, send_tx, &mut recv_rx);
let bootstrap_task = state.run(bootstrap_node.addr);
let test_task = async {
// Consume the initial JoinRequest.
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("JoinRequest was not received"))?;
assert_matches!(message, MessageType::Routing { msg, .. } =>
assert_matches!(msg.variant, Variant::JoinRequest{..}));
// Invalid redirect: the list of elders is empty.
send_response(
&recv_tx,
Variant::JoinResponse(Box::new(JoinResponse::Redirect(SectionAuthorityProvider {
prefix: Prefix::default(),
public_key_set: pk_set.clone(),
elders: BTreeMap::new(),
}))),
&bootstrap_node,
section_auth.section_key(),
node_name,
)?;
// Give the joining task a chance to process the response.
task::yield_now().await;
// Valid redirect: a full set of randomly named elders.
let addrs = (0..ELDER_SIZE)
.map(|_| (XorName::random(), gen_addr()))
.collect();
send_response(
&recv_tx,
Variant::JoinResponse(Box::new(JoinResponse::Redirect(SectionAuthorityProvider {
prefix: Prefix::default(),
public_key_set: pk_set.clone(),
elders: addrs,
}))),
&bootstrap_node,
section_auth.section_key(),
node_name,
)?;
task::yield_now().await;
// A new JoinRequest is expected after the two redirects.
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("JoinRequest was not received"))?;
assert_matches!(message, MessageType::Routing { msg, .. } =>
assert_matches!(msg.variant, Variant::JoinRequest{..}));
Ok(())
};
pin_mut!(bootstrap_task);
pin_mut!(test_task);
// The join is never approved in this test, so the test task is expected to finish first.
match future::select(bootstrap_task, test_task).await {
Either::Left(_) => unreachable!(),
Either::Right((output, _)) => output,
}
}
/// Checks that a `Rejected(JoinsDisallowed)` response makes the join fail with
/// `RoutingError::TryJoinLater`.
#[tokio::test]
async fn join_disallowed_response() -> Result<()> {
    // `send_rx` observes what the joining node sends; `recv_tx` injects responses to it.
    let (send_tx, mut send_rx) = mpsc::channel(1);
    let (recv_tx, mut recv_rx) = mpsc::channel(1);

    let (section_auth, mut nodes, _) =
        gen_section_authority_provider(Prefix::default(), ELDER_SIZE);
    let bootstrap_node = nodes.remove(0);

    let node = Node::new(
        ed25519::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE),
        gen_addr(),
    );
    let node_name = node.name();

    let state = Join::new(node, send_tx, &mut recv_rx);
    let bootstrap_task = state.run(bootstrap_node.addr);

    let test_task = async {
        // Consume the initial JoinRequest.
        let (message, _) = send_rx
            .recv()
            .await
            .ok_or_else(|| anyhow!("JoinRequest was not received"))?;
        assert_matches!(message, MessageType::Routing { msg, .. } =>
            assert_matches!(msg.variant, Variant::JoinRequest{..}));

        // Tell the node that the network does not accept new joins right now.
        send_response(
            &recv_tx,
            Variant::JoinResponse(Box::new(JoinResponse::Rejected(
                JoinRejectionReason::JoinsDisallowed,
            ))),
            &bootstrap_node,
            section_auth.section_key(),
            node_name,
        )?;

        Ok(())
    };

    // Unlike the redirect tests, the join task terminates here, so wait for both sides.
    let (join_result, test_result) = future::join(bootstrap_task, test_task).await;

    if !matches!(join_result, Err(RoutingError::TryJoinLater)) {
        return Err(anyhow!("Not getting an expected network rejection."));
    }

    test_result
}
/// Sends a `Retry` for a prefix that does not match the joining node's name followed by a
/// `Retry` for a matching prefix, and expects the node to send a further `JoinRequest`.
#[tokio::test]
async fn join_invalid_retry_prefix_response() -> Result<()> {
// `send_rx` observes what the joining node sends; `recv_tx` injects responses to it.
let (send_tx, mut send_rx) = mpsc::channel(1);
let (recv_tx, mut recv_rx) = mpsc::channel(1);
let bootstrap_node = Node::new(
ed25519::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE),
gen_addr(),
);
let node = Node::new(
ed25519::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE),
gen_addr(),
);
let node_name = node.name();
// Of the two one-bit prefixes, pick the one selected by the first bit of our name as "good"
// and the other one as "bad".
let (good_prefix, bad_prefix) = {
let p0 = Prefix::default().pushed(false);
let p1 = Prefix::default().pushed(true);
if node.name().bit(0) {
(p1, p0)
} else {
(p0, p1)
}
};
let state = Join::new(node, send_tx, &mut recv_rx);
let section_key = bls::SecretKey::random().public_key();
let elders = (0..ELDER_SIZE)
.map(|_| (good_prefix.substituted_in(rand::random()), gen_addr()))
.collect();
// Call `join` directly (not `run`) so the initial recipients and section key are controlled.
let join_task = state.join(section_key, elders);
let test_task = async {
// Consume the initial JoinRequest.
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("RoutingMsg was not received"))?;
let message = assert_matches!(message, MessageType::Routing{ msg, .. } => msg);
assert_matches!(message.variant, Variant::JoinRequest(_));
// Retry for the prefix that does not cover our name.
send_response(
&recv_tx,
Variant::JoinResponse(Box::new(JoinResponse::Retry(
gen_section_authority_provider(bad_prefix, ELDER_SIZE).0,
))),
&bootstrap_node,
section_key,
node_name,
)?;
// Give the joining task a chance to process the response.
task::yield_now().await;
// Retry for the prefix that covers our name.
send_response(
&recv_tx,
Variant::JoinResponse(Box::new(JoinResponse::Retry(
gen_section_authority_provider(good_prefix, ELDER_SIZE).0,
))),
&bootstrap_node,
section_key,
node_name,
)?;
// A new JoinRequest is expected after the two retries.
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("RoutingMsg was not received"))?;
let message = assert_matches!(message, MessageType::Routing{ msg, .. } => msg);
assert_matches!(message.variant, Variant::JoinRequest(_));
Ok(())
};
pin_mut!(join_task);
pin_mut!(test_task);
// The join is never approved in this test, so the test task is expected to finish first.
match future::select(join_task, test_task).await {
Either::Left(_) => unreachable!(),
Either::Right((output, _)) => output,
}
}
/// Test helper: builds a routing message carrying `variant` from `bootstrap_node`, serializes
/// it addressed to `node_name` with `section_key`, and injects it into `recv_tx` as if it had
/// been received from the bootstrap node's address.
///
/// Uses `try_send`, so it fails instead of waiting when the channel is full.
fn send_response(
    recv_tx: &mpsc::Sender<ConnectionEvent>,
    variant: Variant,
    bootstrap_node: &Node,
    section_key: bls::PublicKey,
    node_name: XorName,
) -> Result<()> {
    let msg = RoutingMsg::single_src(
        bootstrap_node,
        DstLocation::DirectAndUnrouted,
        variant,
        section_key,
    )?;

    let dest_info = DestInfo {
        dest: node_name,
        dest_section_pk: section_key,
    };
    let bytes = MessageType::Routing { msg, dest_info }.serialize()?;

    let event = ConnectionEvent::Received((bootstrap_node.addr, bytes));
    recv_tx.try_send(event)?;

    Ok(())
}
}