use super::common::{Base, Bootstrapped, USER_MSG_CACHE_EXPIRY_DURATION_SECS};
use QUORUM;
use ack_manager::{ACK_TIMEOUT_SECS, Ack, AckManager};
use action::Action;
use cache::Cache;
use crust::{ConnectionInfoResult, CrustError, CrustUser, PeerId, PrivConnectionInfo,
PubConnectionInfo, Service};
use crust::Event as CrustEvent;
use error::{InterfaceError, RoutingError};
use event::Event;
use id::{FullId, PublicId};
use itertools::Itertools;
use log::LogLevel;
use lru_time_cache::LruCache;
use maidsafe_utilities::serialisation;
use messages::{DEFAULT_PRIORITY, DirectMessage, HopMessage, MAX_PART_LEN, Message, MessageContent,
RoutingMessage, SectionList, SignedMessage, UserMessage, UserMessageCache};
use outbox::EventBox;
use peer_manager::{ConnectionInfoPreparedResult, Peer, PeerManager, PeerState,
RESOURCE_PROOF_DURATION_SECS, SectionMap};
use rand::{self, Rng};
use resource_proof::ResourceProof;
use routing_message_filter::{FilteringResult, RoutingMessageFilter};
use routing_table::{Authority, OtherMergeDetails, OwnMergeState, Prefix, RemovalDetails, Xorable};
use routing_table::Error as RoutingTableError;
#[cfg(feature = "use-mock-crust")]
use routing_table::RoutingTable;
use rust_sodium::crypto::{box_, sign};
use rust_sodium::crypto::hash::sha256;
use section_list_cache::SectionListCache;
use signature_accumulator::{ACCUMULATION_TIMEOUT_SECS, SignatureAccumulator};
use state_machine::Transition;
use stats::Stats;
use std::{cmp, fmt, iter, mem};
use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
#[cfg(feature = "use-mock-crust")]
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::time::{Duration, Instant};
use timer::Timer;
use tunnels::Tunnels;
use types::MessageId;
use utils;
use xor_name::XorName;
/// Period, in seconds, of the tick timer that is scheduled when a `Node` is constructed.
const TICK_TIMEOUT_SECS: u64 = 60;
/// Sixty seconds on top of the resource proof duration.
/// NOTE(review): not used in the visible part of this file; presumably bounds the wait for a
/// `GetNodeName` response - confirm against its use.
const GET_NODE_NAME_TIMEOUT_SECS: u64 = 60 + RESOURCE_PROOF_DURATION_SECS;
/// NOTE(review): presumably the difficulty passed to candidates in resource proof challenges -
/// confirm against its use.
const RESOURCE_PROOF_DIFFICULTY: u8 = 0;
/// 250 MiB. NOTE(review): presumably the target size passed to candidates in resource proof
/// challenges - confirm against its use.
const RESOURCE_PROOF_TARGET_SIZE: usize = 250 * 1024 * 1024;
/// Initial value, in seconds, of `Node::rt_timeout`.
const RT_MIN_TIMEOUT_SECS: u64 = 30;
/// NOTE(review): presumably the upper bound, in seconds, for `Node::rt_timeout` - confirm.
const RT_MAX_TIMEOUT_SECS: u64 = 300;
/// Sum of the resource proof duration, the signature accumulation timeout and four ack timeouts.
/// NOTE(review): presumably how long a joining node waits for approval - confirm.
const APPROVAL_TIMEOUT_SECS: u64 = RESOURCE_PROOF_DURATION_SECS + ACCUMULATION_TIMEOUT_SECS +
(4 * ACK_TIMEOUT_SECS);
/// NOTE(review): presumably the interval, in seconds, between approval progress reports -
/// confirm against its use.
const APPROVAL_PROGRESS_INTERVAL_SECS: u64 = 30;
/// Delay, in seconds, of the candidate status timer scheduled once this node has been approved.
const CANDIDATE_STATUS_INTERVAL_SECS: u64 = 60;
/// Expiry duration, in seconds, of the entries in `Node::merge_cache`.
const MERGE_TIMEOUT_SECS: u64 = 300;
/// Expiry duration, in seconds, of the entries in `Node::bootstrappers`.
const BOOTSTRAPPER_HOLD_DUR_SEC: u64 = 300;
/// State of a routing node: either the seed node of a new network or a node that bootstrapped
/// via a proxy and is (or is waiting to be) approved by its section.
pub struct Node {
// NOTE(review): presumably tracks messages awaiting acknowledgement - usage is mostly outside
// the visible code.
ack_mgr: AckManager,
// Second user message cache, created with the same expiry as `user_msg_cache`.
// NOTE(review): presumably for cacheable responses - confirm.
cacheable_user_msg_cache: UserMessageCache,
// Handle to the Crust networking service.
crust_service: Service,
// Our own identity, including the private signing key.
full_id: FullId,
// Timer tokens for the approval process; both are cleared when `NodeApproval` is handled.
get_approval_timer_token: Option<u64>,
approval_progress_timer_token: Option<u64>,
// Initialised to the construction time. NOTE(review): presumably updated when the approval
// process starts - confirm.
approval_expiry_time: Instant,
// `true` only for a node created via `Node::first`.
is_first_node: bool,
// `true` for the first node from the start; otherwise set once `NodeApproval` is handled.
is_approved: bool,
// Routing messages addressed to us that are waiting to be dispatched.
msg_queue: VecDeque<RoutingMessage>,
peer_mgr: PeerManager,
response_cache: Box<Cache>,
// Filter used to detect already-seen incoming routing messages.
routing_msg_filter: RoutingMessageFilter,
// Accumulates signatures received via `DirectMessage::MessageSignature`.
sig_accumulator: SignatureAccumulator,
// Section list signatures, both our own and those received from peers.
section_list_sigs: SectionListCache,
stats: Stats,
// Token of the tick timer scheduled at construction.
tick_timer_token: u64,
timer: Timer,
tunnels: Tunnels,
// Reassembles `UserMessagePart`s into complete user messages.
user_msg_cache: UserMessageCache,
next_node_name: Option<XorName>,
// State of the routing table request timer; `rt_timeout` starts at `RT_MIN_TIMEOUT_SECS`.
rt_msg_id: Option<MessageId>,
rt_timeout: Duration,
rt_timer_token: Option<u64>,
// Messages received before we were approved; replayed once `NodeApproval` is handled.
routing_msg_backlog: Vec<RoutingMessage>,
// Entries expire after `MERGE_TIMEOUT_SECS`.
merge_cache: LruCache<Prefix<XorName>, SectionMap>,
// Set while we are waiting for a candidate's resource proof response; responses are ignored
// while this is `None`.
candidate_timer_token: Option<u64>,
// Token of the candidate status timer, scheduled once we have been approved.
candidate_status_token: Option<u64>,
// Remaining resource proof response parts per challenger; the next part is sent each time
// that challenger acknowledges the previous one.
resource_proof_response_parts: HashMap<PeerId, Vec<DirectMessage>>,
challenger_count: usize,
proxy_is_resource_proof_challenger: bool,
cached_section_update_requests: BTreeSet<Prefix<XorName>>,
// Peers that bootstrapped off us together with the kind they declared; an entry is removed
// when that peer sends `ClientIdentify`, or expires after `BOOTSTRAPPER_HOLD_DUR_SEC`.
bootstrappers: LruCache<PeerId, CrustUser>,
}
impl Node {
/// Creates the seed node of a new network.
///
/// The name in `full_id` is replaced by the SHA-256 hash of its current name before the node
/// is built. Returns `None` if Crust fails to start listening for TCP connections.
pub fn first(cache: Box<Cache>,
             crust_service: Service,
             mut full_id: FullId,
             min_section_size: usize,
             timer: Timer)
             -> Option<Self> {
    let hashed_name = XorName(sha256::hash(&full_id.public_id().name().0).0);
    full_id.public_id_mut().set_name(hashed_name);

    let stats = Stats::new();
    let mut node = Self::new(cache,
                             crust_service,
                             true,
                             full_id,
                             min_section_size,
                             stats,
                             timer);

    let listen_result = node.crust_service.start_listening_tcp();
    match listen_result {
        Ok(_) => {
            debug!("{:?} State changed to node.", node);
            info!("{:?} Started a new network as a seed node.", node);
            Some(node)
        }
        Err(error) => {
            error!("{:?} Failed to start listening: {:?}", node, error);
            None
        }
    }
}
/// Creates a node that joins an existing network through an already connected proxy.
///
/// The proxy is registered with the peer manager and relocation is started immediately.
/// Returns `None` if relocation could not be started.
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn from_bootstrapping(cache: Box<Cache>,
                          crust_service: Service,
                          full_id: FullId,
                          min_section_size: usize,
                          proxy_peer_id: PeerId,
                          proxy_public_id: PublicId,
                          stats: Stats,
                          timer: Timer)
                          -> Option<Self> {
    let is_first_node = false;
    let mut node = Self::new(cache,
                             crust_service,
                             is_first_node,
                             full_id,
                             min_section_size,
                             stats,
                             timer);
    let _ = node.peer_mgr.set_proxy(proxy_peer_id, proxy_public_id);

    let relocation = node.relocate();
    match relocation {
        Ok(_) => {
            debug!("{:?} State changed to node.", node);
            Some(node)
        }
        Err(error) => {
            error!("{:?} Failed to start relocation: {:?}", node, error);
            None
        }
    }
}
/// Builds the common initial state shared by `first` and `from_bootstrapping`.
///
/// Schedules the tick timer on `timer` before storing it. A first node starts out approved;
/// any other node starts unapproved.
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn new(cache: Box<Cache>,
       crust_service: Service,
       first_node: bool,
       full_id: FullId,
       min_section_size: usize,
       stats: Stats,
       mut timer: Timer)
       -> Self {
    let own_public_id = *full_id.public_id();
    let tick_timer_token = timer.schedule(Duration::from_secs(TICK_TIMEOUT_SECS));

    // Expiry durations of the various time-limited caches.
    let user_msg_expiry = Duration::from_secs(USER_MSG_CACHE_EXPIRY_DURATION_SECS);
    let merge_expiry = Duration::from_secs(MERGE_TIMEOUT_SECS);
    let bootstrapper_expiry = Duration::from_secs(BOOTSTRAPPER_HOLD_DUR_SEC);

    Node {
        ack_mgr: AckManager::new(),
        cacheable_user_msg_cache: UserMessageCache::with_expiry_duration(user_msg_expiry),
        crust_service: crust_service,
        full_id: full_id,
        get_approval_timer_token: None,
        approval_progress_timer_token: None,
        approval_expiry_time: Instant::now(),
        is_first_node: first_node,
        is_approved: first_node,
        msg_queue: VecDeque::new(),
        peer_mgr: PeerManager::new(min_section_size, own_public_id),
        response_cache: cache,
        routing_msg_filter: RoutingMessageFilter::new(),
        sig_accumulator: Default::default(),
        section_list_sigs: SectionListCache::new(),
        stats: stats,
        tick_timer_token: tick_timer_token,
        timer: timer,
        tunnels: Default::default(),
        user_msg_cache: UserMessageCache::with_expiry_duration(user_msg_expiry),
        next_node_name: None,
        rt_msg_id: None,
        rt_timeout: Duration::from_secs(RT_MIN_TIMEOUT_SECS),
        rt_timer_token: None,
        routing_msg_backlog: vec![],
        merge_cache: LruCache::with_expiry_duration(merge_expiry),
        candidate_timer_token: None,
        candidate_status_token: None,
        resource_proof_response_parts: HashMap::new(),
        challenger_count: 0,
        proxy_is_resource_proof_challenger: false,
        cached_section_update_requests: BTreeSet::new(),
        bootstrappers: LruCache::with_expiry_duration(bootstrapper_expiry),
    }
}
/// Refreshes the client, tunnel and routing table counters in `self.stats`.
///
/// Each group of counters is only written (and, if we are approved, logged) when its value
/// differs from the one recorded previously.
fn update_stats(&mut self) {
    // Connected clients.
    let previous_clients = self.stats.cur_client_num;
    let current_clients = self.peer_mgr.client_num();
    self.stats.cur_client_num = current_clients;
    if current_clients != previous_clients {
        if current_clients > previous_clients {
            self.stats.cumulative_client_num += current_clients - previous_clients;
        }
        if self.is_approved {
            info!(target: "routing_stats", "{:?} - Connected clients: {}, cumulative: {}",
                  self,
                  self.stats.cur_client_num,
                  self.stats.cumulative_client_num);
        }
    }

    // Tunnels.
    let tunnel_count = self.tunnels.tunnel_count();
    let client_pair_count = self.tunnels.client_count();
    let tunnels_changed = self.stats.tunnel_connections != tunnel_count ||
                          self.stats.tunnel_client_pairs != client_pair_count;
    if tunnels_changed {
        self.stats.tunnel_connections = tunnel_count;
        self.stats.tunnel_client_pairs = client_pair_count;
        if self.is_approved {
            info!(target: "routing_stats",
                  "{:?} - Indirect connections: {}, tunnelling for: {}",
                  self,
                  self.stats.tunnel_connections,
                  self.stats.tunnel_client_pairs);
        }
    }

    // Routing table.
    let table_size = self.peer_mgr.routing_table().len();
    if self.stats.cur_routing_table_size != table_size {
        self.stats.cur_routing_table_size = table_size;
        if self.is_approved {
            self.print_rt_size();
        }
    }
}
/// Logs a small boxed table with the recorded routing table size and the network size
/// estimate, if logging at `Info` level is enabled.
fn print_rt_size(&self) {
    const TABLE_LVL: LogLevel = LogLevel::Info;
    if !log_enabled!(TABLE_LVL) {
        return;
    }

    let status_str = format!("{:?} {:?} - Routing Table size: {:3}",
                             self,
                             self.crust_service.id(),
                             self.stats.cur_routing_table_size);
    let (size, is_exact) = self.peer_mgr.routing_table().network_size_estimate();
    let network_estimate = if is_exact {
        format!("Exact network size: {}", size)
    } else {
        format!("Estimated network size: {}", size)
    };

    // The box is as wide as the longer of the two lines.
    let sep_len = cmp::max(status_str.len(), network_estimate.len());
    let sep_str = (0..sep_len).map(|_| '-').collect::<String>();

    log!(target: "routing_stats", TABLE_LVL, " -{}- ", sep_str);
    log!(target: "routing_stats", TABLE_LVL, "| {:<1$} |", status_str, sep_len);
    log!(target: "routing_stats", TABLE_LVL, "| {:<1$} |", network_estimate, sep_len);
    log!(target: "routing_stats", TABLE_LVL, " -{}- ", sep_str);
}
/// Handles an action requested through the public interface.
///
/// Returns `Transition::Terminate` for `Action::Terminate` or when a timeout handler asks for
/// termination. Otherwise pending routing messages are processed, the stats are refreshed and
/// `Transition::Stay` is returned.
pub fn handle_action(&mut self, action: Action, outbox: &mut EventBox) -> Transition {
    match action {
        Action::Terminate => return Transition::Terminate,
        Action::Timeout(token) => {
            match self.handle_timeout(token, outbox) {
                Transition::Terminate => return Transition::Terminate,
                _ => (),
            }
        }
        Action::Name { result_tx } => {
            let _ = result_tx.send(*self.name());
        }
        Action::ClientSendRequest { result_tx, .. } => {
            // Client requests are not valid in the node state.
            let _ = result_tx.send(Err(InterfaceError::InvalidState));
        }
        Action::NodeSendMessage { src, dst, content, priority, result_tx } => {
            // Only interface errors are reported back to the caller.
            let outcome = self.send_user_message(src, dst, content, priority);
            let reply = if let Err(RoutingError::Interface(err)) = outcome {
                Err(err)
            } else {
                Ok(())
            };
            let _ = result_tx.send(reply);
        }
    }

    self.handle_routing_messages(outbox);
    self.update_stats();
    Transition::Stay
}
/// Handles an event coming from Crust.
///
/// Returns `Transition::Terminate` if the listener failed or a lost peer requires termination.
/// `ListenerStarted` returns `Transition::Stay` right away; every other event is followed by
/// processing the queued routing messages and refreshing the stats.
pub fn handle_crust_event(&mut self,
                          crust_event: CrustEvent,
                          outbox: &mut EventBox)
                          -> Transition {
    match crust_event {
        CrustEvent::BootstrapAccept(peer_id, peer_kind) => {
            self.handle_bootstrap_accept(peer_id, peer_kind);
        }
        CrustEvent::BootstrapConnect(peer_id, _) => {
            self.handle_bootstrap_connect(peer_id, outbox);
        }
        CrustEvent::ConnectSuccess(peer_id) => {
            self.handle_connect_success(peer_id);
        }
        CrustEvent::ConnectFailure(peer_id) => {
            self.handle_connect_failure(peer_id);
        }
        CrustEvent::LostPeer(peer_id) => {
            match self.handle_lost_peer(peer_id, outbox) {
                Transition::Terminate => return Transition::Terminate,
                _ => (),
            }
        }
        CrustEvent::NewMessage(peer_id, bytes) => {
            if let Err(err) = self.handle_new_message(peer_id, bytes, outbox) {
                match err {
                    // Filtered messages are expected and not worth logging.
                    RoutingError::FilterCheckFailed => (),
                    other => debug!("{:?} - {:?}", self, other),
                }
            }
        }
        CrustEvent::ConnectionInfoPrepared(ConnectionInfoResult { result_token, result }) => {
            self.handle_connection_info_prepared(result_token, result);
        }
        CrustEvent::ListenerStarted(port) => {
            trace!("{:?} Listener started on port {}.", self, port);
            self.crust_service.set_service_discovery_listen(true);
            return Transition::Stay;
        }
        CrustEvent::ListenerFailed => {
            error!("{:?} Failed to start listening.", self);
            outbox.send_event(Event::Terminate);
            return Transition::Terminate;
        }
        CrustEvent::WriteMsgSizeProhibitive(peer_id, msg) => {
            error!("{:?} Failed to send {}-byte message to {:?}. Message too large.",
                   self,
                   msg.len(),
                   peer_id);
        }
        _ => {
            debug!("{:?} - Unhandled crust event: {:?}", self, crust_event);
        }
    }

    self.handle_routing_messages(outbox);
    self.update_stats();
    Transition::Stay
}
/// Drains the message queue, dispatching every message whose destination we are in authority
/// for. Other messages are dropped; dispatch failures are only logged.
fn handle_routing_messages(&mut self, outbox: &mut EventBox) {
    loop {
        let routing_msg = match self.msg_queue.pop_front() {
            Some(msg) => msg,
            None => break,
        };
        if !self.in_authority(&routing_msg.dst) {
            continue;
        }
        if let Err(err) = self.dispatch_routing_message(routing_msg, outbox) {
            debug!("{:?} Routing message dispatch failed: {:?}", self, err);
        }
    }
}
/// Records that `peer_id` bootstrapped off us as `peer_kind`, replacing any earlier entry for
/// the same peer.
fn handle_bootstrap_accept(&mut self, peer_id: PeerId, peer_kind: CrustUser) {
    trace!("{:?} Received BootstrapAccept from {:?} as {:?}.",
           self,
           peer_id,
           peer_kind);

    let previous_kind = self.bootstrappers.insert(peer_id, peer_kind);
    match previous_kind {
        Some(peer) => {
            trace!("{:?} Replacing Bootstrapper {:?} who was previously registered as {:?}",
                   self,
                   peer_id,
                   peer);
        }
        None => (),
    }
}
/// Handles a `BootstrapConnect` event by disconnecting the peer it refers to.
fn handle_bootstrap_connect(&mut self, peer_id: PeerId, outbox: &mut EventBox) {
    self.disconnect_peer(&peer_id, Some(outbox));
}
/// Handles a successful direct connection to `peer_id`.
///
/// Events for our own ID are ignored and non-whitelisted peers are disconnected. A tunnel
/// that was in use for the peer is torn down. If no tunnel was removed and the peer is already
/// a routing peer, nothing else happens. Otherwise the peer is marked as connected and we
/// identify ourselves to it.
fn handle_connect_success(&mut self, peer_id: PeerId) {
    if peer_id == self.crust_service.id() {
        debug!("{:?} Received ConnectSuccess event with our Crust peer ID.",
               self);
        return;
    }
    if !self.crust_service.is_peer_whitelisted(&peer_id) {
        debug!("{:?} Received ConnectSuccess, but {:?} is not whitelisted.",
               self,
               peer_id);
        self.disconnect_peer(&peer_id, None);
        return;
    }

    let removed_tunnel = self.tunnels.remove_tunnel_for(&peer_id);
    match removed_tunnel {
        Some(tunnel_id) => {
            // We now have a direct connection, so the tunnel is no longer needed.
            debug!("{:?} Removing unwanted tunnel for {:?}", self, peer_id);
            let message = DirectMessage::TunnelDisconnect(peer_id);
            self.send_direct_message(tunnel_id, message);
        }
        None => {
            if let Some(pub_id) = self.peer_mgr.get_routing_peer(&peer_id) {
                warn!("{:?} Received ConnectSuccess from {:?}, but node {:?} is already in \
                       routing state in peer_map.",
                      self,
                      peer_id,
                      pub_id.name());
                return;
            }
        }
    }

    self.peer_mgr.connected_to(&peer_id);
    let id_type = if !self.is_approved {
        "CandidateIdentify"
    } else {
        "NodeIdentify"
    };
    debug!("{:?} Received ConnectSuccess from {:?}. Sending {}.",
           self,
           peer_id,
           id_type);
    self.send_node_identify(peer_id);
}
/// Handles a failed direct connection attempt to `peer_id`.
///
/// If we were trying to connect to that peer and have no tunnel to it yet, we start looking
/// for a tunnel node. Events for our own ID or for peers we were not connecting to are ignored.
fn handle_connect_failure(&mut self, peer_id: PeerId) {
    if peer_id == self.crust_service.id() {
        debug!("{:?} Received ConnectFailure event with our Crust peer ID.",
               self);
        return;
    }

    let pub_id = match self.peer_mgr.get_connecting_peer(&peer_id) {
        Some(&id) => id,
        None => return,
    };
    debug!("{:?} Failed to connect to peer {:?} with pub_id {:?}.",
           self,
           peer_id,
           pub_id);

    if self.tunnels.tunnel_for(&peer_id).is_some() {
        debug!("{:?} already has tunnel to peer {:?} with pub_id {:?}.",
               self,
               peer_id,
               pub_id);
    } else {
        self.find_tunnel_for_peer(peer_id, &pub_id);
    }
}
/// Marks `peer_id` as searching for a tunnel and sends a `TunnelRequest` for it to every
/// peer the peer manager returns as a potential tunnel node.
fn find_tunnel_for_peer(&mut self, peer_id: PeerId, pub_id: &PublicId) {
    let potential_tunnels = self.peer_mgr.set_searching_for_tunnel(peer_id, *pub_id);
    for (tunnel_name, tunnel_peer_id) in potential_tunnels {
        trace!("{:?} Asking {:?} to serve as a tunnel for {:?}.",
               self,
               tunnel_name,
               peer_id);
        self.send_direct_message(tunnel_peer_id, DirectMessage::TunnelRequest(peer_id));
    }
}
/// Deserialises a message received from `peer_id` and handles it according to its kind.
///
/// Hop and direct messages are passed to their handlers. Tunnelled messages are handled
/// locally when we are their destination, and otherwise relayed if we act as a tunnel node
/// for the `src`/`dst` pair.
///
/// Returns `RoutingError::SerialisationError` if the bytes can't be deserialised and
/// `RoutingError::InvalidDestination` for tunnelled messages we neither accept nor relay.
fn handle_new_message(&mut self,
peer_id: PeerId,
bytes: Vec<u8>,
outbox: &mut EventBox)
-> Result<(), RoutingError> {
match serialisation::deserialise(&bytes) {
Ok(Message::Hop(hop_msg)) => self.handle_hop_message(hop_msg, peer_id),
Ok(Message::Direct(direct_msg)) => {
self.handle_direct_message(direct_msg, peer_id, outbox)
}
Ok(Message::TunnelDirect { content, src, dst }) => {
if dst == self.crust_service.id() {
// We are the recipient: only accept it if it came via the tunnel node that is
// registered for `src`.
if self.tunnels.tunnel_for(&src) == Some(&peer_id) {
self.handle_direct_message(content, src, outbox)
} else {
debug!("{:?} Message recd via unregistered tunnel node {:?} from src {:?}",
self,
peer_id,
src);
Err(RoutingError::InvalidDestination)
}
} else if self.tunnels.has_clients(src, dst) {
// We already tunnel for this pair: relay the raw bytes unchanged.
self.send_or_drop(&dst, bytes, content.priority());
Ok(())
} else if !self.peer_mgr.can_tunnel_for(&src, &dst) {
debug!("{:?} Can no longer accept as a tunnel node for {:?} - {:?}",
self,
src,
dst);
self.send_direct_message(src, DirectMessage::TunnelClosed(dst));
Err(RoutingError::InvalidDestination)
// Only identify messages may establish a new tunnel; `accept_clients` is evaluated
// only when the content is one of those.
} else if match content {
DirectMessage::CandidateIdentify { .. } |
DirectMessage::NodeIdentify { .. } => true,
_ => false,
} && self.tunnels.accept_clients(src, dst) {
debug!("{:?} Agreed to act as tunnel node for {:?} - {:?}",
self,
src,
dst);
self.send_direct_message(dst, DirectMessage::TunnelSuccess(src));
self.send_or_drop(&dst, bytes, content.priority());
Ok(())
} else {
debug!("{:?} Invalid TunnelDirect message received via {:?}: {:?} -> {:?} \
{:?}",
self,
peer_id,
src,
dst,
content);
Err(RoutingError::InvalidDestination)
}
}
Ok(Message::TunnelHop { content, src, dst }) => {
if dst == self.crust_service.id() {
// Handled as if it had been received directly from `src`.
self.handle_hop_message(content, src)
} else if self.tunnels.has_clients(src, dst) {
self.send_or_drop(&dst, bytes, content.content.priority());
Ok(())
} else {
debug!("{:?} Invalid TunnelHop message received via {:?}: {:?} -> {:?} {:?}",
self,
peer_id,
src,
dst,
content);
Err(RoutingError::InvalidDestination)
}
}
Err(error) => Err(RoutingError::SerialisationError(error)),
}
}
/// Dispatches a direct message received from `peer_id` to the matching handler.
///
/// Identify messages carry a signed public ID; if the signature check fails the peer is
/// disconnected. `BootstrapIdentify` and `BootstrapDeny` are only logged. Errors from the
/// signature and resource proof request handlers are propagated.
fn handle_direct_message(&mut self,
direct_message: DirectMessage,
peer_id: PeerId,
outbox: &mut EventBox)
-> Result<(), RoutingError> {
use messages::DirectMessage::*;
match direct_message {
MessageSignature(digest, sig) => self.handle_message_signature(digest, sig, peer_id)?,
SectionListSignature(section_list, sig) => {
self.handle_section_list_signature(peer_id, section_list, sig)?
}
ClientIdentify { ref serialised_public_id, ref signature, client_restriction } => {
// The peer must have bootstrapped off us first, and the kind it declared then must
// match the restriction it claims now. The bootstrapper entry is consumed here.
let drop = match self.bootstrappers.remove(&peer_id) {
Some(kind) => {
if kind == CrustUser::Client && client_restriction ||
kind == CrustUser::Node && !client_restriction {
false
} else {
debug!("{:?} Peer bootstrapped to us as {:?} but is sending messages \
as a with client_restriction: {}",
self,
kind,
client_restriction);
true
}
}
None => {
debug!("{:?} No mention of peer {:?} as a bootstrapper",
self,
peer_id);
true
}
};
if drop {
return Ok(self.disconnect_peer(&peer_id, Some(outbox)));
}
if let Ok(public_id) = verify_signed_public_id(serialised_public_id, signature) {
self.handle_client_identify(public_id, peer_id, client_restriction, outbox)
} else {
warn!("{:?} Signature check failed in ClientIdentify, so dropping connection \
{:?}.",
self,
peer_id);
self.disconnect_peer(&peer_id, Some(outbox));
}
}
NodeIdentify { ref serialised_public_id, ref signature } => {
if let Ok(public_id) = verify_signed_public_id(serialised_public_id, signature) {
debug!("{:?} Handling NodeIdentify from {:?}.",
self,
public_id.name());
self.add_to_routing_table(&public_id, &peer_id, outbox);
} else {
warn!("{:?} Signature check failed in NodeIdentify, so dropping peer {:?}.",
self,
peer_id);
self.disconnect_peer(&peer_id, Some(outbox));
}
}
CandidateIdentify { ref serialised_public_id, ref signature } => {
if let Ok(public_id) = verify_signed_public_id(serialised_public_id, signature) {
self.handle_candidate_identify(public_id, peer_id, outbox);
} else {
warn!("{:?} Signature check failed in CandidateIdentify, so dropping peer \
{:?}.",
self,
peer_id);
self.disconnect_peer(&peer_id, Some(outbox));
}
}
TunnelRequest(dst_id) => self.handle_tunnel_request(peer_id, dst_id),
TunnelSuccess(dst_id) => self.handle_tunnel_success(peer_id, dst_id),
TunnelClosed(dst_id) => self.handle_tunnel_closed(peer_id, dst_id, outbox),
TunnelDisconnect(dst_id) => self.handle_tunnel_disconnect(peer_id, dst_id),
ResourceProof { seed, target_size, difficulty } => {
self.handle_resource_proof_request(peer_id, seed, target_size, difficulty)?
}
ResourceProofResponseReceipt => {
self.handle_resource_proof_response_receipt(peer_id);
}
ResourceProofResponse { part_index, part_count, proof, leading_zero_bytes } => {
self.handle_resource_proof_response(peer_id,
part_index,
part_count,
proof,
leading_zero_bytes);
}
// Bootstrap replies are not acted upon in this state.
msg @ BootstrapIdentify { .. } |
msg @ BootstrapDeny => {
debug!("{:?} Unhandled direct message: {:?}", self, msg);
}
}
Ok(())
}
/// Adds a message signature received from a routing peer to the signature accumulator.
///
/// If that yields an accumulated signed message, the message is handled with ourselves as the
/// hop and an empty `sent_to` set. Signatures from peers that are not routing peers are
/// ignored.
fn handle_message_signature(&mut self,
                            digest: sha256::Digest,
                            sig: sign::Signature,
                            peer_id: PeerId)
                            -> Result<(), RoutingError> {
    let pub_id = match self.peer_mgr.get_routing_peer(&peer_id) {
        Some(&id) => id,
        None => {
            debug!("{:?} Received message signature from unknown peer {:?}",
                   self,
                   peer_id);
            return Ok(());
        }
    };

    let min_section_size = self.min_section_size();
    let accumulated = self.sig_accumulator.add_signature(min_section_size, digest, sig, pub_id);
    match accumulated {
        Some((signed_msg, route)) => {
            let hop = *self.name();
            self.handle_signed_message(signed_msg, route, hop, &BTreeSet::new())
        }
        None => Ok(()),
    }
}
/// Returns a copy of the names in the routing table section that contains the lower bound of
/// `prefix`, or `RoutingError::InvalidSource` if the routing table has no such section.
fn get_section(&self, prefix: &Prefix<XorName>) -> Result<BTreeSet<XorName>, RoutingError> {
    match self.peer_mgr.routing_table().get_section(&prefix.lower_bound()) {
        Some(names) => Ok(names.iter().cloned().collect()),
        None => Err(RoutingError::InvalidSource),
    }
}
/// Builds the `SectionList` for `prefix` from the public IDs of the members of that section.
///
/// Fails with the error from `get_section` if the section is not found.
fn get_section_list(&self, prefix: &Prefix<XorName>) -> Result<SectionList, RoutingError> {
    let member_names = self.get_section(prefix)?;
    let member_ids = self.peer_mgr.get_pub_ids(&member_names);
    Ok(SectionList::new(*prefix, member_ids))
}
/// Signs our current view of the section with the given `prefix` and distributes the
/// signature.
///
/// The signature is first recorded in our own section list cache. It is then sent as a
/// `SectionListSignature` direct message either to `dst` alone (if given and its peer ID is
/// known), or to every other member of our own section whose peer ID is known.
///
/// Nothing is sent if the section list cannot be built or serialised; the failure is logged.
fn send_section_list_signature(&mut self, prefix: Prefix<XorName>, dst: Option<XorName>) {
    let section = match self.get_section_list(&prefix) {
        Ok(section) => section,
        Err(err) => {
            debug!("{:?} Error getting section list for {:?}: {:?}",
                   self,
                   prefix,
                   err);
            return;
        }
    };
    let serialised = match serialisation::serialise(&section) {
        Ok(serialised) => serialised,
        Err(err) => {
            warn!("{:?} Error serialising section list for {:?}: {:?}",
                  self,
                  prefix,
                  err);
            return;
        }
    };
    let sig = sign::sign_detached(&serialised, self.full_id.signing_private_key());
    self.section_list_sigs.add_signature(prefix,
                                         *self.full_id.public_id(),
                                         section.clone(),
                                         sig,
                                         self.peer_mgr
                                             .routing_table()
                                             .our_section()
                                             .len());
    // Collect the recipients up front so that no borrow of `self` is held while sending.
    let peers = if let Some(dst) = dst {
        self.peer_mgr
            .get_peer_id(&dst)
            .into_iter()
            .cloned()
            .collect_vec()
    } else {
        self.peer_mgr
            .routing_table()
            .our_section()
            .into_iter()
            .filter(|&x| *x != *self.name())
            .filter_map(|x| self.peer_mgr.get_peer_id(x))
            .cloned()
            .collect_vec()
    };
    for peer_id in peers {
        let msg = DirectMessage::SectionListSignature(section.clone(), sig);
        self.send_direct_message(peer_id, msg);
    }
}
/// Verifies a section list signature received from `peer_id` and, if valid, records it in the
/// section list cache.
///
/// # Errors
///
/// * `RoutingError::InvalidSource` if the sender is not a routing peer.
/// * A serialisation error if `section_list` cannot be serialised for verification.
/// * `RoutingError::FailedSignature` if the signature does not match the sender's public
///   signing key.
fn handle_section_list_signature(&mut self,
                                 peer_id: PeerId,
                                 section_list: SectionList,
                                 sig: sign::Signature)
                                 -> Result<(), RoutingError> {
    let src_pub_id = self.peer_mgr
        .get_routing_peer(&peer_id)
        .ok_or(RoutingError::InvalidSource)?;
    let serialised = serialisation::serialise(&section_list)?;
    if sign::verify_detached(&sig, &serialised, src_pub_id.signing_public_key()) {
        self.section_list_sigs.add_signature(section_list.prefix,
                                             *src_pub_id,
                                             section_list,
                                             sig,
                                             self.peer_mgr
                                                 .routing_table()
                                                 .our_section()
                                                 .len());
        Ok(())
    } else {
        Err(RoutingError::FailedSignature)
    }
}
/// Verifies a hop message against its sender and passes the contained signed message on.
///
/// The hop name is the sender's name, except for clients and joining nodes, where our own
/// name is used. Client messages are additionally validated. If the sender is not a connected
/// peer the hop signature is not checked and our own name is used as the hop name.
fn handle_hop_message(&mut self,
                      hop_msg: HopMessage,
                      peer_id: PeerId)
                      -> Result<(), RoutingError> {
    let hop_name = match self.peer_mgr.get_connected_peer(&peer_id) {
        Some(peer) => {
            hop_msg.verify(peer.pub_id().signing_public_key())?;
            match *peer.state() {
                PeerState::Client => {
                    self.check_valid_client_message(hop_msg.content.routing_message())?;
                    *self.name()
                }
                PeerState::JoiningNode => *self.name(),
                _ => *peer.name(),
            }
        }
        None => {
            debug!("{:?} Can't find sender {:?} of {:?}",
                   self,
                   peer_id,
                   hop_msg);
            *self.name()
        }
    };

    let HopMessage { content, route, sent_to, .. } = hop_msg;
    self.handle_signed_message(content, route, hop_name, &sent_to)
}
/// Sends an ack for the message and, if its destination consists of multiple nodes, relays
/// the signed message further. A failure to relay is only logged.
fn ack_and_broadcast(&mut self,
                     signed_msg: &SignedMessage,
                     route: u8,
                     hop_name: XorName,
                     sent_to: &BTreeSet<XorName>) {
    self.send_ack(signed_msg.routing_message(), route);

    if !signed_msg.routing_message().dst.is_multiple() {
        return;
    }
    if let Err(error) = self.send_signed_message(signed_msg, route, &hop_name, sent_to) {
        debug!("{:?} Failed to send {:?}: {:?}", self, signed_msg, error);
    }
}
/// Checks a signed message and then either queues it for local handling or relays it.
///
/// After the integrity and signature count checks the message goes through the incoming
/// filter. A known message on a known route is dropped. If we are in the destination
/// authority, the message is acked, relayed where needed and, if new, queued for dispatch.
/// Otherwise we try to answer it from the cache and, failing that, forward it.
///
/// # Errors
///
/// Returns `RoutingError::NotEnoughSignatures` if a message from a multi-node source carries
/// too few signatures, and propagates integrity check and cache errors.
fn handle_signed_message(&mut self,
                         signed_msg: SignedMessage,
                         route: u8,
                         hop_name: XorName,
                         sent_to: &BTreeSet<XorName>)
                         -> Result<(), RoutingError> {
    signed_msg.check_integrity(self.min_section_size())?;

    // The signature count is only enforced once our prefix is non-empty.
    let needs_quorum = self.our_prefix().bit_count() > 0 &&
                       signed_msg.routing_message().src.is_multiple();
    if needs_quorum && signed_msg.src_size() * 100 < QUORUM * self.min_section_size() {
        warn!("{:?} Not enough signatures in {:?}.", self, signed_msg);
        return Err(RoutingError::NotEnoughSignatures);
    }

    let filter_result = self.routing_msg_filter
        .filter_incoming(signed_msg.routing_message(), route);
    if filter_result == FilteringResult::KnownMessageAndRoute {
        return Ok(());
    }

    if self.in_authority(&signed_msg.routing_message().dst) {
        self.ack_and_broadcast(&signed_msg, route, hop_name, sent_to);
        // A message we have already seen on another route must not be dispatched again.
        if filter_result == FilteringResult::NewMessage {
            self.msg_queue.push_back(signed_msg.into_routing_message());
        }
        return Ok(());
    }

    if self.respond_from_cache(signed_msg.routing_message(), route)? {
        return Ok(());
    }

    if let Err(error) = self.send_signed_message(&signed_msg, route, &hop_name, sent_to) {
        debug!("{:?} Failed to send {:?}: {:?}", self, signed_msg, error);
    }
    Ok(())
}
/// Dispatches a routing message addressed to us to the handler matching its content and its
/// source and destination authorities.
///
/// While we are not yet approved, most message kinds are stored in the backlog instead of
/// being handled; only `GetNodeName`, `ConnectionInfoResponse`, `GetNodeNameResponse`, `Ack`
/// and `NodeApproval` are processed right away.
///
/// # Errors
///
/// Returns `RoutingError::BadAuthority` if no handler matches the combination of content,
/// source and destination, and propagates the errors of the individual handlers.
fn dispatch_routing_message(&mut self,
                            routing_msg: RoutingMessage,
                            outbox: &mut EventBox)
                            -> Result<(), RoutingError> {
    use messages::MessageContent::*;
    use Authority::{Client, ManagedNode, PrefixSection, Section};
    if !self.is_approved {
        match routing_msg.content {
            SectionSplit(..) |
            OwnSectionMerge(..) |
            OtherSectionMerge(..) |
            ExpectCandidate { .. } |
            AcceptAsCandidate { .. } |
            CandidateApproval { .. } |
            ConnectionInfoRequest { .. } |
            SectionUpdateRequest { .. } |
            SectionUpdate { .. } |
            RoutingTableRequest(..) |
            RoutingTableResponse { .. } |
            UserMessagePart { .. } => {
                trace!("{:?} Not approved yet. Delaying message handling: {:?}",
                       self,
                       routing_msg);
                self.routing_msg_backlog.push(routing_msg);
                return Ok(());
            }
            // These are handled even before approval.
            GetNodeName { .. } |
            ConnectionInfoResponse { .. } |
            GetNodeNameResponse { .. } |
            Ack(..) |
            NodeApproval { .. } => {
            }
        }
    }
    // The frequent message kinds are not logged.
    match routing_msg.content {
        Ack(..) |
        RoutingTableRequest(..) |
        UserMessagePart { .. } => (),
        _ => trace!("{:?} Got routing message {:?}.", self, routing_msg),
    }
    match (routing_msg.content, routing_msg.src, routing_msg.dst) {
        (GetNodeName { current_id, message_id },
         Client { client_key, proxy_node_name, peer_id },
         Section(dst_name)) => {
            self.handle_get_node_name_request(current_id,
                                              client_key,
                                              proxy_node_name,
                                              dst_name,
                                              peer_id,
                                              message_id)
        }
        (GetNodeNameResponse { relocated_id, section, .. }, Section(_), dst) => {
            Ok(self.handle_get_node_name_response(relocated_id, section, dst, outbox))
        }
        (ExpectCandidate { expect_id, client_auth, message_id }, Section(_), Section(_)) => {
            self.handle_expect_candidate(expect_id, client_auth, message_id, outbox)
        }
        (AcceptAsCandidate { expect_id, client_auth, message_id }, Section(_), Section(_)) => {
            self.handle_accept_as_candidate(expect_id, client_auth, message_id, outbox)
        }
        (ConnectionInfoRequest { encrypted_conn_info, nonce, pub_id, msg_id },
         src @ Client { .. },
         dst @ ManagedNode(_)) |
        (ConnectionInfoRequest { encrypted_conn_info, nonce, pub_id, msg_id },
         src @ ManagedNode(_),
         dst @ ManagedNode(_)) => {
            self.handle_connection_info_request(encrypted_conn_info,
                                                nonce,
                                                pub_id,
                                                msg_id,
                                                src,
                                                dst,
                                                outbox)
        }
        (ConnectionInfoResponse { encrypted_conn_info, nonce, pub_id, msg_id },
         ManagedNode(src_name),
         dst @ Client { .. }) |
        (ConnectionInfoResponse { encrypted_conn_info, nonce, pub_id, msg_id },
         ManagedNode(src_name),
         dst @ ManagedNode(_)) => {
            self.handle_connection_info_response(encrypted_conn_info,
                                                 nonce,
                                                 pub_id,
                                                 msg_id,
                                                 src_name,
                                                 dst)
        }
        (CandidateApproval { candidate_id, client_auth, .. }, Section(_), Section(_)) => {
            self.handle_candidate_approval(candidate_id, client_auth, outbox)
        }
        (NodeApproval { sections }, Section(_), Client { .. }) => {
            self.handle_node_approval(&sections, outbox)
        }
        (SectionUpdateRequest { our_prefix }, ManagedNode(_), PrefixSection(_)) => {
            self.handle_section_update_request(our_prefix);
            Ok(())
        }
        (SectionUpdate { prefix, members, merge }, Section(_), PrefixSection(_)) => {
            self.handle_section_update(prefix, members, merge, outbox)
        }
        (RoutingTableRequest(msg_id, digest), src @ ManagedNode(_), dst @ Section(_)) => {
            self.handle_rt_req(msg_id, digest, src, dst)
        }
        (RoutingTableResponse { prefix, members, message_id }, Section(_), ManagedNode(_)) => {
            self.handle_rt_rsp(prefix, members, message_id, outbox)
        }
        (SectionSplit(prefix, joining_node), PrefixSection(_), PrefixSection(_)) => {
            self.handle_section_split(prefix, joining_node, outbox)
        }
        (OwnSectionMerge(sections),
         PrefixSection(sender_prefix),
         PrefixSection(merge_prefix)) => {
            self.handle_own_section_merge(sender_prefix, merge_prefix, sections, outbox)
        }
        (OtherSectionMerge(section), PrefixSection(merge_prefix), PrefixSection(_)) => {
            self.handle_other_section_merge(merge_prefix, section, outbox)
        }
        (Ack(ack, _), _, _) => self.handle_ack_response(ack),
        (UserMessagePart { hash, part_count, part_index, payload, .. }, src, dst) => {
            // An event is only raised once all parts of the user message have arrived.
            if let Some(msg) = self.user_msg_cache.add(hash, part_count, part_index, payload) {
                self.stats().count_user_message(&msg);
                outbox.send_event(msg.into_event(src, dst));
            }
            Ok(())
        }
        (content, src, dst) => {
            debug!("{:?} Unhandled routing message {:?} from {:?} to {:?}",
                   self,
                   content,
                   src,
                   dst);
            Err(RoutingError::BadAuthority)
        }
    }
}
/// Handles our section's approval of a candidate.
///
/// Expired candidates are disconnected first. If the peer manager cannot approve the
/// candidate, a connection info request is sent to it instead. Unless our section is currently
/// merging, a `NodeApproval` with our view of all sections is sent to the candidate. If the
/// peer manager returned the candidate's peer ID, the candidate is added to the routing table.
fn handle_candidate_approval(&mut self,
                             candidate_id: PublicId,
                             client_auth: Authority<XorName>,
                             outbox: &mut EventBox)
                             -> Result<(), RoutingError> {
    let expired_candidates = self.peer_mgr.remove_expired_candidates();
    for peer_id in expired_candidates {
        self.disconnect_peer(&peer_id, Some(outbox));
    }

    let approval = self.peer_mgr.handle_candidate_approval(*candidate_id.name(), client_auth);
    let opt_peer_id = if let Ok(found_peer_id) = approval {
        found_peer_id
    } else {
        let src = Authority::ManagedNode(*self.name());
        let request = self.send_connection_info_request(candidate_id, src, client_auth, outbox);
        if let Err(error) = request {
            debug!("{:?} - Failed to send connection info to {:?}: {:?}",
                   self,
                   candidate_id,
                   error);
        }
        None
    };

    info!("{:?} Our section with {:?} has approved candidate {}.",
          self,
          self.our_prefix(),
          candidate_id.name());

    if !(self.we_want_to_merge() || self.they_want_to_merge()) {
        let src = Authority::Section(*candidate_id.name());
        let sections = self.peer_mgr.pub_ids_by_section();
        let content = MessageContent::NodeApproval { sections: sections };
        if let Err(error) = self.send_routing_message(src, client_auth, content) {
            debug!("{:?} Failed sending NodeApproval to {}: {:?}",
                   self,
                   candidate_id.name(),
                   error);
        }
    } else {
        debug!("{:?} Not sending NodeApproval since our section is currently merging.",
               self);
    }

    match opt_peer_id {
        Some(peer_id) => self.add_to_routing_table(&candidate_id, &peer_id, outbox),
        None => (),
    }
    Ok(())
}
/// Handles the `NodeApproval` that makes us a full member of the network.
///
/// `sections` holds the public IDs of the members of each section, keyed by section prefix.
/// A duplicate approval is ignored. If the prefixes cannot be added to the routing table,
/// `Event::RestartRequired` is raised and the error is returned.
///
/// On success we are marked as approved, raise `Event::Connected`, sign our section list,
/// request connection info from every listed member that is not yet in our routing table and
/// re-queue the messages that were delayed while we were not approved.
fn handle_node_approval(&mut self,
sections: &SectionMap,
outbox: &mut EventBox)
-> Result<(), RoutingError> {
if self.is_approved {
warn!("{:?} Received duplicate NodeApproval.", self);
return Ok(());
}
// The approval timers are no longer needed.
self.get_approval_timer_token = None;
self.approval_progress_timer_token = None;
if let Err(error) = self.peer_mgr.add_prefixes(sections.keys().cloned().collect()) {
info!("{:?} Received invalid prefixes in NodeApproval: {:?}. Restarting.",
self,
error);
outbox.send_event(Event::RestartRequired);
return Err(error);
}
self.is_approved = true;
outbox.send_event(Event::Connected);
// Raise `NodeAdded` for every name that is already in the routing table.
for name in self.peer_mgr.routing_table().iter() {
outbox.send_event(Event::NodeAdded(*name, self.peer_mgr.routing_table().clone()));
}
let our_prefix = *self.our_prefix();
self.send_section_list_signature(our_prefix, None);
for section in sections.values() {
for pub_id in section.iter() {
if !self.peer_mgr.routing_table().has(pub_id.name()) {
self.peer_mgr.expect_peer(pub_id);
debug!("{:?} Sending connection info to {:?} on NodeApproval.",
self,
pub_id);
let src = Authority::ManagedNode(*self.name());
let node_auth = Authority::ManagedNode(*pub_id.name());
if let Err(error) = self.send_connection_info_request(*pub_id,
src,
node_auth,
outbox) {
debug!("{:?} - Failed to send connection info to {:?}: {:?}",
self,
pub_id,
error);
}
}
}
}
info!("{:?} Resource proof challenges completed. This node has been approved to join the \
network!",
self);
trace!("{:?} Node approval completed. Prefixes: {:?}",
self,
self.peer_mgr.routing_table().prefixes());
self.print_rt_size();
self.stats.enable_logging();
// Put the backlog at the front of the queue; reversing before `push_front` keeps the
// original order of the delayed messages.
let backlog = mem::replace(&mut self.routing_msg_backlog, vec![]);
backlog.into_iter().rev().foreach(|msg| self.msg_queue.push_front(msg));
// Any unsent resource proof response parts are discarded.
self.resource_proof_response_parts.clear();
self.reset_rt_timer();
self.candidate_status_token =
Some(self.timer.schedule(Duration::from_secs(CANDIDATE_STATUS_INTERVAL_SECS)));
Ok(())
}
/// Answers a resource proof challenge from `peer_id`.
///
/// The proof is computed from `seed`, `target_size` and `difficulty` and split into parts of
/// at most `MAX_PART_LEN` items. Only the first part is sent right away; the remaining parts
/// are stored for that peer, to be sent one at a time from
/// `handle_resource_proof_response_receipt`. If the proof is empty, a single
/// response with an empty proof is sent.
fn handle_resource_proof_request(&mut self,
                                 peer_id: PeerId,
                                 seed: Vec<u8>,
                                 target_size: usize,
                                 difficulty: u8)
                                 -> Result<(), RoutingError> {
    if self.resource_proof_response_parts.is_empty() {
        info!("{:?} Starting approval process to test this node's resources. This will take \
               at least {} seconds.",
              self,
              RESOURCE_PROOF_DURATION_SECS);
    }

    // Only the proof computation itself is timed.
    let timer_start = Instant::now();
    let prover = ResourceProof::new(target_size, difficulty);
    let mut proof_data = prover.create_proof_data(&seed);
    let leading_zero_bytes = prover.create_proof(&mut proof_data);
    let elapsed = timer_start.elapsed();

    let chunks = proof_data.into_iter()
        .chunks(MAX_PART_LEN)
        .into_iter()
        .map(|chunk| chunk.collect_vec())
        .collect_vec();
    let part_count = chunks.len();

    // Stored in reverse order, so that popping from the end yields the parts in ascending
    // index order.
    let mut remaining_parts = Vec::with_capacity(part_count);
    for (part_index, part) in chunks.into_iter().enumerate() {
        remaining_parts.push(DirectMessage::ResourceProofResponse {
                                 part_index: part_index,
                                 part_count: part_count,
                                 proof: part,
                                 leading_zero_bytes: leading_zero_bytes,
                             });
    }
    remaining_parts.reverse();

    let first_message = remaining_parts.pop().unwrap_or_else(|| {
        DirectMessage::ResourceProofResponse {
            part_index: 0,
            part_count: 1,
            proof: vec![],
            leading_zero_bytes: leading_zero_bytes,
        }
    });

    let _ = self.resource_proof_response_parts.insert(peer_id, remaining_parts);
    self.send_direct_message(peer_id, first_message);
    trace!("{:?} created proof data in {}. Min section size: {}, Target size: {}, \
            Difficulty: {}, Seed: {:?}",
           self,
           Self::format(elapsed),
           self.min_section_size(),
           target_size,
           difficulty,
           seed);
    Ok(())
}
/// Sends the next stored resource proof part to `peer_id`, if there is one.
///
/// Does nothing when no parts are stored for that peer or its list is already empty.
fn handle_resource_proof_response_receipt(&mut self, peer_id: PeerId) {
    let next_part = match self.resource_proof_response_parts.get_mut(&peer_id) {
        Some(pending) => pending.pop(),
        None => None,
    };
    if let Some(part) = next_part {
        self.send_direct_message(peer_id, part);
    }
}
/// Handles one part of a candidate's resource proof response.
///
/// The part is ignored when no candidate timer is running or when the peer's name is unknown.
/// Otherwise it is passed to `PeerManager::verify_candidate`:
///
/// * on error the candidate timer token is cleared;
/// * while the verification yields no result yet (`Ok(None)`), a receipt is sent back;
/// * when a result is available and the challenge was the trivial one (`difficulty == 0` and
///   `target_size < 1000`), the timer token is cleared and the approval is sent at once;
/// * otherwise the success is only logged.
fn handle_resource_proof_response(&mut self,
                                  peer_id: PeerId,
                                  part_index: usize,
                                  part_count: usize,
                                  proof: Vec<u8>,
                                  leading_zero_bytes: u64) {
    if self.candidate_timer_token.is_none() {
        debug!("{:?} Won't handle resource proof response from {:?} - not currently waiting.",
               self,
               peer_id);
        return;
    }

    let name = match self.peer_mgr.get_peer_name(&peer_id) {
        Some(peer_name) => *peer_name,
        None => {
            debug!("{:?} Failed to get peer name while handling resource proof response from {:?}",
                   self,
                   peer_id);
            return;
        }
    };

    let outcome = self.peer_mgr.verify_candidate(&name,
                                                 part_index,
                                                 part_count,
                                                 proof,
                                                 leading_zero_bytes);
    match outcome {
        Ok(None) => {
            self.send_direct_message(peer_id, DirectMessage::ResourceProofResponseReceipt);
        }
        Ok(Some((target_size, difficulty, elapsed))) => {
            if difficulty == 0 && target_size < 1000 {
                info!("{:?} Candidate {} passed our challenge in {}. Sending approval to our \
                       section with {:?}.",
                      self,
                      name,
                      Self::format(elapsed),
                      self.our_prefix());
                self.candidate_timer_token = None;
                self.send_candidate_approval();
            } else {
                info!("{:?} Candidate {} passed our challenge in {}. Waiting to send approval to \
                       our section with {:?}.",
                      self,
                      name,
                      Self::format(elapsed),
                      self.our_prefix());
            }
        }
        Err(error) => {
            debug!("{:?} Failed to verify candidate {}: {:?}",
                   self,
                   name,
                   error);
            self.candidate_timer_token = None;
        }
    }
}
/// Checks whether `msg` is a message this node accepts from a client.
///
/// Only `Ack` messages and `UserMessagePart`s whose priority is at least `DEFAULT_PRIORITY` are
/// accepted; anything else is logged and rejected with `RoutingError::RejectedClientMessage`.
fn check_valid_client_message(&self, msg: &RoutingMessage) -> Result<(), RoutingError> {
    let acceptable = match msg.content {
        MessageContent::Ack(..) => true,
        MessageContent::UserMessagePart { priority, .. } => priority >= DEFAULT_PRIORITY,
        _ => false,
    };
    if acceptable {
        return Ok(());
    }
    debug!("{:?} Illegitimate client message {:?}. Refusing to relay.",
           self,
           msg);
    Err(RoutingError::RejectedClientMessage)
}
/// Tries to answer a cacheable request from the response cache, and stores cacheable responses.
///
/// Only cacheable `UserMessagePart`s are considered. The part is added to
/// `cacheable_user_msg_cache`; when that yields a complete user message:
///
/// * a request with a cached response is acknowledged and answered directly, and `Ok(true)` is
///   returned;
/// * a response is put into the response cache.
///
/// In every other case `Ok(false)` is returned. An error is returned only if sending the cached
/// response fails.
fn respond_from_cache(&mut self,
                      routing_msg: &RoutingMessage,
                      route: u8)
                      -> Result<bool, RoutingError> {
    let (hash, part_count, part_index, payload) = match routing_msg.content {
        MessageContent::UserMessagePart { hash,
                                          part_count,
                                          part_index,
                                          cacheable,
                                          ref payload,
                                          .. } => {
            if !cacheable {
                return Ok(false);
            }
            (hash, part_count, part_index, payload)
        }
        _ => return Ok(false),
    };

    match self.cacheable_user_msg_cache.add(hash, part_count, part_index, payload.clone()) {
        None => (),
        Some(UserMessage::Response(response)) => {
            debug!("{:?} Putting {:?} in cache", self, response);
            self.response_cache.put(response);
        }
        Some(UserMessage::Request(request)) => {
            let response = match self.response_cache.get(&request) {
                Some(cached) => cached,
                None => return Ok(false),
            };
            debug!("{:?} Found cached response to {:?}", self, request);
            let priority = response.priority();
            let src = Authority::ManagedNode(*self.name());
            let dst = routing_msg.src;
            let msg = UserMessage::Response(response);
            // Acknowledge the request ourselves before answering it from the cache.
            self.send_ack_from(routing_msg, route, src);
            self.send_user_message(src, dst, msg, priority)?;
            return Ok(true);
        }
    }
    Ok(false)
}
/// Asks the network for a relocated name by sending a `GetNodeName` request via our proxy.
///
/// An approval timer of `GET_NODE_NAME_TIMEOUT_SECS` is scheduled first. Returns
/// `RoutingError::ProxyConnectionNotFound` when there is no proxy, or any error from sending
/// the request.
fn relocate(&mut self) -> Result<(), RoutingError> {
    let timeout = Duration::from_secs(GET_NODE_NAME_TIMEOUT_SECS);
    let token = self.timer.schedule(timeout);
    self.get_approval_timer_token = Some(token);

    let request_content = MessageContent::GetNodeName {
        current_id: *self.full_id.public_id(),
        message_id: MessageId::new(),
    };

    let proxy_name = match self.peer_mgr.proxy() {
        Some((_, proxy_pub_id)) => *proxy_pub_id.name(),
        None => return Err(RoutingError::ProxyConnectionNotFound),
    };

    // We are still a client at this point, so the request is sent with client authority.
    let src = Authority::Client {
        client_key: *self.full_id.public_id().signing_public_key(),
        proxy_node_name: proxy_name,
        peer_id: self.crust_service.id(),
    };
    let dst = Authority::Section(*self.name());

    info!("{:?} Requesting a relocated name from the network. This can take a while.",
          self);

    self.send_routing_message(src, dst, request_content)
}
/// Sends a `BootstrapIdentify` message carrying our public ID to `peer_id`.
fn send_bootstrap_identify(&mut self, peer_id: PeerId) {
    // Copy the ID out first: it must not borrow `self` while the message is being sent.
    let our_public_id = *self.full_id.public_id();
    let identify = DirectMessage::BootstrapIdentify { public_id: our_public_id };
    self.send_direct_message(peer_id, identify);
}
/// Handles an identify message from a bootstrapping peer (client or joining node).
///
/// `client_restriction` selects whether the peer is registered as a client (`true`) or as a
/// joining node (`false`). The peer is disconnected if it is a non-whitelisted joining node or
/// if its name is not the SHA-256 hash of its signing key. It is denied with `BootstrapDeny` if
/// we are not approved yet or our routing table is too small. Otherwise it is recorded in the
/// peer manager and answered with `BootstrapIdentify`.
fn handle_client_identify(&mut self,
                          public_id: PublicId,
                          peer_id: PeerId,
                          client_restriction: bool,
                          outbox: &mut EventBox) {
    // The whitelist is only consulted for peers without the client restriction.
    let whitelisted = client_restriction || self.crust_service.is_peer_whitelisted(&peer_id);
    if !whitelisted {
        warn!("{:?} Client is not whitelisted, so dropping connection.",
              self);
        self.disconnect_peer(&peer_id, Some(outbox));
        return;
    }

    let expected_name = XorName(sha256::hash(&public_id.signing_public_key().0).0);
    if *public_id.name() != expected_name {
        warn!("{:?} Incoming connection not validated as a proper client, so dropping it.",
              self);
        self.disconnect_peer(&peer_id, Some(outbox));
        return;
    }

    for stale_id in self.peer_mgr.remove_expired_joining_nodes() {
        debug!("{:?} Removing stale joining node with peer ID {:?}",
               self,
               stale_id);
        self.disconnect_peer(&stale_id, Some(outbox));
    }

    if !self.is_approved {
        debug!("{:?} Client {:?} rejected: We are not approved as a node yet.",
               self,
               public_id.name());
        self.send_direct_message(peer_id, DirectMessage::BootstrapDeny);
        return;
    }

    // The first node accepts joining nodes regardless of its routing table size.
    if client_restriction || !self.is_first_node {
        let rt_len = self.peer_mgr.routing_table().len();
        let required = self.min_section_size() - 1;
        if rt_len < required {
            debug!("{:?} Client {:?} rejected: Routing table has {} entries. {} required.",
                   self,
                   public_id.name(),
                   rt_len,
                   required);
            self.send_direct_message(peer_id, DirectMessage::BootstrapDeny);
            return;
        }
    }

    let non_unique = if client_restriction {
        self.peer_mgr.insert_client(peer_id, public_id)
    } else {
        self.peer_mgr.insert_joining_node(peer_id, public_id)
    };
    if non_unique {
        debug!("{:?} Received two ClientInfo messages from the same peer ID {:?}.",
               self,
               peer_id);
    }

    debug!("{:?} Accepted client {:?}.", self, public_id.name());
    self.send_bootstrap_identify(peer_id);
}
/// Handles a `CandidateIdentify` from a peer that wants to join our section.
///
/// Hard-coded peers and known joining nodes get a trivial challenge (difficulty 0, target size
/// 1); everyone else gets `RESOURCE_PROOF_DIFFICULTY` and `RESOURCE_PROOF_TARGET_SIZE` divided
/// by our section size plus one. Depending on the peer manager's answer the candidate is sent
/// the challenge, added to the routing table straight away, left alone (tunnelling candidate)
/// or disconnected.
fn handle_candidate_identify(&mut self,
                             public_id: PublicId,
                             peer_id: PeerId,
                             outbox: &mut EventBox) {
    let name = public_id.name();
    debug!("{:?} Handling CandidateIdentify from {:?}.", self, name);

    let trivial_challenge = self.crust_service.is_peer_hard_coded(&peer_id) ||
                            self.peer_mgr.get_joining_node(&peer_id).is_some();
    let (difficulty, target_size) = if trivial_challenge {
        (0, 1)
    } else {
        let section_len = self.peer_mgr
            .routing_table()
            .our_section()
            .len();
        (RESOURCE_PROOF_DIFFICULTY, RESOURCE_PROOF_TARGET_SIZE / (section_len + 1))
    };

    // A fixed seed is used under mock crust; otherwise ten random bytes.
    let seed: Vec<u8> = if cfg!(feature = "use-mock-crust") {
        vec![5u8; 4]
    } else {
        rand::thread_rng().gen_iter().take(10).collect()
    };

    let seed_for_peer_mgr = seed.clone();
    let outcome = self.peer_mgr.handle_candidate_identify(&public_id,
                                                          &peer_id,
                                                          target_size,
                                                          difficulty,
                                                          seed_for_peer_mgr);
    match outcome {
        Ok(false) => {
            info!("{:?} Adding candidate {} to routing table without sending resource proof \
                   challenge as section has already approved it.",
                  self,
                  public_id.name());
            self.add_to_routing_table(&public_id, &peer_id, outbox);
        }
        Ok(true) => {
            let challenge = DirectMessage::ResourceProof {
                seed: seed,
                target_size: target_size,
                difficulty: difficulty,
            };
            info!("{:?} Sending resource proof challenge to candidate {}",
                  self,
                  public_id.name());
            self.send_direct_message(peer_id, challenge);
        }
        Err(RoutingError::CandidateIsTunnelling) => {
            debug!("{:?} handling a tunnelling candidate {:?}", self, name);
        }
        Err(error) => {
            debug!("{:?} failed to handle CandidateIdentify from {:?}: {:?} - disconnecting",
                   self,
                   name,
                   error);
            self.disconnect_peer(&peer_id, Some(outbox));
        }
    }
}
/// Adds the peer `public_id` / `peer_id` to the routing table and performs the follow-up work.
///
/// If the peer already exists nothing happens; on any other routing table error the peer is
/// disconnected. After a successful insertion this may trigger a section split or a merge check,
/// resets the routing table timer if the peer is in our section, raises `Event::Connected` /
/// `Event::NodeAdded`, sends section updates and section list signatures, and finally asks the
/// new peer to act as a tunnel for peers that need one.
fn add_to_routing_table(&mut self,
public_id: &PublicId,
peer_id: &PeerId,
outbox: &mut EventBox) {
// The peer manager is told whether either side of a merge of our section is pending.
let want_to_merge = self.we_want_to_merge() || self.they_want_to_merge();
let mut need_split = false;
match self.peer_mgr.add_to_routing_table(public_id, peer_id, want_to_merge) {
// An already known peer is silently ignored; any other error drops the connection.
Err(RoutingTableError::AlreadyExists) => return, Err(error) => {
debug!("{:?} Peer {:?} was not added to the routing table: {}",
self,
peer_id,
error);
self.disconnect_peer(peer_id, Some(outbox));
return;
}
// NOTE(review): `true` presumably means the table asks for a split - confirm against
// `PeerManager::add_to_routing_table`. The split is only announced when the new peer
// belongs to our own prefix.
Ok(true) => {
let our_prefix = *self.our_prefix();
if our_prefix.matches(public_id.name()) {
need_split = true;
self.send_section_split(our_prefix, *public_id.name());
}
}
Ok(false) => {
self.merge_if_necessary();
}
}
// A change to our own section restarts the routing table timer.
if self.peer_mgr
.routing_table()
.our_section()
.contains(public_id.name()) {
self.reset_rt_timer();
}
debug!("{:?} Added {:?} to routing table.", self, public_id.name());
// The first node reports `Connected` once its routing table holds exactly one entry.
if self.is_first_node && self.peer_mgr.routing_table().len() == 1 {
trace!("{:?} Node approval completed. Prefixes: {:?}",
self,
self.peer_mgr.routing_table().prefixes());
outbox.send_event(Event::Connected);
}
if self.is_approved {
outbox.send_event(Event::NodeAdded(*public_id.name(),
self.peer_mgr.routing_table().clone()));
// No section update is sent here when a split was just announced.
if !need_split && self.our_prefix().matches(public_id.name()) {
self.send_section_update(None);
}
if let Some(prefix) = self.peer_mgr
.routing_table()
.find_section_prefix(public_id.name()) {
self.send_section_list_signature(prefix, None);
// For a new member of our own section, additionally send a signature for every
// prefix in the routing table, passing the new member's name.
if prefix == *self.our_prefix() {
for pfx in self.peer_mgr.routing_table().prefixes() {
self.send_section_list_signature(pfx, Some(*public_id.name()));
}
}
}
}
// Ask the new peer to tunnel for every peer that needs a tunnel and for which it is a
// potential tunnel node.
for (dst_id, peer_name) in self.peer_mgr.peers_needing_tunnel() {
if self.peer_mgr.is_potential_tunnel_node(public_id.name(), &peer_name) {
trace!("{:?} Asking {:?} to serve as a tunnel for {:?}",
self,
peer_id,
dst_id);
let tunnel_request = DirectMessage::TunnelRequest(dst_id);
self.send_direct_message(*peer_id, tunnel_request);
}
}
}
/// Sends a `SectionUpdate` listing the members of our section.
///
/// With `Some(prefix)` the update goes to that prefix only and is flagged as a merge update;
/// with `None` it goes to every other prefix in the routing table. Nothing is sent while the
/// routing table is invalid or while a merge of our section is pending. Send failures are
/// logged and otherwise ignored.
fn send_section_update(&mut self, dst_prefix: Option<Prefix<XorName>>) {
    if !self.peer_mgr.routing_table().is_valid() {
        warn!("{:?} Not sending section update since RT invariant not held.",
              self);
        return;
    }
    if self.they_want_to_merge() || self.we_want_to_merge() {
        trace!("{:?} Not sending section update since we are in the process of merging.",
               self);
        return;
    }

    let members = self.peer_mgr.get_pub_ids(self.peer_mgr.routing_table().our_section());
    let content = MessageContent::SectionUpdate {
        prefix: *self.our_prefix(),
        members: members,
        merge: dst_prefix.is_some(),
    };

    let targets = match dst_prefix {
        None => self.peer_mgr.routing_table().other_prefixes(),
        Some(prefix) => iter::once(prefix).collect(),
    };
    trace!("{:?} Sending section update to {:?}", self, targets);

    for target_pfx in targets {
        let src = Authority::Section(self.our_prefix().lower_bound());
        let dst = Authority::PrefixSection(target_pfx);
        match self.send_routing_message(src, dst, content.clone()) {
            Ok(()) => (),
            Err(err) => {
                debug!("{:?} Failed to send section update to {:?}: {:?}",
                       self,
                       target_pfx,
                       err);
            }
        }
    }
}
/// Serialises, encrypts and sends our connection info to `their_pub_id`.
///
/// The info is sealed with their public encryption key and our private one. With
/// `Some(msg_id)` a `ConnectionInfoResponse` reusing that ID is sent; with `None` a
/// `ConnectionInfoRequest` with a fresh message ID is sent. Serialisation and send failures are
/// only logged.
fn send_connection_info(&mut self,
                        our_pub_info: PubConnectionInfo,
                        their_pub_id: PublicId,
                        src: Authority<XorName>,
                        dst: Authority<XorName>,
                        msg_id: Option<MessageId>) {
    let serialised_info = match serialisation::serialise(&our_pub_info) {
        Ok(bytes) => bytes,
        Err(err) => {
            debug!("{:?} Failed to serialise connection info for {:?}: {:?}.",
                   self,
                   their_pub_id.name(),
                   err);
            return;
        }
    };

    let nonce = box_::gen_nonce();
    let encrypted_conn_info = box_::seal(&serialised_info,
                                         &nonce,
                                         their_pub_id.encrypting_public_key(),
                                         self.full_id().encrypting_private_key());
    let our_pub_id = *self.full_id().public_id();

    let msg_content = match msg_id {
        Some(response_id) => {
            MessageContent::ConnectionInfoResponse {
                encrypted_conn_info: encrypted_conn_info,
                nonce: nonce.0,
                pub_id: our_pub_id,
                msg_id: response_id,
            }
        }
        None => {
            MessageContent::ConnectionInfoRequest {
                encrypted_conn_info: encrypted_conn_info,
                nonce: nonce.0,
                pub_id: our_pub_id,
                msg_id: MessageId::new(),
            }
        }
    };

    if let Err(err) = self.send_routing_message(src, dst, msg_content) {
        debug!("{:?} Failed to send connection info for {:?}: {:?}.",
               self,
               their_pub_id.name(),
               err);
    }
}
fn handle_connection_info_prepared(&mut self,
result_token: u32,
result: Result<PrivConnectionInfo, CrustError>) {
let our_connection_info = match result {
Err(err) => {
error!("{:?} Failed to prepare connection info: {:?}. Retrying.",
self,
err);
let new_token = match self.peer_mgr.get_new_connection_info_token(result_token) {
Err(error) => {
debug!("{:?} Failed to prepare connection info, but no entry found in \
token map: {:?}",
self,
error);
return;
}
Ok(new_token) => new_token,
};
self.crust_service.prepare_connection_info(new_token);
return;
}
Ok(connection_info) => connection_info,
};
let our_pub_info = our_connection_info.to_pub_connection_info();
match self.peer_mgr.connection_info_prepared(result_token, our_connection_info) {
Err(error) => {
debug!("{:?} Prepared connection info, but no entry found in token map: {:?}",
self,
error);
return;
}
Ok(ConnectionInfoPreparedResult { pub_id, src, dst, infos }) => {
match infos {
None => {
debug!("{:?} Prepared connection info for {:?}.",
self,
pub_id.name());
self.send_connection_info(our_pub_info, pub_id, src, dst, None);
}
Some((our_info, their_info, msg_id)) => {
debug!("{:?} Trying to connect to {:?} as {:?}.",
self,
their_info.id(),
pub_id.name());
self.send_connection_info(our_pub_info, pub_id, src, dst, Some(msg_id));
let _ = self.crust_service.connect(our_info, their_info);
}
}
}
}
}
#[cfg_attr(feature="cargo-clippy", allow(too_many_arguments))]
fn handle_connection_info_request(&mut self,
encrypted_connection_info: Vec<u8>,
nonce_bytes: [u8; box_::NONCEBYTES],
public_id: PublicId,
message_id: MessageId,
src: Authority<XorName>,
dst: Authority<XorName>,
outbox: &mut EventBox)
-> Result<(), RoutingError> {
let name = match src {
Authority::Client { .. } => public_id.name(),
Authority::ManagedNode(ref name) => name,
_ => unreachable!(),
};
self.peer_mgr.allow_connect(name)?;
let their_connection_info =
self.decrypt_connection_info(&encrypted_connection_info,
&box_::Nonce(nonce_bytes),
&public_id)?;
let peer_id = their_connection_info.id();
use peer_manager::ConnectionInfoReceivedResult::*;
match self.peer_mgr.connection_info_received(src,
dst,
public_id,
their_connection_info,
message_id) {
Ok(Ready(our_info, their_info)) => {
debug!("{:?} Already sent a connection info request to {:?} ({:?}); resending \
our same details as a response.",
self,
public_id.name(),
peer_id);
self.send_connection_info(our_info.to_pub_connection_info(),
public_id,
dst,
src,
Some(message_id));
if let Err(error) = self.crust_service.connect(our_info, their_info) {
trace!("{:?} Unable to connect to {:?} - {:?}", self, src, error);
}
}
Ok(Prepare(token)) => {
self.crust_service.prepare_connection_info(token);
}
Ok(IsProxy) |
Ok(IsClient) |
Ok(IsJoiningNode) => {
self.send_node_identify(peer_id);
self.add_to_routing_table(&public_id, &peer_id, outbox);
}
Ok(Waiting) | Ok(IsConnected) | Err(_) => (),
}
Ok(())
}
fn handle_connection_info_response(&mut self,
encrypted_connection_info: Vec<u8>,
nonce_bytes: [u8; box_::NONCEBYTES],
public_id: PublicId,
message_id: MessageId,
src: XorName,
dst: Authority<XorName>)
-> Result<(), RoutingError> {
self.peer_mgr.allow_connect(&src)?;
let their_connection_info =
self.decrypt_connection_info(&encrypted_connection_info,
&box_::Nonce(nonce_bytes),
&public_id)?;
let peer_id = their_connection_info.id();
use peer_manager::ConnectionInfoReceivedResult::*;
match self.peer_mgr.connection_info_received(Authority::ManagedNode(src),
dst,
public_id,
their_connection_info,
message_id) {
Ok(Ready(our_info, their_info)) => {
trace!("{:?} Received connection info response. Trying to connect to {:?} ({:?}).",
self,
public_id.name(),
peer_id);
if let Err(error) = self.crust_service.connect(our_info, their_info) {
debug!("{:?} Crust failed initiating a connection to {:?} ({:?}): {:?}",
self,
public_id.name(),
peer_id,
error);
}
}
Ok(Prepare(_)) |
Ok(IsProxy) |
Ok(IsClient) |
Ok(IsJoiningNode) => {
debug!("{:?} Received connection info response from {:?} ({:?}) when we haven't \
sent a corresponding request",
self,
public_id.name(),
peer_id);
}
Ok(Waiting) | Ok(IsConnected) | Err(_) => (),
}
Ok(())
}
/// Handles `peer_id`'s request that we act as a tunnel between it and `dst_id`.
///
/// The request is rejected (and logged) unless the peer manager allows tunnelling for the pair.
/// When `Tunnels::consider_clients` returns a pair, a `TunnelSuccess` is sent to its first
/// member naming the second.
fn handle_tunnel_request(&mut self, peer_id: PeerId, dst_id: PeerId) {
    if !self.peer_mgr.can_tunnel_for(&peer_id, &dst_id) {
        debug!("{:?} Rejected tunnel request from {:?} for {:?}.",
               self,
               peer_id,
               dst_id);
        return;
    }

    match self.tunnels.consider_clients(peer_id, dst_id) {
        Some((id0, id1)) => {
            debug!("{:?} Accepted tunnel request from {:?} for {:?}.",
                   self,
                   peer_id,
                   dst_id);
            self.send_direct_message(id0, DirectMessage::TunnelSuccess(id1));
        }
        None => (),
    }
}
/// Handles `peer_id`'s confirmation that it will act as a tunnel node to `dst_id`.
///
/// If we are not (or no longer) tunnelling to `dst_id`, the tunnel node is told to drop the
/// tunnel via `TunnelDisconnect` and nothing else happens. Otherwise, if `peer_id` is a
/// connected peer whose state allows tunnelling and the tunnel is newly registered, we identify
/// ourselves to `dst_id`.
fn handle_tunnel_success(&mut self, peer_id: PeerId, dst_id: PeerId) {
    if !self.peer_mgr.tunnelling_to(&dst_id) {
        debug!("{:?} Received TunnelSuccess from {:?} for an already connected peer {:?}",
               self,
               peer_id,
               dst_id);
        let message = DirectMessage::TunnelDisconnect(dst_id);
        self.send_direct_message(peer_id, message);
        // Having just asked the tunnel node to tear the tunnel down, we must not go on to
        // register it locally and send `NodeIdentify` through it.
        return;
    }
    let can_tunnel_for = |peer: &Peer| peer.state().can_tunnel_for();
    if self.peer_mgr.get_connected_peer(&peer_id).map_or(false, can_tunnel_for) &&
       self.tunnels.add(dst_id, peer_id) {
        debug!("{:?} Adding {:?} as a tunnel node for {:?}.",
               self,
               peer_id,
               dst_id);
        self.send_node_identify(dst_id);
    }
}
/// Handles the tunnel node `peer_id` reporting that its tunnel to `dst_id` is closed.
///
/// If that tunnel was registered it is removed, and `dst_id` is treated as a dropped peer
/// unless Crust reports a direct connection to it.
fn handle_tunnel_closed(&mut self, peer_id: PeerId, dst_id: PeerId, outbox: &mut EventBox) {
    if !self.tunnels.remove(dst_id, peer_id) {
        return;
    }
    debug!("{:?} Tunnel to {:?} via {:?} closed.",
           self,
           dst_id,
           peer_id);
    let directly_connected = self.crust_service.is_connected(&dst_id);
    if !directly_connected {
        self.dropped_peer(&dst_id, outbox, true);
    }
}
/// Handles `peer_id`'s request to close the tunnel we provide between it and `dst_id`.
///
/// If the pair was registered, `dst_id` is notified with `TunnelClosed`.
fn handle_tunnel_disconnect(&mut self, peer_id: PeerId, dst_id: PeerId) {
    debug!("{:?} Closing tunnel connecting {:?} and {:?}.",
           self,
           dst_id,
           peer_id);
    let pair_dropped = self.tunnels.drop_client_pair(dst_id, peer_id);
    if pair_dropped {
        let notification = DirectMessage::TunnelClosed(peer_id);
        self.send_direct_message(dst_id, notification);
    }
}
/// Disconnects from `peer_id` unless we still need the connection.
///
/// Routing table entries, the proxy, clients and joining nodes are never disconnected here. A
/// peer reached through a tunnel is dropped by sending `TunnelDisconnect` to its tunnel node.
/// Any other peer is disconnected via Crust and removed from the peer manager; with an
/// `outbox` the tunnel-node bookkeeping is done as well, without one a possible tunnel node
/// is only logged.
fn disconnect_peer(&mut self, peer_id: &PeerId, outbox: Option<&mut EventBox>) {
    if let Some(&pub_id) = self.peer_mgr.get_routing_peer(peer_id) {
        debug!("{:?} Not disconnecting routing table entry {:?} ({:?}).",
               self,
               pub_id.name(),
               peer_id);
        return;
    }
    if let Some(&public_id) = self.peer_mgr.get_proxy_public_id(peer_id) {
        debug!("{:?} Not disconnecting proxy node {:?} ({:?}).",
               self,
               public_id.name(),
               peer_id);
        return;
    }
    if self.peer_mgr.get_client(peer_id).is_some() {
        debug!("{:?} Not disconnecting client {:?}.", self, peer_id);
        return;
    }
    if self.peer_mgr.get_joining_node(peer_id).is_some() {
        debug!("{:?} Not disconnecting joining node {:?}.", self, peer_id);
        return;
    }

    // A tunnelled peer has no direct connection: ask its tunnel node to drop it instead.
    if let Some(tunnel_id) = self.tunnels.remove_tunnel_for(peer_id) {
        debug!("{:?} Disconnecting {:?} (indirect).", self, peer_id);
        let message = DirectMessage::TunnelDisconnect(*peer_id);
        self.send_direct_message(tunnel_id, message);
        let _ = self.peer_mgr.remove_peer(peer_id);
        return;
    }

    debug!("{:?} Disconnecting {:?}. Calling crust::Service::disconnect.",
           self,
           peer_id);
    let _ = self.crust_service.disconnect(*peer_id);
    let _ = self.peer_mgr.remove_peer(peer_id);
    self.dropped_tunnel_client(peer_id);
    if let Some(event_box) = outbox {
        self.dropped_tunnel_node(peer_id, event_box);
    } else if self.tunnels.is_tunnel_node(peer_id) {
        debug!("{:?} Disconnected from {:?} which was acting as tunnel node. \
                Some uncontactable peers will remain until RT purge next runs.",
               self,
               peer_id);
    }
}
/// Handles a client's `GetNodeName` request by choosing its relocated name.
///
/// `dst_name` must equal the SHA-256 hash of `client_key`, and the routing table must return
/// close names for it; otherwise `RoutingError::InvalidDestination` is returned. The relocated
/// name is `next_node_name` if set, else it is calculated from the close names and the
/// client's current name. An `ExpectCandidate` carrying the renamed ID is then sent to the
/// section of the relocated name.
fn handle_get_node_name_request(&mut self,
                                mut their_public_id: PublicId,
                                client_key: sign::PublicKey,
                                proxy_name: XorName,
                                dst_name: XorName,
                                peer_id: PeerId,
                                message_id: MessageId)
                                -> Result<(), RoutingError> {
    let expected_dst = XorName(sha256::hash(&client_key.0).0);
    if expected_dst != dst_name {
        return Err(RoutingError::InvalidDestination);
    }

    let close_section = self.peer_mgr
        .routing_table()
        .close_names(&dst_name)
        .ok_or(RoutingError::InvalidDestination)?
        .into_iter()
        .collect();

    let relocated_name = match self.next_node_name {
        Some(forced_name) => forced_name,
        None => utils::calculate_relocated_name(close_section, their_public_id.name()),
    };
    their_public_id.set_name(relocated_name);

    let client_auth = Authority::Client {
        client_key: client_key,
        proxy_node_name: proxy_name,
        peer_id: peer_id,
    };
    let request_content = MessageContent::ExpectCandidate {
        expect_id: their_public_id,
        client_auth: client_auth,
        message_id: message_id,
    };

    let src = Authority::Section(dst_name);
    let dst = Authority::Section(relocated_name);
    self.send_routing_message(src, dst, request_content)
}
/// Handles the `GetNodeNameResponse` that assigns us our relocated name.
///
/// A response received when our ID is no longer a client ID is treated as a duplicate and
/// ignored. Otherwise the approval timers are (re)scheduled, our name is replaced with the
/// relocated one, the routing table is reset for the new ID, and a connection info request is
/// sent to every member of `section`.
///
/// `challenger_count` is set to the size of `section`, minus one if our proxy is a member (in
/// which case `proxy_is_resource_proof_challenger` is set).
fn handle_get_node_name_response(&mut self,
                                 relocated_id: PublicId,
                                 section: BTreeSet<PublicId>,
                                 dst: Authority<XorName>,
                                 outbox: &mut EventBox) {
    if !self.full_id.public_id().is_client_id() {
        warn!("{:?} Received duplicate GetNodeName response.", self);
        return;
    }

    let duration = Duration::from_secs(APPROVAL_TIMEOUT_SECS);
    self.approval_expiry_time = Instant::now() + duration;
    self.get_approval_timer_token = Some(self.timer.schedule(duration));
    self.approval_progress_timer_token =
        Some(self.timer.schedule(Duration::from_secs(APPROVAL_PROGRESS_INTERVAL_SECS)));

    self.full_id.public_id_mut().set_name(*relocated_id.name());
    self.peer_mgr.reset_routing_table(*self.full_id.public_id());

    self.challenger_count = section.len();
    if let Some((_, proxy_public_id)) = self.peer_mgr.proxy() {
        if section.contains(proxy_public_id) {
            self.proxy_is_resource_proof_challenger = true;
            self.challenger_count -= 1;
        }
    }

    trace!("{:?} GetNodeName completed. Prefixes: {:?}",
           self,
           self.peer_mgr.routing_table().prefixes());
    info!("{:?} Received relocated name. Establishing connections to {} peers.",
          self,
          section.len());

    // Iterate by reference: `section` is only read here.
    for pub_id in &section {
        debug!("{:?} Sending connection info to {:?} on GetNodeName response.",
               self,
               pub_id);
        let node_auth = Authority::ManagedNode(*pub_id.name());
        if let Err(error) = self.send_connection_info_request(*pub_id, dst, node_auth, outbox) {
            debug!("{:?} - Failed to send connection info to {:?}: {:?}",
                   self,
                   pub_id,
                   error);
        }
    }
}
/// Handles an `ExpectCandidate` message announcing a node that should join a section.
///
/// Expired candidates are disconnected first. A message whose signing key is our own is
/// ignored. The candidate is given `next_node_name` (consumed) or a name from
/// `assign_to_min_len_prefix`. If the routing table says the renamed candidate should not join
/// our section, the message is forwarded to the section of the new name. Otherwise the
/// candidate is registered as expected and an `AcceptAsCandidate` is sent to our own section.
fn handle_expect_candidate(&mut self,
                           mut candidate_id: PublicId,
                           client_auth: Authority<XorName>,
                           message_id: MessageId,
                           outbox: &mut EventBox)
                           -> Result<(), RoutingError> {
    for expired_id in self.peer_mgr.remove_expired_candidates() {
        self.disconnect_peer(&expired_id, Some(outbox));
    }

    let is_own_key = candidate_id.signing_public_key() ==
                     self.full_id.public_id().signing_public_key();
    if is_own_key {
        return Ok(());
    }

    let original_name = *candidate_id.name();
    let relocated_name = match self.next_node_name.take() {
        Some(forced_name) => forced_name,
        None => self.peer_mgr.routing_table().assign_to_min_len_prefix(&original_name),
    };
    candidate_id.set_name(relocated_name);

    let belongs_elsewhere = self.peer_mgr
        .routing_table()
        .should_join_our_section(candidate_id.name())
        .is_err();
    if belongs_elsewhere {
        // Pass the request on, now carrying the updated name.
        let forwarded = MessageContent::ExpectCandidate {
            expect_id: candidate_id,
            client_auth: client_auth,
            message_id: message_id,
        };
        let src = Authority::Section(original_name);
        let dst = Authority::Section(*candidate_id.name());
        return self.send_routing_message(src, dst, forwarded);
    }

    self.peer_mgr.expect_candidate(*candidate_id.name(), client_auth)?;
    let response_content = MessageContent::AcceptAsCandidate {
        expect_id: candidate_id,
        client_auth: client_auth,
        message_id: message_id,
    };
    info!("{:?} Expecting candidate {} via {:?}.",
          self,
          candidate_id.name(),
          client_auth);

    // Source and destination are both the section of the candidate's new name.
    let section_auth = Authority::Section(*candidate_id.name());
    self.send_routing_message(section_auth, section_auth, response_content)
}
/// Handles an `AcceptAsCandidate` message, i.e. our section's decision to accept a candidate.
///
/// Expired candidates are disconnected first, and a message about ourselves is ignored.
/// Otherwise the candidate timer is started with `RESOURCE_PROOF_DURATION_SECS`, the candidate
/// is accepted in the peer manager, and a `GetNodeNameResponse` with the relocated ID and the
/// section returned by the peer manager is sent to `client_auth`.
fn handle_accept_as_candidate(&mut self,
                              candidate_id: PublicId,
                              client_auth: Authority<XorName>,
                              message_id: MessageId,
                              outbox: &mut EventBox)
                              -> Result<(), RoutingError> {
    for expired_id in self.peer_mgr.remove_expired_candidates() {
        self.disconnect_peer(&expired_id, Some(outbox));
    }

    if candidate_id == *self.full_id.public_id() {
        return Ok(());
    }

    let challenge_window = Duration::from_secs(RESOURCE_PROOF_DURATION_SECS);
    self.candidate_timer_token = Some(self.timer.schedule(challenge_window));

    let candidate_name = *candidate_id.name();
    let own_section = self.peer_mgr.accept_as_candidate(candidate_name, client_auth);
    let response_content = MessageContent::GetNodeNameResponse {
        relocated_id: candidate_id,
        section: own_section,
        message_id: message_id,
    };
    info!("{:?} Our section with {:?} accepted {} as a candidate.",
          self,
          self.our_prefix(),
          candidate_id.name());
    trace!("{:?} Sending {:?} to {:?}",
           self,
           response_content,
           client_auth);

    let src = Authority::Section(candidate_name);
    self.send_routing_message(src, client_auth, response_content)
}
/// Answers a `SectionUpdateRequest` by sending a section update to the requesting `prefix` only.
fn handle_section_update_request(&mut self, prefix: Prefix<XorName>) {
    trace!("{:?} Got section update request from {:?}", self, prefix);
    let requester = Some(prefix);
    self.send_section_update(requester);
}
/// Handles a `SectionUpdate` listing the `members` of the neighbouring section `prefix`.
///
/// First the routing table is brought in line with `prefix`: with `merge` set, a longer prefix
/// of ours is replaced by `prefix` (dropped peers are disconnected); a shorter one is split
/// repeatedly until it is at least as long as `prefix`. Then a connection info request is sent
/// to every listed member that is neither in that section of our routing table nor already
/// expected. Finally a section update request for `prefix` is cached.
///
/// Always returns `Ok(())`; an update for a prefix that is not in the routing table is logged
/// and ignored.
fn handle_section_update(&mut self,
prefix: Prefix<XorName>,
members: BTreeSet<PublicId>,
merge: bool,
outbox: &mut EventBox)
-> Result<(), RoutingError> {
trace!("{:?} Got section update for {:?}", self, prefix);
let pfx_name = prefix.lower_bound();
// The routing table is queried again on every iteration because both branches below change it.
while let Some(rt_pfx) = self.peer_mgr.routing_table().find_section_prefix(&pfx_name) {
if merge && rt_pfx.bit_count() > prefix.bit_count() {
for (name, peer_id) in self.peer_mgr.add_prefix(prefix) {
self.disconnect_peer(&peer_id, Some(outbox));
info!("{:?} Dropped {:?} from the routing table.", self, name);
}
info!("{:?} Merge on SectionUpdate completed. Prefixes: {:?}",
self,
self.peer_mgr.routing_table().prefixes());
}
// Stop once our prefix is at least as long as the announced one. `rt_pfx` is the value
// read before any merge above.
if rt_pfx.bit_count() >= prefix.bit_count() {
break;
}
debug!("{:?} Splitting {:?} on section update.", self, rt_pfx);
// The result of the split is deliberately ignored.
let _ = self.handle_section_split(rt_pfx, rt_pfx.lower_bound(), outbox);
}
// Keep only the members that are not yet in that section of our routing table.
let members =
if let Some(section) = self.peer_mgr.routing_table().section_with_prefix(&prefix) {
let f = |id: &PublicId| !section.contains(id.name());
members.into_iter().filter(f).collect_vec()
} else {
debug!("{:?} Section update received from unknown neighbour {:?}",
self,
prefix);
return Ok(());
};
// Skip members that the peer manager already expects.
let members = members.into_iter()
.filter(|id: &PublicId| !self.peer_mgr.is_expected(id.name()))
.collect_vec();
let own_name = *self.name();
for pub_id in members {
self.peer_mgr.expect_peer(&pub_id);
if let Err(error) =
self.send_connection_info_request(pub_id,
Authority::ManagedNode(own_name),
Authority::ManagedNode(*pub_id.name()),
outbox) {
debug!("{:?} - Failed to send connection info to {:?}: {:?}",
self,
pub_id,
error);
}
}
self.cache_section_update_request(prefix);
Ok(())
}
/// Handles a `RoutingTableRequest` carrying the requester's routing table `digest`.
///
/// Nothing is sent while a merge of our section is pending, or when `digest` equals the SHA-256
/// hash of our own serialised `(sections, prefixes)` pair. Otherwise one
/// `RoutingTableResponse` per section is sent back from `dst` to `src`. Send failures are
/// logged; only a serialisation failure is returned as an error.
fn handle_rt_req(&mut self,
                 msg_id: MessageId,
                 digest: sha256::Digest,
                 src: Authority<XorName>,
                 dst: Authority<XorName>)
                 -> Result<(), RoutingError> {
    if self.we_want_to_merge() || self.they_want_to_merge() {
        return Ok(());
    }

    let sections = self.peer_mgr.pub_ids_by_section();
    let prefixes = self.peer_mgr.routing_table().prefixes();
    // Serialise `sections` by reference: it is consumed by the loop below.
    let serialised_rt = serialisation::serialise(&(&sections, prefixes))?;
    if digest == sha256::hash(&serialised_rt) {
        return Ok(());
    }

    for (prefix, members) in sections {
        let content = MessageContent::RoutingTableResponse {
            message_id: msg_id,
            prefix: prefix,
            members: members,
        };
        if let Err(err) = self.send_routing_message(dst, src, content) {
            debug!("{:?} Failed to send RoutingTableResponse: {:?}.", self, err);
        }
    }
    Ok(())
}
fn handle_rt_rsp(&mut self,
prefix: Prefix<XorName>,
members: BTreeSet<PublicId>,
message_id: MessageId,
outbox: &mut EventBox)
-> Result<(), RoutingError> {
if Some(message_id) != self.rt_msg_id || self.we_want_to_merge() ||
self.they_want_to_merge() {
trace!("{:?} Ignoring RT response {:?}. Waiting for {:?}",
self,
message_id,
self.rt_msg_id);
return Ok(());
}
let old_prefix = *self.our_prefix();
for (name, peer_id) in self.peer_mgr.add_prefix(prefix) {
self.disconnect_peer(&peer_id, Some(outbox));
info!("{:?} Dropped {:?} from the routing table.", self, name);
}
let new_prefix = *self.our_prefix();
if old_prefix.bit_count() < new_prefix.bit_count() {
trace!("{:?} Found out about our section splitting via RT response {:?}",
self,
message_id);
outbox.send_event(Event::SectionSplit(new_prefix));
} else if old_prefix.bit_count() > new_prefix.bit_count() {
trace!("{:?} Found out about our section merging via RT response {:?}",
self,
message_id);
outbox.send_event(Event::SectionMerge(new_prefix));
}
info!("{:?} Update on RoutingTableResponse completed. Prefixes: {:?}",
self,
self.peer_mgr.routing_table().prefixes());
let src = Authority::ManagedNode(*self.name());
for member in members {
if self.peer_mgr
.routing_table()
.need_to_add(member.name())
.is_ok() {
let dst = Authority::ManagedNode(*member.name());
if let Err(error) = self.send_connection_info_request(member, src, dst, outbox) {
debug!("{:?} - Failed to send connection info to {:?}: {:?}",
self,
member,
error);
}
}
}
Ok(())
}
/// Splits the section `prefix` in the routing table.
///
/// If it is our own section and `joining_node` is not yet in the routing table, the split
/// message is sent on first. After the split an `Event::SectionSplit` is raised if our prefix
/// changed, the dropped peers are disconnected, and a merge check is run. For our own section
/// a section update is then sent; otherwise a section update request is cached. Finally
/// section list signatures are sent for both halves and the routing table timer is reset.
/// Always returns `Ok(())`.
fn handle_section_split(&mut self,
                        prefix: Prefix<XorName>,
                        joining_node: XorName,
                        outbox: &mut EventBox)
                        -> Result<(), RoutingError> {
    let is_our_section = prefix == *self.our_prefix();
    if is_our_section && !self.peer_mgr.routing_table().has(&joining_node) {
        self.send_section_split(prefix, joining_node);
    }

    let (dropped_peers, our_new_prefix) = self.peer_mgr.split_section(prefix);
    match our_new_prefix {
        Some(new_prefix) => outbox.send_event(Event::SectionSplit(new_prefix)),
        None => (),
    }
    for (_, dropped_id) in dropped_peers {
        self.disconnect_peer(&dropped_id, Some(outbox));
    }

    info!("{:?} Section split for {:?} completed. Prefixes: {:?}",
          self,
          prefix,
          self.peer_mgr.routing_table().prefixes());
    self.merge_if_necessary();

    if !is_our_section {
        self.cache_section_update_request(prefix);
    } else {
        self.send_section_update(None);
    }

    // Sign the section lists of both halves of the split prefix.
    for &bit in &[false, true] {
        let half = prefix.pushed(bit);
        self.send_section_list_signature(half, None);
    }

    self.reset_rt_timer();
    Ok(())
}
/// Handles an `OwnSectionMerge` message sent by `sender_prefix` about merging into
/// `merge_prefix`.
///
/// The message is rejected with `RoutingError::BadAuthority` unless `merge_prefix` is
/// compatible with and exactly one bit shorter than `sender_prefix`, and compatible with and
/// shorter than our own prefix. Accepted messages are stored in `merge_cache`. As long as the
/// cache holds entries for both our prefix and its sibling, both are removed and processed.
/// A merge check is run at the end.
fn handle_own_section_merge(&mut self,
                            sender_prefix: Prefix<XorName>,
                            merge_prefix: Prefix<XorName>,
                            sections: SectionMap,
                            outbox: &mut EventBox)
                            -> Result<(), RoutingError> {
    let plausible = merge_prefix.is_compatible(&sender_prefix) &&
                    merge_prefix.bit_count() + 1 == sender_prefix.bit_count() &&
                    merge_prefix.is_compatible(self.our_prefix()) &&
                    merge_prefix.bit_count() < self.our_prefix().bit_count();
    if !plausible {
        debug!("{:?} Received OwnSectionMerge with merge prefix {:?} from prefix {:?}.",
               self,
               merge_prefix,
               sender_prefix);
        return Err(RoutingError::BadAuthority);
    }

    match self.merge_cache.insert(sender_prefix, sections) {
        Some(previous_sections) => {
            debug!("{:?} Received duplicate OwnSectionMerge from {:?}: {:?}.",
                   self,
                   sender_prefix,
                   previous_sections);
        }
        None => (),
    }

    // Our prefix changes with every processed merge, so it is re-read on each pass.
    loop {
        let own_pfx = *self.our_prefix();
        let own_sections = match self.merge_cache.remove(&own_pfx) {
            Some(cached) => cached,
            None => break,
        };
        let sibling_pfx = own_pfx.sibling();
        let sibling_sections = match self.merge_cache.remove(&sibling_pfx) {
            Some(cached) => cached,
            None => {
                // The sibling has not reported yet: put our own entry back and wait.
                let _none = self.merge_cache.insert(own_pfx, own_sections);
                break;
            }
        };
        self.process_own_section_merge(own_pfx, own_sections, outbox)?;
        self.process_own_section_merge(sibling_pfx, sibling_sections, outbox)?;
    }

    self.merge_if_necessary();
    Ok(())
}
/// Returns `true` if `merge_cache` holds an entry for our own prefix.
fn we_want_to_merge(&self) -> bool {
    let own_prefix = self.our_prefix();
    self.merge_cache.contains_key(own_prefix)
}
/// Returns `true` if `merge_cache` holds an entry for the sibling of our own prefix.
fn they_want_to_merge(&self) -> bool {
    let sibling_prefix = self.our_prefix().sibling();
    self.merge_cache.contains_key(&sibling_prefix)
}
/// Applies one cached `OwnSectionMerge` from `sender_prefix` to the routing table.
///
/// The merge prefix is `sender_prefix` with its last bit removed. If the peer manager reports
/// the merge as completed, an `Event::SectionMerge` is raised, section list signatures are sent
/// for every prefix, the other sections are informed, connection info requests are sent to the
/// peers we still need, and all cached section update requests are sent out. In every case the
/// routing table timer is reset. Always returns `Ok(())`.
fn process_own_section_merge(&mut self,
sender_prefix: Prefix<XorName>,
sections: SectionMap,
outbox: &mut EventBox)
-> Result<(), RoutingError> {
let merge_prefix = sender_prefix.popped();
let (merge_state, needed_peers) =
self.peer_mgr.merge_own_section(sender_prefix, merge_prefix, sections);
match merge_state {
// Nothing further to do for these two states.
OwnMergeState::Ongoing |
OwnMergeState::AlreadyMerged => (),
OwnMergeState::Completed { targets, merge_details } => {
outbox.send_event(Event::SectionMerge(merge_details.prefix));
info!("{:?} Own section merge completed. Prefixes: {:?}",
self,
self.peer_mgr.routing_table().prefixes());
for prefix in self.peer_mgr.routing_table().prefixes() {
self.send_section_list_signature(prefix, None);
}
self.send_other_section_merge(targets, merge_details);
let own_name = *self.name();
// Connect to the peers the peer manager reported as needed.
for needed in &needed_peers {
debug!("{:?} Sending connection info to {:?} due to merging own section.",
self,
needed);
if let Err(error) =
self.send_connection_info_request(*needed,
Authority::ManagedNode(own_name),
Authority::ManagedNode(*needed.name()),
outbox) {
debug!("{:?} - Failed to send connection info to {:?}: {:?}",
self,
needed,
error);
}
}
// Take the cached requests, leaving the cache empty, and send them out.
let cached_section_update_requests =
mem::replace(&mut self.cached_section_update_requests, BTreeSet::new());
for prefix in cached_section_update_requests {
let src = Authority::ManagedNode(*self.name());
let dst = Authority::PrefixSection(prefix);
let content =
MessageContent::SectionUpdateRequest { our_prefix: *self.our_prefix() };
if let Err(err) = self.send_routing_message(src, dst, content) {
debug!("{:?} Failed to send SectionUpdateRequest: {:?}.", self, err);
}
}
}
}
self.reset_rt_timer();
Ok(())
}
/// Merges another section into our view of the network and contacts any newly needed peers.
///
/// `peer_mgr.merge_other_section` yields the peers we still need; each is sent a connection
/// info request (failures are logged and otherwise ignored). Afterwards this calls
/// `merge_if_necessary`, sends a section list signature for `merge_prefix`, resets the RT timer
/// and calls `cache_section_update_request` for `merge_prefix`.
///
/// Always returns `Ok(())`.
fn handle_other_section_merge(&mut self,
                              merge_prefix: Prefix<XorName>,
                              section: BTreeSet<PublicId>,
                              outbox: &mut EventBox)
                              -> Result<(), RoutingError> {
    let needed_peers = self.peer_mgr.merge_other_section(merge_prefix, section);
    let src = Authority::ManagedNode(*self.name());

    for peer_pub_id in needed_peers {
        debug!("{:?} Sending connection info to {:?} due to merging other section.",
               self,
               peer_pub_id);
        let dst = Authority::ManagedNode(*peer_pub_id.name());
        let outcome = self.send_connection_info_request(peer_pub_id, src, dst, outbox);
        if let Err(error) = outcome {
            debug!("{:?} - Failed to send connection info: {:?}", self, error);
        }
    }

    info!("{:?} Other section merge completed. Prefixes: {:?}",
          self,
          self.peer_mgr.routing_table().prefixes());

    self.merge_if_necessary();
    self.send_section_list_signature(merge_prefix, None);
    self.reset_rt_timer();
    self.cache_section_update_request(merge_prefix);
    Ok(())
}
/// Hands a received `ack` to the ack manager. Always returns `Ok(())`.
fn handle_ack_response(&mut self, ack: Ack) -> Result<(), RoutingError> {
    let ack_manager = &mut self.ack_mgr;
    ack_manager.receive(ack);
    Ok(())
}
/// Dispatches a fired timer `token` to the periodic task that owns it.
///
/// The token is compared, in order, against:
///
/// 1. the approval timer: runs `handle_approval_timeout` and returns `Transition::Terminate`;
/// 2. the tick timer: reschedules the tick, disconnects expired connections, purges invalid
///    routing table entries, checks for a merge, raises `Event::Tick` and returns the
///    transition produced by the purge;
/// 3. the RT timer: doubles `rt_timeout` (capped at `RT_MAX_TIMEOUT_SECS`), reschedules and
///    sends an RT request;
/// 4. the candidate timer: clears the token and sends a candidate approval;
/// 5. the candidate status timer: reschedules and shows the candidate status;
/// 6. the approval progress timer: reschedules and logs the remaining approval time.
///
/// Any other token is passed to `resend_unacknowledged_timed_out_msgs`. Cases 3 onwards
/// return `Transition::Stay`.
fn handle_timeout(&mut self, token: u64, outbox: &mut EventBox) -> Transition {
    let fired = Some(token);

    if self.get_approval_timer_token == fired {
        self.handle_approval_timeout(outbox);
        return Transition::Terminate;
    }

    if self.tick_timer_token == token {
        self.tick_timer_token = self.timer.schedule(Duration::from_secs(TICK_TIMEOUT_SECS));
        for peer_id in self.peer_mgr.remove_expired_connections() {
            debug!("{:?} Disconnecting from timed out peer {:?}", self, peer_id);
            let _ = self.crust_service.disconnect(peer_id);
        }
        let transition = self.purge_invalid_rt_entries(outbox);
        self.merge_if_necessary();
        outbox.send_event(Event::Tick);
        return transition;
    }

    if self.rt_timer_token == fired {
        // Exponential back-off, bounded above by `RT_MAX_TIMEOUT_SECS`.
        let upper_bound = Duration::from_secs(RT_MAX_TIMEOUT_SECS);
        self.rt_timeout = cmp::min(upper_bound, self.rt_timeout * 2);
        trace!("{:?} Scheduling next RT request for {} seconds from now.",
               self,
               self.rt_timeout.as_secs());
        self.rt_timer_token = Some(self.timer.schedule(self.rt_timeout));
        self.send_rt_request();
        return Transition::Stay;
    }

    if self.candidate_timer_token == fired {
        self.candidate_timer_token = None;
        self.send_candidate_approval();
        return Transition::Stay;
    }

    if self.candidate_status_token == fired {
        let interval = Duration::from_secs(CANDIDATE_STATUS_INTERVAL_SECS);
        self.candidate_status_token = Some(self.timer.schedule(interval));
        self.peer_mgr.show_candidate_status();
        return Transition::Stay;
    }

    if self.approval_progress_timer_token == fired {
        let interval = Duration::from_secs(APPROVAL_PROGRESS_INTERVAL_SECS);
        self.approval_progress_timer_token = Some(self.timer.schedule(interval));

        let now = Instant::now();
        // Remaining time rounded to the nearest second; zero once the expiry has passed.
        let remaining_duration = if now < self.approval_expiry_time {
            let left = self.approval_expiry_time - now;
            let whole_secs = left.as_secs();
            if left.subsec_nanos() < 500_000_000 {
                whole_secs
            } else {
                whole_secs + 1
            }
        } else {
            0
        };
        info!("{:?} {} {}/{} seconds remaining.",
              self,
              self.resource_proof_response_progress(),
              remaining_duration,
              APPROVAL_TIMEOUT_SECS);
        return Transition::Stay;
    }

    self.resend_unacknowledged_timed_out_msgs(token);
    Transition::Stay
}
/// Reacts to the approval timer firing.
///
/// If no resource proof response parts are recorded at all, `Event::RestartRequired` is raised.
/// Otherwise the outcome is logged (distinguishing "everything was sent" from "still
/// incomplete") and `Event::Terminate` is raised.
fn handle_approval_timeout(&mut self, outbox: &mut EventBox) {
    if self.resource_proof_response_parts.is_empty() {
        info!("{:?} Failed to get relocated name from the network, so restarting.",
              self);
        outbox.send_event(Event::RestartRequired);
        return;
    }

    // An entry with no parts left counts as completed.
    let mut completed = 0;
    for parts in self.resource_proof_response_parts.values() {
        if parts.is_empty() {
            completed += 1;
        }
    }

    if completed != self.challenger_count {
        info!("{:?} Failed to get approval from the network. {} Terminating node.",
              self,
              self.resource_proof_response_progress());
    } else {
        info!("{:?} All {} resource proof responses fully sent, but timed out waiting \
               for approval from the network. This could be due to the target section \
               experiencing churn. Terminating node.",
              self,
              completed);
    }
    outbox.send_event(Event::Terminate);
}
/// Checks every routing peer against the actual connection state and drops the ones that no
/// longer match.
///
/// * A directly connected peer that Crust does not report as connected is dropped.
/// * A tunnelled peer whose tunnel node is not connected causes the *tunnel node* to be dropped.
/// * A tunnelled peer without a tunnel node is corrected to "direct" if Crust reports a direct
///   connection, and dropped otherwise.
///
/// Each dropped ID goes through `handle_lost_peer`. Returns `Transition::Terminate` if any of
/// those calls asked for it, otherwise `Transition::Stay`.
fn purge_invalid_rt_entries(&mut self, outbox: &mut EventBox) -> Transition {
    let mut stale_peer_ids = vec![];

    for (peer_id, name, is_tunnel) in self.peer_mgr.get_routing_peer_details() {
        if !is_tunnel {
            if !self.crust_service.is_connected(&peer_id) {
                error!("{:?} Should have a direct connection to {} {:?}, but don't.",
                       self,
                       name,
                       peer_id);
                stale_peer_ids.push(peer_id);
            }
            continue;
        }

        if let Some(tunnel_node_id) = self.tunnels.tunnel_for(&peer_id) {
            if !self.crust_service.is_connected(tunnel_node_id) {
                debug!("{:?} Should have a tunnel connection to {} {:?} via {:?}, \
                        but tunnel node not connected.",
                       self,
                       name,
                       peer_id,
                       tunnel_node_id);
                stale_peer_ids.push(*tunnel_node_id);
            }
        } else if self.crust_service.is_connected(&peer_id) {
            debug!("{:?} Should have a tunnel connection to {} {:?}, but instead \
                    have a direct connection.",
                   self,
                   name,
                   peer_id);
            self.peer_mgr.correct_routing_state_to_direct(&peer_id);
        } else {
            debug!("{:?} Should have a tunnel connection to {} {:?}, but no \
                    tunnel node or direct connection exists.",
                   self,
                   name,
                   peer_id);
            stale_peer_ids.push(peer_id);
        }
    }

    let mut outcome = Transition::Stay;
    for stale_id in stale_peer_ids {
        debug!("{:?} Purging {:?} from routing table.", self, stale_id);
        match self.handle_lost_peer(stale_id, outbox) {
            Transition::Terminate => outcome = Transition::Terminate,
            _ => (),
        }
    }
    outcome
}
/// Sends a `RoutingTableRequest` to our own section, provided this node has been approved.
///
/// The request carries a fresh message ID (also stored in `rt_msg_id`) and the SHA-256 digest
/// of the serialised `(sections, prefixes)` pair. Serialisation and send failures are logged
/// and otherwise ignored.
fn send_rt_request(&mut self) {
    if !self.is_approved {
        return;
    }

    let msg_id = MessageId::new();
    self.rt_msg_id = Some(msg_id);

    let sections = self.peer_mgr.pub_ids_by_section();
    let prefixes = self.peer_mgr.routing_table().prefixes();
    let serialised = match serialisation::serialise(&(sections, prefixes)) {
        Ok(bytes) => bytes,
        Err(e) => {
            warn!("{:?} Serialisation failed: {:?}", self, e);
            return;
        }
    };
    let digest = sha256::hash(&serialised);
    trace!("{:?} Sending RT request {:?} with digest {:?}",
           self,
           msg_id,
           utils::format_binary_array(&digest));

    let own_name = *self.name();
    let outcome = self.send_routing_message(Authority::ManagedNode(own_name),
                                            Authority::Section(*self.name()),
                                            MessageContent::RoutingTableRequest(msg_id, digest));
    if let Err(err) = outcome {
        debug!("{:?} Failed to send RoutingTableRequest: {:?}.", self, err);
    }
}
/// Votes to approve the current verified candidate, unless a merge involving our section is
/// pending.
///
/// Does nothing (beyond logging) when `peer_mgr` has no verified candidate, or when either we
/// or our sibling section want to merge. Otherwise a `CandidateApproval` message is sent with
/// the candidate's section authority as both source and destination.
fn send_candidate_approval(&mut self) {
    let (candidate_id, client_auth, sections) = match self.peer_mgr.verified_candidate_info() {
        Ok(candidate_info) => candidate_info,
        Err(_) => {
            trace!("{:?} No candidate for which to send CandidateApproval.",
                   self);
            return;
        }
    };

    let merge_pending = self.we_want_to_merge() || self.they_want_to_merge();
    if merge_pending {
        debug!("{:?} Resource proof duration has finished, but not voting to approve \
                candidate {} since our section is currently merging.",
               self,
               candidate_id.name());
        return;
    }

    let authority = Authority::Section(*candidate_id.name());
    let approval = MessageContent::CandidateApproval {
        candidate_id: candidate_id,
        client_auth: client_auth,
        sections: sections,
    };
    info!("{:?} Resource proof duration has finished. Voting to approve candidate {}.",
          self,
          candidate_id.name());
    trace!("{:?} Sending {:?} to {:?}.", self, approval, authority);

    if let Err(error) = self.send_routing_message(authority, authority, approval) {
        debug!("{:?} Failed sending CandidateApproval: {:?}", self, error);
    }
}
/// Decrypts and deserialises connection info sent by the peer identified by `public_id`.
///
/// The ciphertext is opened with the peer's public encryption key and our private encryption
/// key.
///
/// # Errors
///
/// Returns `RoutingError::AsymmetricDecryptionFailure` if the box cannot be opened, or the
/// converted deserialisation error if the plaintext is not valid connection info.
fn decrypt_connection_info(&self,
                           encrypted_connection_info: &[u8],
                           nonce: &box_::Nonce,
                           public_id: &PublicId)
                           -> Result<PubConnectionInfo, RoutingError> {
    let their_key = public_id.encrypting_public_key();
    let our_key = self.full_id.encrypting_private_key();
    let plaintext = box_::open(encrypted_connection_info, nonce, their_key, our_key)
        .map_err(|()| RoutingError::AsymmetricDecryptionFailure)?;
    let connection_info: PubConnectionInfo = serialisation::deserialise(&plaintext)?;
    Ok(connection_info)
}
/// Restarts the RT request cycle: clears `rt_msg_id`, resets `rt_timeout` to
/// `RT_MIN_TIMEOUT_SECS` and schedules a new RT timer with that timeout.
fn reset_rt_timer(&mut self) {
    let min_timeout = Duration::from_secs(RT_MIN_TIMEOUT_SECS);
    // Log before clearing so the previous message ID is still visible.
    trace!("{:?} Scheduling a RT request for {} seconds from now. Previous rt_msg_id: {:?}",
           self,
           RT_MIN_TIMEOUT_SECS,
           self.rt_msg_id);
    self.rt_msg_id = None;
    self.rt_timeout = min_timeout;
    let token = self.timer.schedule(min_timeout);
    self.rt_timer_token = Some(token);
}
/// Records `user_msg` in the stats, splits it into parts with the given `priority` and sends
/// each part as a routing message from `src` to `dst`.
///
/// # Errors
///
/// Fails if the message cannot be split into parts, or as soon as sending one part fails
/// (remaining parts are then not sent).
fn send_user_message(&mut self,
                     src: Authority<XorName>,
                     dst: Authority<XorName>,
                     user_msg: UserMessage,
                     priority: u8)
                     -> Result<(), RoutingError> {
    self.stats.count_user_message(&user_msg);
    let parts = user_msg.to_parts(priority)?;
    for content in parts {
        self.send_routing_message(src, dst, content)?;
    }
    Ok(())
}
/// Forwards `signed_msg` towards its destination along the given `route`.
///
/// * If `hop` is our own name and the message is signed by us, the route is counted in the
///   stats.
/// * A message addressed to a client whose authority name equals our name is relayed straight
///   to that client; one addressed to a client authority we are otherwise part of is silently
///   accepted without sending anything.
/// * Anything else is sent to every target returned by `get_targets`.
///
/// # Errors
///
/// Propagates failures from relaying, target selection, or sending to a peer.
fn send_signed_message(&mut self,
                       signed_msg: &SignedMessage,
                       route: u8,
                       hop: &XorName,
                       sent_to: &BTreeSet<XorName>)
                       -> Result<(), RoutingError> {
    if hop == self.name() && signed_msg.signed_by(self.full_id().public_id()) {
        self.stats.count_route(route);
    }

    let dst = signed_msg.routing_message().dst;
    if let Authority::Client { ref peer_id, .. } = dst {
        if *self.name() == dst.name() {
            return self.relay_to_client(signed_msg, peer_id);
        }
        if self.in_authority(&dst) {
            return Ok(());
        }
    }

    let (updated_sent_to, peer_ids) =
        self.get_targets(signed_msg.routing_message(), route, hop, sent_to)?;
    for peer_id in peer_ids {
        let msg_copy = signed_msg.clone();
        self.send_signed_msg_to_peer(msg_copy, peer_id, route, updated_sent_to.clone())?;
    }
    Ok(())
}
/// Serialises `signed_msg` as a hop message and sends it to `target`, either directly or
/// through the tunnel node registered for `target`.
///
/// If `target` is neither connected nor tunnelled, the peer is disconnected and `Ok(())` is
/// returned. The message is only handed to `send_or_drop` when
/// `filter_outgoing_routing_msg` returns `false` for it.
///
/// # Errors
///
/// Propagates serialisation failures.
fn send_signed_msg_to_peer(&mut self,
                           signed_msg: SignedMessage,
                           target: PeerId,
                           route: u8,
                           sent_to: BTreeSet<XorName>)
                           -> Result<(), RoutingError> {
    let priority = signed_msg.priority();
    let routing_msg = signed_msg.routing_message().clone();

    let directly_connected = self.crust_service.is_connected(&target);
    let (recipient, payload) = if directly_connected {
        (target, self.to_hop_bytes(signed_msg, route, sent_to)?)
    } else {
        match self.tunnels.tunnel_for(&target) {
            Some(&tunnel_id) => {
                (tunnel_id, self.to_tunnel_hop_bytes(signed_msg, route, sent_to, target)?)
            }
            None => {
                trace!("{:?} Not connected or tunnelling to {:?}. Dropping peer.",
                       self,
                       target);
                self.disconnect_peer(&target, None);
                return Ok(());
            }
        }
    };

    // The filter is keyed on the final `target`, not on the tunnel node.
    if self.filter_outgoing_routing_msg(&routing_msg, &target, route) {
        return Ok(());
    }
    self.send_or_drop(&recipient, payload, priority);
    Ok(())
}
/// Relays `signed_msg` to the directly connected client `peer_id`.
///
/// When the client is connected, the message is wrapped in a route-0 hop message and sent,
/// unless `filter_outgoing_routing_msg` returns `true` for it.
///
/// # Errors
///
/// Returns `RoutingError::ClientConnectionNotFound` when the client is not connected; in that
/// case an ack is still sent on behalf of our own `ManagedNode` authority. Hop message
/// construction and serialisation failures are propagated.
fn relay_to_client(&mut self,
                   signed_msg: &SignedMessage,
                   peer_id: &PeerId)
                   -> Result<(), RoutingError> {
    let priority = signed_msg.priority();

    if self.peer_mgr.get_connected_peer(peer_id).is_none() {
        let own_name = *self.name();
        self.send_ack_from(signed_msg.routing_message(),
                           0,
                           Authority::ManagedNode(own_name));
        debug!("{:?} Client connection not found for message {:?}.",
               self,
               signed_msg);
        return Err(RoutingError::ClientConnectionNotFound);
    }

    if self.filter_outgoing_routing_msg(signed_msg.routing_message(), peer_id, 0) {
        return Ok(());
    }

    let hop_msg = HopMessage::new(signed_msg.clone(),
                                  0,
                                  BTreeSet::new(),
                                  self.full_id.signing_private_key())?;
    let raw_bytes = serialisation::serialise(&Message::Hop(hop_msg))?;
    self.send_or_drop(peer_id, raw_bytes, priority);
    Ok(())
}
/// Picks the name that signatures for a message from `src` are sent to on the given `route`.
///
/// For `ManagedNode` and `Client` sources this is always our own name. For group sources the
/// candidates are sorted by distance to `src.name()`:
///
/// * `ClientManager`/`NaeManager`/`NodeManager`: our section, truncated to
///   `min_section_size()` entries;
/// * `Section`: our whole section;
/// * `PrefixSection`: all routing table names matching the prefix, plus our own name.
///
/// Returns `None` if we are not among the candidates; otherwise the candidate at index
/// `route % candidates.len()`.
fn get_signature_target(&self, src: &Authority<XorName>, route: u8) -> Option<XorName> {
    use Authority::*;
    let candidates: Vec<&XorName> = match *src {
        ManagedNode(_) | Client { .. } => return Some(*self.name()),
        ClientManager(_) | NaeManager(_) | NodeManager(_) => {
            let mut closest = self.peer_mgr
                .routing_table()
                .our_section()
                .iter()
                .sorted_by(|&lhs, &rhs| src.name().cmp_distance(lhs, rhs));
            closest.truncate(self.min_section_size());
            closest
        }
        Section(_) => {
            self.peer_mgr
                .routing_table()
                .our_section()
                .iter()
                .sorted_by(|&lhs, &rhs| src.name().cmp_distance(lhs, rhs))
        }
        PrefixSection(ref pfx) => {
            self.peer_mgr
                .routing_table()
                .iter()
                .filter(|name| pfx.matches(name))
                .chain(iter::once(self.name()))
                .sorted_by(|&lhs, &rhs| src.name().cmp_distance(lhs, rhs))
        }
    };

    if candidates.contains(&self.name()) {
        // Non-empty here, since it contains at least our own name.
        let index = route as usize % candidates.len();
        Some(*candidates[index])
    } else {
        None
    }
}
/// Determines which peers `routing_msg` should be sent to next, together with the updated
/// `sent_to` set to attach to the hop message.
///
/// Connection info requests/responses that originate from a client and carry our own public
/// ID are always sent via the proxy. The proxy path is also taken whenever `is_proper()` is
/// false. Otherwise the targets come from the routing table, minus anything already in
/// `sent_to`; the returned set is only populated when we are part of the destination
/// authority.
///
/// # Errors
///
/// * Routing table errors from target selection are propagated.
/// * `RoutingError::ProxyConnectionNotFound` if the proxy path is taken but the proxy peer is
///   unknown.
/// * `RoutingError::InvalidSource` if the proxy path is taken for a non-client source.
fn get_targets(&self,
               routing_msg: &RoutingMessage,
               route: u8,
               exclude: &XorName,
               sent_to: &BTreeSet<XorName>)
               -> Result<(BTreeSet<XorName>, Vec<PeerId>), RoutingError> {
    let force_via_proxy = match routing_msg.content {
        MessageContent::ConnectionInfoRequest { pub_id, .. } |
        MessageContent::ConnectionInfoResponse { pub_id, .. } => {
            routing_msg.src.is_client() && pub_id == *self.full_id.public_id()
        }
        _ => false,
    };

    if !self.is_proper() || force_via_proxy {
        return match routing_msg.src {
            Authority::Client { ref proxy_node_name, .. } => {
                match self.peer_mgr.get_proxy_peer_id(proxy_node_name) {
                    Some(&proxy_peer_id) => Ok((BTreeSet::new(), vec![proxy_peer_id])),
                    None => {
                        error!("{:?} Unable to find connection to proxy node in proxy map.",
                               self);
                        Err(RoutingError::ProxyConnectionNotFound)
                    }
                }
            }
            _ => {
                error!("{:?} Source should be client if our state is a Client.",
                       self);
                Err(RoutingError::InvalidSource)
            }
        };
    }

    let fresh_targets: HashSet<_> = self.peer_mgr
        .routing_table()
        .targets(&routing_msg.dst, *exclude, route as usize)?
        .into_iter()
        .filter(|candidate| !sent_to.contains(candidate))
        .collect();

    let updated_sent_to = if self.in_authority(&routing_msg.dst) {
        sent_to.iter()
            .chain(fresh_targets.iter())
            .chain(iter::once(self.name()))
            .cloned()
            .collect()
    } else {
        BTreeSet::new()
    };

    Ok((updated_sent_to, self.peer_mgr.get_peer_ids(&fresh_targets)))
}
/// Wraps `signed_msg` in a hop message signed with our private signing key, embeds it in a
/// `Message::TunnelHop` from our Crust ID to `dst`, and serialises the result.
///
/// # Errors
///
/// Propagates hop message construction and serialisation failures.
fn to_tunnel_hop_bytes(&self,
                       signed_msg: SignedMessage,
                       route: u8,
                       sent_to: BTreeSet<XorName>,
                       dst: PeerId)
                       -> Result<Vec<u8>, RoutingError> {
    let signing_key = self.full_id.signing_private_key();
    let hop_msg = HopMessage::new(signed_msg, route, sent_to, signing_key)?;
    let our_id = self.crust_service.id();
    let tunnel_msg = Message::TunnelHop {
        content: hop_msg,
        src: our_id,
        dst: dst,
    };
    let bytes = serialisation::serialise(&tunnel_msg)?;
    Ok(bytes)
}
/// Sends our signed, serialised public ID to `peer_id`.
///
/// The message is a `NodeIdentify` once this node is approved and a `CandidateIdentify`
/// before that. A serialisation failure is logged and nothing is sent.
fn send_node_identify(&mut self, peer_id: PeerId) {
    let serialised_public_id = match serialisation::serialise(self.full_id().public_id()) {
        Err(e) => {
            error!("Failed to serialise public ID: {:?}", e);
            return;
        }
        Ok(bytes) => bytes,
    };
    let signature = sign::sign_detached(&serialised_public_id,
                                        self.full_id().signing_private_key());

    let identify_msg = if !self.is_approved {
        DirectMessage::CandidateIdentify {
            serialised_public_id: serialised_public_id,
            signature: signature,
        }
    } else {
        DirectMessage::NodeIdentify {
            serialised_public_id: serialised_public_id,
            signature: signature,
        }
    };
    self.send_direct_message(peer_id, identify_msg);
}
/// Starts, or resumes, establishing a connection to the peer identified by `their_public_id`.
///
/// In order:
///
/// 1. If `peer_mgr` already knows a peer ID for them as proxy, client or joining node, we
///    identify ourselves to it and add it to the routing table.
/// 2. Otherwise `peer_mgr.allow_connect` must succeed.
/// 3. If `peer_mgr` hands out a connection token, Crust is asked to prepare connection info.
/// 4. Otherwise, if the peer is in state `ConnectionInfoReady`, our public connection info is
///    re-sent; in any other state nothing is sent.
///
/// # Errors
///
/// Propagates the error from `peer_mgr.allow_connect`.
fn send_connection_info_request(&mut self,
                                their_public_id: PublicId,
                                src: Authority<XorName>,
                                dst: Authority<XorName>,
                                outbox: &mut EventBox)
                                -> Result<(), RoutingError> {
    let their_name = *their_public_id.name();

    let existing_peer_id = self.peer_mgr
        .get_proxy_or_client_or_joining_node_peer_id(&their_public_id);
    if let Some(peer_id) = existing_peer_id {
        self.send_node_identify(peer_id);
        self.add_to_routing_table(&their_public_id, &peer_id, outbox);
        return Ok(());
    }

    self.peer_mgr.allow_connect(&their_name)?;

    let connection_token = self.peer_mgr.get_connection_token(src, dst, their_public_id);
    if let Some(token) = connection_token {
        self.crust_service.prepare_connection_info(token);
        return Ok(());
    }

    let our_pub_info = match self.peer_mgr.get_state_by_name(&their_name) {
        Some(&PeerState::ConnectionInfoReady(ref our_priv_info)) => {
            our_priv_info.to_pub_connection_info()
        }
        other_state => {
            trace!("{:?} Not sending connection info request to {:?}. State: {:?}",
                   self,
                   their_name,
                   other_state);
            return Ok(());
        }
    };

    trace!("{:?} Resending connection info request to {:?}",
           self,
           their_name);
    self.send_connection_info(our_pub_info, their_public_id, src, dst, None);
    Ok(())
}
/// Removes `peer_id` from the peer manager and handles the consequences.
///
/// Returns `false` when the node must stop: either `dropped_routing_node` said so, or the lost
/// peer was our proxy while the routing table holds fewer than `min_section_size() - 1`
/// entries (in which case `Event::Terminate` is raised). Returns `true` otherwise, including
/// when the peer was unknown.
///
/// When `try_reconnect` is set, the lost peer was not our proxy and its public ID is not a
/// client ID, a connection info request is sent to it; a failure there is only logged.
fn dropped_peer(&mut self,
                peer_id: &PeerId,
                outbox: &mut EventBox,
                try_reconnect: bool)
                -> bool {
    let (peer, removal_result) = match self.peer_mgr.remove_peer(peer_id) {
        None => return true,
        Some(removed) => removed,
    };

    if let Ok(removal_details) = removal_result {
        if !self.dropped_routing_node(peer.pub_id(), removal_details, outbox) {
            return false;
        }
    }

    let may_reconnect = match *peer.state() {
        PeerState::Client => {
            debug!("{:?} Client disconnected: {:?}", self, peer_id);
            try_reconnect
        }
        PeerState::JoiningNode => {
            debug!("{:?} Joining node {:?} dropped. {} remaining.",
                   self,
                   peer_id,
                   self.peer_mgr.joining_nodes_num());
            try_reconnect
        }
        PeerState::Proxy => {
            debug!("{:?} Lost bootstrap connection to {:?} ({:?}).",
                   self,
                   peer.name(),
                   peer_id);
            if self.peer_mgr.routing_table().len() < self.min_section_size() - 1 {
                outbox.send_event(Event::Terminate);
                return false;
            }
            // Never try to reconnect to a lost proxy.
            false
        }
        _ => try_reconnect,
    };

    if !may_reconnect || peer.pub_id().is_client_id() {
        return true;
    }

    debug!("{:?} Sending connection info to {:?} due to dropped peer.",
           self,
           peer.pub_id());
    let own_name = *self.name();
    let outcome = self.send_connection_info_request(*peer.pub_id(),
                                                    Authority::ManagedNode(own_name),
                                                    Authority::ManagedNode(*peer.name()),
                                                    outbox);
    if let Err(error) = outcome {
        debug!("{:?} - Failed to send connection info to {:?}: {:?}",
               self,
               peer.pub_id(),
               error);
    }
    true
}
/// Handles the removal of a node from the routing table.
///
/// Raises `Event::NodeLost` with a clone of the routing table, checks whether a merge is
/// needed and, if the lost name still maps to a section prefix, sends a section list signature
/// for that prefix. If the node was in our section, the RT timer is reset and the signatures
/// made by `pub_id` are removed from `section_list_sigs`.
///
/// Returns `false` only when the routing table is now empty and we are not the first node; in
/// that case `Event::RestartRequired` is raised.
fn dropped_routing_node(&mut self,
                        pub_id: &PublicId,
                        details: RemovalDetails<XorName>,
                        outbox: &mut EventBox)
                        -> bool {
    info!("{:?} Dropped {:?} from the routing table.",
          self,
          details.name);

    let table_snapshot = self.peer_mgr.routing_table().clone();
    outbox.send_event(Event::NodeLost(details.name, table_snapshot));
    self.merge_if_necessary();

    if let Some(prefix) = self.peer_mgr.routing_table().find_section_prefix(&details.name) {
        self.send_section_list_signature(prefix, None);
    }

    if details.was_in_our_section {
        self.reset_rt_timer();
        let our_section_len = self.peer_mgr.routing_table().our_section().len();
        self.section_list_sigs.remove_signatures_by(*pub_id, our_section_len);
    }

    if !self.peer_mgr.routing_table().is_empty() {
        return true;
    }
    debug!("{:?} Lost all routing connections.", self);
    if self.is_first_node {
        return true;
    }
    outbox.send_event(Event::RestartRequired);
    false
}
/// Sends a `SectionSplit(our_prefix, joining_node)` message from `our_prefix` to every prefix
/// currently in the routing table. Send failures are logged and otherwise ignored.
fn send_section_split(&mut self, our_prefix: Prefix<XorName>, joining_node: XorName) {
    let src = Authority::PrefixSection(our_prefix);
    for target_prefix in self.peer_mgr.routing_table().prefixes() {
        let split_msg = MessageContent::SectionSplit(our_prefix, joining_node);
        let outcome = self.send_routing_message(src,
                                                Authority::PrefixSection(target_prefix),
                                                split_msg);
        if let Err(err) = outcome {
            debug!("{:?} Failed to send SectionSplit: {:?}.", self, err);
        }
    }
}
/// Sends an `OwnSectionMerge` message if `peer_mgr.should_merge` reports that a merge is due.
///
/// The message goes from the sender prefix to the merge prefix returned by `should_merge`. A
/// send failure is logged and otherwise ignored.
fn merge_if_necessary(&mut self) {
    let we_want = self.we_want_to_merge();
    let they_want = self.they_want_to_merge();
    let (sender_prefix, merge_prefix, sections) = match self.peer_mgr
        .should_merge(we_want, they_want) {
        Some(merge_info) => merge_info,
        None => return,
    };

    let content = MessageContent::OwnSectionMerge(sections);
    let src = Authority::PrefixSection(sender_prefix);
    let dst = Authority::PrefixSection(merge_prefix);
    debug!("{:?} Sending OwnSectionMerge from {:?} to {:?} with content {:?}",
           self,
           src,
           dst,
           content);
    if let Err(err) = self.send_routing_message(src, dst, content) {
        debug!("{:?} Failed to send OwnSectionMerge: {:?}.", self, err);
    }
}
/// Sends an `OtherSectionMerge` message, built from the public IDs of
/// `merge_details.section`, from the merged prefix to every prefix in `targets`. Send failures
/// are logged and otherwise ignored.
fn send_other_section_merge(&mut self,
                            targets: BTreeSet<Prefix<XorName>>,
                            merge_details: OtherMergeDetails<XorName>) {
    let merged_section = self.peer_mgr.get_pub_ids(&merge_details.section);
    let content = MessageContent::OtherSectionMerge(merged_section);
    let src = Authority::PrefixSection(merge_details.prefix);

    for target_prefix in targets {
        let dst = Authority::PrefixSection(target_prefix);
        debug!("{:?} Sending OtherSectionMerge from {:?} to {:?} with content {:?}",
               self,
               src,
               dst,
               content);
        let outcome = self.send_routing_message(src, dst, content.clone());
        if let Err(err) = outcome {
            debug!("{:?} Failed to send OtherSectionMerge: {:?}.", self, err);
        }
    }
}
/// Drops `peer_id` as a tunnel client and sends `TunnelClosed(peer_id)` to every peer ID
/// returned by `tunnels.drop_client`.
fn dropped_tunnel_client(&mut self, peer_id: &PeerId) {
    let counterparts = self.tunnels.drop_client(peer_id);
    for counterpart_id in counterparts {
        trace!("{:?} Closing tunnel client connection between {:?} and {:?}",
               self,
               peer_id,
               counterpart_id);
        self.send_direct_message(counterpart_id, DirectMessage::TunnelClosed(*peer_id));
    }
}
/// Handles the loss of `peer_id` in its role as a tunnel node.
///
/// Every peer ID returned by `tunnels.remove_tunnel` that is also a routing peer is dropped
/// (without a reconnect attempt) and a new tunnel is requested for it. The `bool` returned by
/// `dropped_peer` is not inspected here.
fn dropped_tunnel_node(&mut self, peer_id: &PeerId, outbox: &mut EventBox) {
    // Collect first: dropping peers below needs `&mut self`.
    let mut tunnelled_peers = Vec::new();
    for dst_id in self.tunnels.remove_tunnel(peer_id) {
        if let Some(dst_pub_id) = self.peer_mgr.get_routing_peer(&dst_id) {
            tunnelled_peers.push((dst_id, *dst_pub_id));
        }
    }

    for (dst_id, pub_id) in tunnelled_peers {
        self.dropped_peer(&dst_id, outbox, false);
        debug!("{:?} Lost tunnel for peer {:?} ({:?}). Requesting new tunnel.",
               self,
               dst_id,
               pub_id.name());
        self.find_tunnel_for_peer(dst_id, &pub_id);
    }
}
/// Returns `true` if we are the first node or the routing table has at least one entry.
fn is_proper(&self) -> bool {
    if self.is_first_node {
        return true;
    }
    self.peer_mgr.routing_table().len() > 0
}
/// Counts `direct_message` in the stats and sends it to `dst_id`.
///
/// If a tunnel node is registered for `dst_id`, the message is wrapped in
/// `Message::TunnelDirect` and sent to the tunnel node; otherwise it is sent to `dst_id` as a
/// plain `Message::Direct`.
fn send_direct_message(&mut self, dst_id: PeerId, direct_message: DirectMessage) {
    self.stats().count_direct_message(&direct_message);

    let (recipient, message) = match self.tunnels.tunnel_for(&dst_id) {
        Some(&tunnel_id) => {
            let wrapped = Message::TunnelDirect {
                content: direct_message,
                src: self.crust_service.id(),
                dst: dst_id,
            };
            (tunnel_id, wrapped)
        }
        None => (dst_id, Message::Direct(direct_message)),
    };
    self.send_message(&recipient, message);
}
/// Returns the prefix the routing table reports as our own.
fn our_prefix(&self) -> &Prefix<XorName> {
    let table = self.peer_mgr.routing_table();
    table.our_prefix()
}
/// Builds a human-readable summary of how much resource proof data has been sent so far.
///
/// Each entry of `resource_proof_response_parts` is either empty (counted as completed) or
/// still holds messages; for the latter, the `part_index` and `part_count` of its last message
/// feed the percentage. If that last message is not a `ResourceProofResponse`, an empty string
/// is returned. When `proxy_is_resource_proof_challenger` is set, one completed entry is
/// discounted.
///
/// The percentage is computed with a checked division: if `challenger_count` is zero while
/// response parts exist, the divisor is zero and the progress is reported as 0% instead of
/// panicking inside what is only a logging helper.
fn resource_proof_response_progress(&self) -> String {
    let mut parts_per_proof = 0;
    let mut completed: usize = 0;
    let mut incomplete = vec![];
    for messages in self.resource_proof_response_parts.values() {
        if let Some(next_message) = messages.last() {
            match *next_message {
                DirectMessage::ResourceProofResponse { part_index, part_count, .. } => {
                    parts_per_proof = part_count;
                    incomplete.push(part_index);
                }
                _ => return String::new(),
            }
        } else {
            completed += 1;
        }
    }

    if self.proxy_is_resource_proof_challenger {
        completed = completed.saturating_sub(1);
    }

    if self.resource_proof_response_parts.is_empty() {
        "No resource proof challenges received yet; still establishing connections to peers."
            .to_string()
    } else if self.challenger_count == completed {
        format!("All {} resource proof responses fully sent.", completed)
    } else {
        let (sent, total) = if parts_per_proof == 0 {
            (completed * 100, self.challenger_count)
        } else {
            (((parts_per_proof * completed) + incomplete.iter().sum::<usize>()) * 100,
             parts_per_proof * self.challenger_count)
        };
        // `total` is zero when `challenger_count` is zero; report 0% rather than panic.
        let progress = sent.checked_div(total).unwrap_or(0);
        format!("{}/{} resource proof response(s) complete, {}% of data sent.",
                completed,
                self.challenger_count,
                progress)
    }
}
/// Remembers `other_section_prefix` in `cached_section_update_requests` while a merge of our
/// own section is due or in progress.
///
/// Nothing is cached when no merge is pending, or when `other_section_prefix` is the sibling of
/// our own prefix.
fn cache_section_update_request(&mut self, other_section_prefix: Prefix<XorName>) {
    let we_want = self.we_want_to_merge();
    let they_want = self.they_want_to_merge();
    let merge_pending = self.peer_mgr
        .routing_table()
        .should_merge(we_want, they_want)
        .is_some() || we_want || they_want;
    if !merge_pending {
        return;
    }
    if other_section_prefix == self.our_prefix().sibling() {
        return;
    }
    let _ = self.cached_section_update_requests.insert(other_section_prefix);
}
/// Renders `duration` as a whole number of seconds followed by the word "seconds".
///
/// The value is rounded to the nearest second, with exactly half a second rounding up.
fn format(duration: Duration) -> String {
    let whole_secs = duration.as_secs();
    let rounded = if duration.subsec_nanos() < 500_000_000 {
        whole_secs
    } else {
        whole_secs + 1
    };
    format!("{} seconds", rounded)
}
}
impl Base for Node {
    /// Returns the Crust service owned by this node.
    fn crust_service(&self) -> &Service {
        let service = &self.crust_service;
        service
    }

    /// Returns this node's full ID.
    fn full_id(&self) -> &FullId {
        let id = &self.full_id;
        id
    }

    /// Returns `true` if this node is part of `auth`.
    ///
    /// A `Client` authority matches when its `client_key` equals our public signing key. Any
    /// other authority requires `is_proper()` and is then decided by the routing table.
    fn in_authority(&self, auth: &Authority<XorName>) -> bool {
        match *auth {
            Authority::Client { ref client_key, .. } => {
                client_key == self.full_id.public_id().signing_public_key()
            }
            _ => self.is_proper() && self.peer_mgr.routing_table().in_authority(auth),
        }
    }

    /// Returns owned copies of the names the routing table reports as the `count` closest to
    /// `name`, or `None` if the routing table returns `None`.
    fn close_group(&self, name: XorName, count: usize) -> Option<Vec<XorName>> {
        let closest = self.peer_mgr.routing_table().closest_names(&name, count);
        closest.map(|names| names.into_iter().cloned().collect())
    }

    /// Handles a lost connection to `peer_id`.
    ///
    /// The peer is dropped in its possible roles as tunnel client, tunnel node and regular
    /// peer. Returns `Transition::Terminate` if `dropped_peer` returns `false`, and
    /// `Transition::Stay` otherwise, including when the ID is our own Crust ID.
    fn handle_lost_peer(&mut self, peer_id: PeerId, outbox: &mut EventBox) -> Transition {
        if peer_id == self.crust_service.id() {
            error!("{:?} LostPeer fired with our crust peer ID.", self);
            return Transition::Stay;
        }

        debug!("{:?} Received LostPeer - {:?}", self, peer_id);

        self.dropped_tunnel_client(&peer_id);
        self.dropped_tunnel_node(&peer_id, outbox);

        let keep_running = self.dropped_peer(&peer_id, outbox, true);
        if !keep_running {
            return Transition::Terminate;
        }
        Transition::Stay
    }

    /// Returns mutable access to this node's stats.
    fn stats(&mut self) -> &mut Stats {
        let stats = &mut self.stats;
        stats
    }
}
#[cfg(feature = "use-mock-crust")]
impl Node {
    /// Returns a reference to this node's routing table.
    pub fn routing_table(&self) -> &RoutingTable<XorName> {
        let peer_manager = &self.peer_mgr;
        peer_manager.routing_table()
    }

    /// Returns the result of `tunnels.has_clients` for `client_1` and `client_2`.
    pub fn has_tunnel_clients(&self, client_1: PeerId, client_2: PeerId) -> bool {
        let tunnels = &self.tunnels;
        tunnels.has_clients(client_1, client_2)
    }

    /// Runs `resend_unacknowledged_timed_out_msgs` for every timer token the ack manager
    /// currently holds. Returns `true` if there was at least one such token.
    pub fn resend_unacknowledged(&mut self) -> bool {
        let timer_tokens = self.ack_mgr.timer_tokens();
        let had_tokens = !timer_tokens.is_empty();
        for timer_token in &timer_tokens {
            self.resend_unacknowledged_timed_out_msgs(*timer_token);
        }
        had_tokens
    }

    /// Returns `true` if the ack manager reports pending entries.
    pub fn has_unacknowledged(&self) -> bool {
        let ack_manager = &self.ack_mgr;
        ack_manager.has_pending()
    }

    /// Clears the ack manager and the routing message filter, removes connecting peers
    /// (checking for a merge if `remove_connecting_peers` returns `true`) and clears the new
    /// tunnel clients.
    pub fn clear_state(&mut self) {
        self.ack_mgr.clear();
        self.routing_msg_filter.clear();
        let removed_any = self.peer_mgr.remove_connecting_peers();
        if removed_any {
            self.merge_if_necessary();
        }
        self.tunnels.clear_new_clients();
    }

    /// Returns a copy of the signatures stored for the section list of `prefix`.
    ///
    /// # Errors
    ///
    /// Returns `RoutingError::NotEnoughSignatures` when nothing is stored for `prefix`.
    pub fn section_list_signatures(&self,
                                   prefix: Prefix<XorName>)
                                   -> Result<BTreeMap<PublicId, sign::Signature>, RoutingError> {
        match self.section_list_sigs.get_signatures(prefix) {
            Some(&(_, ref signatures)) => {
                Ok(signatures.iter().map(|(&pub_id, &sig)| (pub_id, sig)).collect())
            }
            None => Err(RoutingError::NotEnoughSignatures),
        }
    }

    /// Overwrites `next_node_name` with `relocation_name`.
    pub fn set_next_node_name(&mut self, relocation_name: Option<XorName>) {
        self.next_node_name = relocation_name;
    }
}
impl Bootstrapped for Node {
    /// Returns shared access to the ack manager.
    fn ack_mgr(&self) -> &AckManager {
        let ack_manager = &self.ack_mgr;
        ack_manager
    }

    /// Returns mutable access to the ack manager.
    fn ack_mgr_mut(&mut self) -> &mut AckManager {
        let ack_manager = &mut self.ack_mgr;
        ack_manager
    }

    /// Returns the minimum section size configured in the routing table.
    fn min_section_size(&self) -> usize {
        let table = self.peer_mgr.routing_table();
        table.min_section_size()
    }

    /// Signs `routing_msg` and either accumulates the signature locally or sends it to the
    /// node responsible for accumulation on this `route`.
    ///
    /// Nothing is sent (and `Ok(())` is returned) when we are not part of the source
    /// authority, when `add_to_pending_acks` returns `false` for the message, or when
    /// `get_signature_target` yields no target.
    ///
    /// If the signature target is ourselves, the signed message is added to the signature
    /// accumulator; once that returns a message, it is handled locally if we are part of the
    /// destination authority and forwarded otherwise. If the target is another node, only our
    /// signature is sent to it as a direct message.
    ///
    /// # Errors
    ///
    /// * `RoutingTableError::NoSuchPeer` (wrapped) if our own section cannot be found or the
    ///   signature target has no known peer ID.
    /// * Failures from signing, handling or forwarding the message are propagated.
    fn send_routing_message_via_route(&mut self,
                                      routing_msg: RoutingMessage,
                                      route: u8)
                                      -> Result<(), RoutingError> {
        if !self.in_authority(&routing_msg.src) {
            trace!("{:?} Not part of the source authority. Not sending message {:?}.",
                   self,
                   routing_msg);
            return Ok(());
        }
        if !self.add_to_pending_acks(&routing_msg, route) {
            debug!("{:?} already received an ack for {:?} - so not resending it.",
                   self,
                   routing_msg);
            return Ok(());
        }

        use routing_table::Authority::*;
        // The section lists attached to the signed message depend on the source authority.
        let sending_names = match routing_msg.src {
            Client { .. } => vec![],
            ClientManager(_) | NaeManager(_) | NodeManager(_) | ManagedNode(_) => {
                let own_section =
                    self.peer_mgr
                        .routing_table()
                        .get_section(self.name())
                        .ok_or(RoutingError::RoutingTable(RoutingTableError::NoSuchPeer))?;
                let member_ids = self.peer_mgr.get_pub_ids(own_section);
                vec![SectionList::new(*self.our_prefix(), member_ids)]
            }
            Section(_) => {
                vec![SectionList::new(*self.our_prefix(),
                                      self.peer_mgr.get_pub_ids(self.peer_mgr
                                          .routing_table()
                                          .our_section()))]
            }
            PrefixSection(ref prefix) => {
                self.peer_mgr
                    .routing_table()
                    .all_sections()
                    .into_iter()
                    .filter(|&(ref section_prefix, _)| prefix.is_compatible(section_prefix))
                    .map(|(section_prefix, members)| {
                        SectionList::new(section_prefix, self.peer_mgr.get_pub_ids(&members))
                    })
                    .collect()
            }
        };

        let signed_msg = SignedMessage::new(routing_msg, &self.full_id, sending_names)?;

        let target_name = match self.get_signature_target(&signed_msg.routing_message().src,
                                                          route) {
            Some(name) => name,
            None => return Ok(()),
        };

        if target_name == *self.name() {
            // We accumulate the signatures ourselves.
            let min_section_size = self.min_section_size();
            let accumulated =
                self.sig_accumulator.add_message(signed_msg, min_section_size, route);
            if let Some((msg, msg_route)) = accumulated {
                if self.in_authority(&msg.routing_message().dst) {
                    self.handle_signed_message(msg, msg_route, target_name, &BTreeSet::new())?;
                } else {
                    self.send_signed_message(&msg, msg_route, &target_name, &BTreeSet::new())?;
                }
            }
            return Ok(());
        }

        match self.peer_mgr.get_peer_id(&target_name) {
            Some(&peer_id) => {
                let direct_msg = signed_msg.routing_message()
                    .to_signature(self.full_id().signing_private_key())?;
                self.send_direct_message(peer_id, direct_msg);
                Ok(())
            }
            None => Err(RoutingError::RoutingTable(RoutingTableError::NoSuchPeer)),
        }
    }

    /// Returns mutable access to the routing message filter.
    fn routing_msg_filter(&mut self) -> &mut RoutingMessageFilter {
        let filter = &mut self.routing_msg_filter;
        filter
    }

    /// Returns mutable access to the timer.
    fn timer(&mut self) -> &mut Timer {
        let timer = &mut self.timer;
        timer
    }
}
impl Debug for Node {
    /// Formats the node as `Node(<name>(<prefix in binary>))`.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        let name = self.name();
        let prefix = self.our_prefix();
        write!(formatter, "Node({}({:b}))", name, prefix)
    }
}
/// Deserialises a public ID and checks that `signature` is a valid detached signature over the
/// serialised bytes, made with the signing key contained in that same public ID.
///
/// # Errors
///
/// Propagates the deserialisation error, or returns `RoutingError::FailedSignature` when the
/// signature does not verify.
fn verify_signed_public_id(serialised_public_id: &[u8],
                           signature: &sign::Signature)
                           -> Result<PublicId, RoutingError> {
    let public_id: PublicId = serialisation::deserialise(serialised_public_id)?;
    let signature_ok = sign::verify_detached(signature,
                                             serialised_public_id,
                                             public_id.signing_public_key());
    if !signature_ok {
        return Err(RoutingError::FailedSignature);
    }
    Ok(public_id)
}