use accumulator::Accumulator;
#[cfg(not(feature = "use-mock-crust"))]
use crust::{self, ConnectionInfoResult, CrustError, PrivConnectionInfo, PeerId, Service,
PubConnectionInfo};
#[cfg(feature = "use-mock-crust")]
use mock_crust::crust::{self, ConnectionInfoResult, CrustError, PrivConnectionInfo, PeerId,
Service, PubConnectionInfo};
use itertools::Itertools;
use kademlia_routing_table::{AddedNodeDetails, ContactInfo, DroppedNodeDetails};
use lru_time_cache::LruCache;
use maidsafe_utilities::{self, serialisation};
use maidsafe_utilities::event_sender::MaidSafeEventCategory;
use message_filter::MessageFilter;
use peer_manager::{ConnectState, PeerManager};
use rand;
use sodiumoxide::crypto::{box_, sign};
use sodiumoxide::crypto::hash::sha256;
use std::{cmp, iter, fmt};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::net::SocketAddr;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use tunnels::Tunnels;
use xor_name::{XorName, XOR_NAME_BITS};
use action::Action;
use authority::Authority;
use error::{RoutingError, InterfaceError};
use event::Event;
use id::{FullId, PublicId};
use stats::Stats;
use timer::Timer;
use types::{MessageId, RoutingActionSender};
use messages::{DirectMessage, HopMessage, Message, MessageContent, RoutingMessage, SignedMessage,
UserMessage, DEFAULT_PRIORITY};
use utils;
/// Size of a close group; used for all routing-table group operations in this module.
pub const GROUP_SIZE: usize = 8;
/// Number of agreeing signatures required to accumulate a group message
/// (scaled down for small networks — see `dynamic_quorum_size`).
pub const QUORUM_SIZE: usize = 5;
/// Extra entries kept per routing-table bucket beyond the minimum.
const EXTRA_BUCKET_ENTRIES: usize = 2;
/// Seconds to wait for the bootstrap handshake before timing out (see `client_identify`).
const BOOTSTRAP_TIMEOUT_SECS: u64 = 20;
/// Seconds to wait for a `GetNodeName` response. (Used outside this chunk.)
const GET_NODE_NAME_TIMEOUT_SECS: u64 = 60;
/// Interval of the periodic "tick" timer scheduled in `start_new_network`.
const TICK_TIMEOUT_SECS: u64 = 60;
/// Seconds after which a `sent_network_name_to` entry is presumably considered
/// stale. (Used outside this chunk — confirm against the full file.)
const SENT_NETWORK_NAME_TIMEOUT_SECS: u64 = 30;
/// Seconds between periodic bucket-refresh rounds. (Used outside this chunk.)
const REFRESH_BUCKET_GROUPS_SECS: u64 = 120;
/// Seconds to wait for an ack before resending. (Used outside this chunk.)
const ACK_TIMEOUT_SECS: u64 = 20;
/// Connection state of this `Core` within the network.
#[derive(PartialEq, Eq, Debug, Clone)]
enum State {
    /// Not connected to any peer yet.
    Disconnected,
    /// We sent a `ClientIdentify` to the given bootstrap peer; the `u64` is
    /// the timer token guarding the bootstrap timeout (see `client_identify`).
    Bootstrapping(PeerId, u64),
    /// Connected to the network through a proxy node, as a client.
    Client,
    /// A full routing node with a routing table.
    Node,
}
/// The Kademlia routing table specialised to this module's `NodeInfo` contact type.
pub type RoutingTable = ::kademlia_routing_table::RoutingTable<NodeInfo>;
/// Pairing of a node's routing identity with its Crust-level peer ID.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct NodeInfo {
    // The node's public routing identity; provides its `XorName`.
    public_id: PublicId,
    // The Crust connection identifier for this node.
    peer_id: PeerId,
}
impl NodeInfo {
    /// Creates a `NodeInfo` from a public identity and its Crust peer ID.
    fn new(public_id: PublicId, peer_id: PeerId) -> Self {
        NodeInfo {
            public_id: public_id,
            peer_id: peer_id,
        }
    }
}
impl ContactInfo for NodeInfo {
    type Name = XorName;

    /// A contact's name is the name of its public ID.
    fn name(&self) -> &XorName {
        self.public_id.name()
    }
}
/// The role this `Core` instance was started with.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
pub enum Role {
    /// Connects via a proxy only and never joins the routing table.
    Client,
    /// Bootstraps off an existing network and becomes a routing node.
    Node,
    /// Starts a brand new network instead of bootstrapping (see `start_new_network`).
    FirstNode,
}
/// A signed message awaiting an ack, together with the route it was sent on
/// and the timer token that fires if no ack arrives in time.
#[derive(Clone, Debug)]
struct UnacknowledgedMessage {
    // The message to resend if the ack times out.
    signed_msg: SignedMessage,
    // Route index the message was sent on.
    route: u8,
    // Token of the scheduled ack timeout.
    timer_token: u64,
}
/// The routing core: handles connecting to the network via Crust and routing
/// messages between nodes and clients.
pub struct Core {
    // Crust service providing the underlying transport connections.
    crust_service: Service,
    // Role (Client / Node / FirstNode) this instance was started with.
    role: Role,
    // Whether `start_listening_tcp` has already succeeded.
    is_listening: bool,
    // Tells us which of the two event channels below to poll next.
    category_rx: mpsc::Receiver<MaidSafeEventCategory>,
    // Events coming from the Crust service.
    crust_rx: mpsc::Receiver<crust::Event>,
    // Actions requested by the API user (send message, query name, ...).
    action_rx: mpsc::Receiver<Action>,
    // Channel over which events are delivered to the API user.
    event_sender: mpsc::Sender<Event>,
    // Schedules timeouts; tokens come back as `Action::Timeout`.
    timer: Timer,
    // Counts how often each signed message has been seen (see `handle_signed_message`).
    signed_message_filter: MessageFilter<SignedMessage>,
    // Messages awaiting an ack, keyed by a `u64` id — presumably the ack hash;
    // confirm against the ack-handling code outside this chunk.
    pending_acks: HashMap<u64, UnacknowledgedMessage>,
    // Acks already received, so duplicates can be ignored.
    received_acks: MessageFilter<u64>,
    // Filter keyed by bucket index — presumably to rate-limit bucket refreshes.
    bucket_filter: MessageFilter<usize>,
    // Accumulates group messages until a quorum of signatures is reached.
    message_accumulator: Accumulator<RoutingMessage, sign::PublicKey>,
    // Maps a group-message hash to the full message (see `accumulate`).
    grp_msg_cache: LruCache<sha256::Digest, RoutingMessage>,
    // Group messages that already accumulated, to avoid handling them twice.
    grp_msg_filter: MessageFilter<RoutingMessage>,
    // Our own keys and name.
    full_id: FullId,
    // Current connection state (Disconnected / Bootstrapping / Client / Node).
    state: State,
    // Our Kademlia routing table.
    routing_table: RoutingTable,
    // Timer token for the pending `GetNodeName` request, if any.
    get_node_name_timer_token: Option<u64>,
    // Timer token and delay of the next scheduled bucket refresh, if any.
    bucket_refresh_token_and_delay: Option<(u64, u64)>,
    // The joining node we last sent a network name to, and when.
    sent_network_name_to: Option<(XorName, Instant)>,
    // Timer token of the periodic tick (see `start_new_network`).
    tick_timer_token: Option<u64>,
    // Tunnels we provide for, or use towards, peers we can't connect to directly.
    tunnels: Tunnels,
    // Counters reported in `update_stats`.
    stats: Stats,
    // (message hash, peer, route) triples already sent (see `filter_signed_msg`).
    send_filter: LruCache<(u64, PeerId, u8), ()>,
    // Partially reassembled user messages, keyed by (hash, part count).
    user_msg_cache: LruCache<(u64, u32), BTreeMap<u32, Vec<u8>>>,
    // Tracks clients, proxies and in-progress connections.
    peer_mgr: PeerManager,
    // Socket addresses we refuse to bootstrap off again.
    bootstrap_blacklist: HashSet<SocketAddr>,
}
#[cfg_attr(feature="clippy", allow(new_ret_no_self))] impl Core {
/// Creates a new `Core` together with the action sender used to drive it.
///
/// Sets up the Crust/action/category channels, starts the Crust service and
/// service discovery, and either starts a new network (`Role::FirstNode`) or
/// begins bootstrapping. `keys` may supply a pre-built identity; otherwise a
/// fresh `FullId` is generated.
///
/// # Panics
///
/// Panics if the Crust service cannot be started.
pub fn new(event_sender: mpsc::Sender<Event>,
           role: Role,
           keys: Option<FullId>)
           -> (RoutingActionSender, Self) {
    let (crust_tx, crust_rx) = mpsc::channel();
    let (action_tx, action_rx) = mpsc::channel();
    let (category_tx, category_rx) = mpsc::channel();

    let routing_event_category = MaidSafeEventCategory::Routing;
    let action_sender =
        RoutingActionSender::new(action_tx, routing_event_category, category_tx.clone());
    let action_sender2 = action_sender.clone();

    let crust_event_category = MaidSafeEventCategory::Crust;
    let crust_sender =
        crust::CrustEventSender::new(crust_tx, crust_event_category, category_tx);

    let crust_service = match Service::new(crust_sender) {
        Ok(service) => service,
        // `panic!` formats its arguments itself; wrapping them in `format!`
        // is redundant (clippy: `panic_params`).
        Err(what) => panic!("Unable to start crust::Service {:?}", what),
    };

    // Use the supplied identity, or generate a fresh one.
    let full_id = keys.unwrap_or_else(FullId::new);
    let our_info = NodeInfo::new(*full_id.public_id(), crust_service.id());

    let mut core = Core {
        crust_service: crust_service,
        role: role,
        is_listening: false,
        category_rx: category_rx,
        crust_rx: crust_rx,
        action_rx: action_rx,
        event_sender: event_sender,
        timer: Timer::new(action_sender2),
        signed_message_filter: MessageFilter::with_expiry_duration(Duration::from_secs(60 *
                                                                                       20)),
        pending_acks: HashMap::new(),
        received_acks: MessageFilter::with_expiry_duration(Duration::from_secs(4 * 60)),
        bucket_filter: MessageFilter::with_expiry_duration(Duration::from_secs(60)),
        message_accumulator: Accumulator::with_duration(1, Duration::from_secs(60 * 20)),
        grp_msg_cache: LruCache::with_expiry_duration(Duration::from_secs(60 * 20)),
        grp_msg_filter: MessageFilter::with_expiry_duration(Duration::from_secs(60 * 20)),
        full_id: full_id,
        state: State::Disconnected,
        routing_table: RoutingTable::new(our_info, GROUP_SIZE, EXTRA_BUCKET_ENTRIES),
        get_node_name_timer_token: None,
        bucket_refresh_token_and_delay: None,
        sent_network_name_to: None,
        tick_timer_token: None,
        tunnels: Default::default(),
        stats: Default::default(),
        send_filter: LruCache::with_expiry_duration(Duration::from_secs(60 * 10)),
        user_msg_cache: LruCache::with_expiry_duration(Duration::from_secs(60 * 20)),
        peer_mgr: Default::default(),
        bootstrap_blacklist: HashSet::new(),
    };

    core.crust_service.start_service_discovery();
    if role == Role::FirstNode {
        core.start_new_network();
    } else {
        let _ = core.crust_service.start_bootstrap(core.bootstrap_blacklist.clone());
    }
    (action_sender, core)
}
/// (Mock Crust only.) Processes at most one pending event.
/// Returns `true` iff an event was available and handled.
#[cfg(feature = "use-mock-crust")]
pub fn poll(&mut self) -> bool {
    if let Ok(category) = self.category_rx.try_recv() {
        let _ = self.handle_event(category);
        true
    } else {
        false
    }
}
/// Runs the event loop, blocking until a handler requests termination or the
/// event channel is closed.
#[cfg(not(feature = "use-mock-crust"))]
pub fn run(&mut self) {
    // A receive error (channel closed) ends the loop just like an explicit
    // termination request from `handle_event`.
    while let Ok(category) = self.category_rx.recv() {
        if !self.handle_event(category) {
            return;
        }
    }
}
/// Returns this node's name on the network.
pub fn name(&self) -> &XorName {
    let our_public_id = self.full_id.public_id();
    our_public_id.name()
}
/// Returns the names of the nodes in our close group, excluding ourselves.
/// Yields an empty `Vec` when the routing table has no close group for us.
#[allow(unused)]
pub fn close_group(&self) -> Vec<XorName> {
    match self.routing_table.other_close_nodes(self.name(), GROUP_SIZE) {
        Some(infos) => infos.iter().map(|info| *info.name()).collect(),
        None => Vec::new(),
    }
}
/// Returns a reference to this node's routing table.
#[allow(unused)]
pub fn routing_table(&self) -> &RoutingTable {
    &self.routing_table
}
/// (Mock Crust only.) Stops the timer and immediately fires the timeout
/// handler for every pending (unacknowledged) message, causing each to be
/// resent. Returns `true` iff at least one message was pending.
#[cfg(feature = "use-mock-crust")]
pub fn resend_unacknowledged(&mut self) -> bool {
    self.timer.stop();
    // `values()` is the idiomatic way to iterate a map when the keys are
    // unused (clippy: `iter_kv_map`). Tokens are collected first because
    // `handle_timeout` needs `&mut self`.
    let timer_tokens = self.pending_acks
        .values()
        .map(|unacked_msg| unacked_msg.timer_token)
        .collect_vec();
    for &timer_token in &timer_tokens {
        self.handle_timeout(timer_token);
    }
    !timer_tokens.is_empty()
}
/// (Mock Crust only.) Clears all message filters, caches and transient
/// bookkeeping so a test can start from a clean slate. Does not touch the
/// routing table or connections.
#[cfg(feature = "use-mock-crust")]
pub fn clear_state(&mut self) {
    self.send_filter.clear();
    self.signed_message_filter.clear();
    self.received_acks.clear();
    self.bucket_filter.clear();
    self.grp_msg_filter.clear();
    self.sent_network_name_to = None;
    self.peer_mgr.clear_caches();
}
/// Refreshes the stats counters (connected clients, tunnels, routing table
/// size) and logs each one that changed. Only active once we are a node.
fn update_stats(&mut self) {
    if self.state == State::Node {
        let old_client_num = self.stats.cur_client_num;
        self.stats.cur_client_num = self.peer_mgr.client_num();
        if self.stats.cur_client_num != old_client_num {
            // Only increases count towards the cumulative total.
            if self.stats.cur_client_num > old_client_num {
                self.stats.cumulative_client_num += self.stats.cur_client_num - old_client_num;
            }
            info!("{:?} - Connected clients: {}, cumulative: {}",
                  self,
                  self.stats.cur_client_num,
                  self.stats.cumulative_client_num);
        }
        if self.stats.tunnel_connections != self.tunnels.tunnel_count() ||
           self.stats.tunnel_client_pairs != self.tunnels.client_count() {
            self.stats.tunnel_connections = self.tunnels.tunnel_count();
            self.stats.tunnel_client_pairs = self.tunnels.client_count();
            info!("{:?} - Indirect connections: {}, tunneling for: {}",
                  self,
                  self.stats.tunnel_connections,
                  self.stats.tunnel_client_pairs);
        }
    }
    if self.state == State::Node &&
       self.stats.cur_routing_table_size != self.routing_table.len() {
        self.stats.cur_routing_table_size = self.routing_table.len();
        // Log the new routing table size in a boxed banner for visibility.
        let status_str = format!("{:?} {:?} - Routing Table size: {:3}",
                                 self,
                                 self.crust_service.id(),
                                 self.routing_table.len());
        info!(" -{}- ",
              iter::repeat('-').take(status_str.len()).collect::<String>());
        info!("| {} |", status_str);
        info!(" -{}- ",
              iter::repeat('-').take(status_str.len()).collect::<String>());
    }
}
/// Dispatches one event of the given category from the corresponding
/// channel, then refreshes the stats. Returns `false` iff the event loop
/// should terminate (an action handler requested it).
fn handle_event(&mut self, category: MaidSafeEventCategory) -> bool {
    match category {
        MaidSafeEventCategory::Routing => {
            if let Ok(action) = self.action_rx.try_recv() {
                if !self.handle_action(action) {
                    return false;
                }
            }
        }
        MaidSafeEventCategory::Crust => {
            if let Ok(crust_event) = self.crust_rx.try_recv() {
                self.handle_crust_event(crust_event);
            }
        }
    }
    self.update_stats();
    true
}
/// Handles an action requested by the API user. Returns `false` iff the
/// event loop should terminate: on `Action::Terminate`, or when a result
/// channel is closed (the API side is gone).
fn handle_action(&mut self, action: Action) -> bool {
    match action {
        Action::NodeSendMessage { src, dst, content, priority, result_tx } => {
            // Only interface errors are reported back; other send errors are
            // deliberately swallowed and reported as success.
            if result_tx.send(match self.send_user_message(src, dst, content, priority) {
                    Err(RoutingError::Interface(err)) => Err(err),
                    Err(_err) => Ok(()),
                    Ok(()) => Ok(()),
                })
                .is_err() {
                return false;
            }
        }
        Action::ClientSendRequest { content, dst, priority, result_tx } => {
            // Sending as a client requires a proxy-based client authority.
            if result_tx.send(if let Ok(src) = self.get_client_authority() {
                    let user_msg = UserMessage::Request(content);
                    match self.send_user_message(src, dst, user_msg, priority) {
                        Err(RoutingError::Interface(err)) => Err(err),
                        Err(_) | Ok(()) => Ok(()),
                    }
                } else {
                    Err(InterfaceError::NotConnected)
                })
                .is_err() {
                return false;
            }
        }
        Action::CloseGroup { name, result_tx } => {
            // Reply with the names of the close group around `name`, if known.
            let close_group = self.routing_table
                .close_nodes(&name, GROUP_SIZE)
                .map(|infos| {
                    infos.iter()
                        .map(NodeInfo::name)
                        .cloned()
                        .collect()
                });
            if result_tx.send(close_group).is_err() {
                return false;
            }
        }
        Action::Name { result_tx } => {
            if result_tx.send(*self.name()).is_err() {
                return false;
            }
        }
        Action::QuorumSize { result_tx } => {
            if result_tx.send(self.dynamic_quorum_size()).is_err() {
                return false;
            }
        }
        Action::Timeout(token) => self.handle_timeout(token),
        Action::Terminate => {
            return false;
        }
    }
    true
}
/// Dispatches an event received from the Crust service to the matching
/// handler.
fn handle_crust_event(&mut self, crust_event: crust::Event) {
    match crust_event {
        crust::Event::BootstrapFailed => self.handle_bootstrap_failed(),
        crust::Event::BootstrapConnect(peer_id, socket_addr) => {
            self.handle_bootstrap_connect(peer_id, socket_addr)
        }
        crust::Event::BootstrapAccept(peer_id) => self.handle_bootstrap_accept(peer_id),
        crust::Event::NewPeer(result, peer_id) => self.handle_new_peer(result, peer_id),
        crust::Event::LostPeer(peer_id) => self.handle_lost_peer(peer_id),
        crust::Event::NewMessage(peer_id, bytes) => {
            // Filter hits are expected and not worth logging.
            match self.handle_new_message(peer_id, bytes) {
                Err(RoutingError::FilterCheckFailed) |
                Ok(_) => (),
                Err(err) => debug!("{:?} - {:?}", self, err),
            }
        }
        crust::Event::ConnectionInfoPrepared(ConnectionInfoResult { result_token, result }) => {
            self.handle_connection_info_prepared(result_token, result);
        }
        crust::Event::ListenerStarted(port) => {
            trace!("{:?} Listener started on port {}.", self, port);
            self.crust_service.set_service_discovery_listen(true);
            // A joining node can only relocate once it is reachable.
            if self.role == Role::Node {
                if let Err(error) = self.relocate() {
                    error!("{:?} Failed to start relocation: {:?}", self, error);
                    let _ = self.event_sender.send(Event::RestartRequired);
                }
            }
        }
        crust::Event::ListenerFailed => {
            error!("{:?} Failed to start listening.", self);
            let _ = self.event_sender.send(Event::Terminate);
        }
        crust::Event::WriteMsgSizeProhibitive(peer_id, msg) => {
            error!("{:?} Failed to send {}-byte message to {:?}. Message too large.",
                   self,
                   msg.len(),
                   peer_id);
        }
    }
}
/// Handles a successful bootstrap connection to `peer_id`: if we are
/// disconnected, start the client handshake and blacklist the address so we
/// do not bootstrap off it again; otherwise drop the redundant connection.
fn handle_bootstrap_connect(&mut self, peer_id: PeerId, socket_addr: SocketAddr) {
    // The first node never bootstraps off anyone.
    if self.role == Role::FirstNode {
        debug!("{:?} Received BootstrapConnect as the first node.", self);
        self.disconnect_peer(&peer_id);
        return;
    }
    match self.state {
        State::Disconnected => {
            debug!("{:?} Received BootstrapConnect from {:?}.", self, peer_id);
            // Begin the handshake; this moves us to `State::Bootstrapping`.
            let _ = self.client_identify(peer_id);
            let _ = self.bootstrap_blacklist.insert(socket_addr);
        }
        State::Bootstrapping(bootstrap_id, _) if bootstrap_id == peer_id => {
            warn!("{:?} Got more than one BootstrapConnect for peer {:?}.",
                  self,
                  peer_id);
        }
        _ => {
            // Already bootstrapping off another peer, or past bootstrapping.
            self.disconnect_peer(&peer_id);
        }
    }
}
/// Sets this node up as the seed of a brand new network: starts listening,
/// assigns itself a relocated name, becomes a `Node` and starts the periodic
/// tick timer.
fn start_new_network(&mut self) {
    if !self.start_listening() {
        error!("{:?} Failed to start listening.", self);
        let _ = self.event_sender.send(Event::Terminate);
    }
    // The first node relocates itself: its new name is the hash of its
    // original name.
    let new_name = XorName(sha256::hash(&self.full_id.public_id().name().0).0);
    self.set_self_node_name(new_name);
    self.state = State::Node;
    let tick_period = Duration::from_secs(TICK_TIMEOUT_SECS);
    self.tick_timer_token = Some(self.timer.schedule(tick_period));
    info!("{:?} - Started a new network as a seed node.", self)
}
/// Invoked when a peer bootstraps off us. Nothing to do yet — we wait for
/// the peer's `ClientIdentify` message before registering it.
fn handle_bootstrap_accept(&mut self, peer_id: PeerId) {
    trace!("{:?} Received BootstrapAccept from {:?}.", self, peer_id);
}
/// Handles the result of a connection attempt: on success, sends a
/// `NodeIdentify` (unless the peer is already in the routing table); on
/// failure, falls back to finding a tunnel if a direct Crust connection was
/// being attempted.
fn handle_new_peer(&mut self, result: Result<(), CrustError>, peer_id: PeerId) {
    if peer_id == self.crust_service.id() {
        debug!("{:?} Received NewPeer event with our Crust peer ID.", self);
        return;
    }
    if self.role == Role::Client {
        warn!("{:?} Received NewPeer event as a client.", self);
    } else {
        match result {
            Ok(()) => {
                // A direct connection succeeded, so an existing tunnel to
                // this peer is no longer needed.
                if let Some(tunnel_id) = self.tunnels.remove_tunnel_for(&peer_id) {
                    debug!("{:?} Removing unwanted tunnel for {:?}", self, peer_id);
                    let message = DirectMessage::TunnelDisconnect(peer_id);
                    let _ = self.send_direct_message(&tunnel_id, message);
                } else if let Some(node) = self.routing_table
                    .iter()
                    .find(|node| node.peer_id == peer_id) {
                    warn!("{:?} Received NewPeer from {:?}, but node {:?} is already in our \
                           routing table.",
                          self,
                          peer_id,
                          node.name());
                    return;
                }
                self.peer_mgr.connected_to(peer_id);
                debug!("{:?} Received NewPeer with Ok from {:?}. Sending NodeIdentify.",
                       self,
                       peer_id);
                let _ = self.node_identify(peer_id);
            }
            Err(err) => {
                // Only react if the peer is not already connected via the
                // routing table.
                if self.routing_table.iter().all(|node| node.peer_id != peer_id) {
                    info!("{:?} Failed to connect to peer {:?}: {:?}.",
                          self,
                          peer_id,
                          err);
                    // If this was a direct (Crust) attempt, try a tunnel next.
                    if let Some(&(name, ConnectState::Crust)) = self.peer_mgr
                        .get_connecting_peer(&peer_id) {
                        self.find_tunnel_for_peer(peer_id, name);
                    }
                }
            }
        }
    }
}
/// Marks `peer_id` as connecting via tunnel and asks each node close to
/// `name` to act as a tunnel towards it.
fn find_tunnel_for_peer(&mut self, peer_id: PeerId, name: XorName) {
    let _ = self.peer_mgr.insert_connecting_peer(peer_id, name, ConnectState::Tunnel);
    for node in self.routing_table.closest_nodes_to(&name, GROUP_SIZE, false) {
        trace!("{:?} Asking {:?} to serve as a tunnel.", self, node.name());
        let tunnel_request = DirectMessage::TunnelRequest(peer_id);
        let _ = self.send_direct_message(&node.peer_id, tunnel_request);
    }
}
/// Handles our prepared Crust connection info: encrypts the serialised
/// public half for the peer recorded under `result_token`, then either
/// connects immediately (if we already hold the peer's info) or stores our
/// info until it arrives. In both cases our encrypted info is sent to the
/// peer in a `ConnectionInfo` routing message.
fn handle_connection_info_prepared(&mut self,
                                   result_token: u32,
                                   result: Result<PrivConnectionInfo, CrustError>) {
    let our_connection_info = match result {
        Err(err) => {
            error!("{:?} Failed to prepare connection info: {:?}", self, err);
            return;
        }
        Ok(connection_info) => connection_info,
    };
    let encoded_connection_info =
        match serialisation::serialise(&our_connection_info.to_pub_connection_info()) {
            Err(err) => {
                error!("{:?} Failed to serialise connection info: {:?}", self, err);
                return;
            }
            Ok(encoded_connection_info) => encoded_connection_info,
        };
    // Look up who this token belongs to and which authorities to use.
    let (their_public_id, src, dst) = if let Some(entry) = self.peer_mgr
        .connection_token_map
        .remove(&result_token) {
        // `remove` yields the owned entry, so no clone is needed.
        entry
    } else {
        error!("{:?} Prepared connection info, but no entry found in token map.",
               self);
        return;
    };
    // Encrypt our connection info so only the intended peer can read it.
    let nonce = box_::gen_nonce();
    let encrypted_connection_info = box_::seal(&encoded_connection_info,
                                               &nonce,
                                               their_public_id.encrypting_public_key(),
                                               self.full_id.encrypting_private_key());
    let their_name = *their_public_id.name();
    if let Some(their_connection_info) = self.peer_mgr
        .their_connection_info_map
        .remove(&their_public_id) {
        // We already have the peer's info: attempt the connection now.
        let peer_id = their_connection_info.id();
        if let Some((name, _)) = self.peer_mgr
            .insert_connecting_peer(peer_id, their_name, ConnectState::Crust) {
            warn!("{:?} Prepared connection info for {:?} as {:?}, but already tried as {:?}.",
                  self,
                  peer_id,
                  their_name,
                  name);
        }
        debug!("{:?} Trying to connect to {:?} as {:?}.",
               self,
               peer_id,
               their_name);
        let _ = self.crust_service.connect(our_connection_info, their_connection_info);
    } else {
        // Peer's info not here yet: stash ours until it arrives.
        let _ =
            self.peer_mgr.our_connection_info_map.insert(their_public_id, our_connection_info);
        debug!("{:?} Prepared connection info for {:?}.", self, their_name);
    }
    let request_content = MessageContent::ConnectionInfo {
        encrypted_connection_info: encrypted_connection_info,
        nonce_bytes: nonce.0,
        public_id: *self.full_id.public_id(),
    };
    let request_msg = RoutingMessage {
        src: src,
        dst: dst,
        content: request_content,
    };
    if let Err(err) = self.send_message(request_msg) {
        debug!("{:?} Failed to send connection info for {:?}: {:?}.",
               self,
               their_name,
               err);
    }
}
/// Deserialises raw bytes from `peer_id` and dispatches them: hop and direct
/// messages are handled locally; tunnel messages are either unwrapped (if we
/// are the tunnel endpoint) or forwarded (if we serve as the tunnel).
fn handle_new_message(&mut self, peer_id: PeerId, bytes: Vec<u8>) -> Result<(), RoutingError> {
    match serialisation::deserialise(&bytes) {
        Ok(Message::Hop(ref hop_msg)) => self.handle_hop_message(hop_msg, peer_id),
        Ok(Message::Direct(direct_msg)) => self.handle_direct_message(direct_msg, peer_id),
        Ok(Message::TunnelDirect { content, src, dst }) => {
            if dst == self.crust_service.id() &&
               self.tunnels.tunnel_for(&src) == Some(&peer_id) {
                // Addressed to us and arriving via our registered tunnel.
                self.handle_direct_message(content, src)
            } else if self.tunnels.has_clients(src, dst) {
                // We are the tunnel for this pair: forward verbatim.
                self.send_or_drop(&dst, bytes, content.priority())
            } else if self.tunnels.accept_clients(src, dst) {
                // First message of a new tunnelled pair: confirm, then forward.
                try!(self.send_direct_message(&dst, DirectMessage::TunnelSuccess(src)));
                self.send_or_drop(&dst, bytes, content.priority())
            } else {
                Err(RoutingError::InvalidDestination)
            }
        }
        Ok(Message::TunnelHop { content, src, dst }) => {
            if dst == self.crust_service.id() &&
               self.tunnels.tunnel_for(&src) == Some(&peer_id) {
                self.handle_hop_message(&content, src)
            } else if self.tunnels.has_clients(src, dst) {
                self.send_or_drop(&dst, bytes, content.content().priority())
            } else {
                Err(RoutingError::InvalidDestination)
            }
        }
        Err(error) => Err(RoutingError::SerialisationError(error)),
    }
}
/// Verifies a hop message's signature against the sender (routing-table
/// node, connected client, or proxy) and passes its content on to
/// `handle_signed_message` together with the hop's name.
fn handle_hop_message(&mut self,
                      hop_msg: &HopMessage,
                      peer_id: PeerId)
                      -> Result<(), RoutingError> {
    let hop_name;
    if self.state == State::Node {
        if let Some(info) = self.routing_table.iter().find(|node| node.peer_id == peer_id) {
            // From a node in our routing table.
            try!(hop_msg.verify(info.public_id.signing_public_key()));
            hop_name = *info.name();
        } else if let Some(client_info) = self.peer_mgr.get_client(&peer_id) {
            // From one of our clients; restricted clients may only send a
            // limited set of messages.
            try!(hop_msg.verify(&client_info.public_key));
            if client_info.client_restriction {
                try!(self.check_valid_client_message(hop_msg.content().routing_message()));
            }
            // We act as the client's proxy, so the hop is ourselves.
            hop_name = *self.name();
        } else if let Some(pub_id) = self.peer_mgr.get_proxy_public_id(&peer_id) {
            // From our own proxy node.
            try!(hop_msg.verify(pub_id.signing_public_key()));
            hop_name = *pub_id.name();
        } else {
            return Err(RoutingError::UnknownConnection(peer_id));
        }
    } else if self.state == State::Client {
        // As a client we only accept hop messages from our proxy.
        if let Some(pub_id) = self.peer_mgr.get_proxy_public_id(&peer_id) {
            try!(hop_msg.verify(pub_id.signing_public_key()));
            hop_name = *pub_id.name();
        } else {
            return Err(RoutingError::UnknownConnection(peer_id));
        }
    } else {
        return Err(RoutingError::InvalidStateForOperation);
    }
    self.handle_signed_message(hop_msg.content(),
                               hop_msg.route(),
                               &hop_name,
                               hop_msg.sent_to())
}
/// Checks that a message from a restricted client is one a client may send:
/// acks and user-message parts at or above `DEFAULT_PRIORITY` are allowed;
/// everything else is rejected and not relayed.
fn check_valid_client_message(&self, msg: &RoutingMessage) -> Result<(), RoutingError> {
    let permitted = match msg.content {
        MessageContent::Ack(..) => true,
        MessageContent::UserMessagePart { priority, .. } => priority >= DEFAULT_PRIORITY,
        _ => false,
    };
    if permitted {
        Ok(())
    } else {
        debug!("{:?} Illegitimate client message {:?}. Refusing to relay.",
               self,
               msg);
        Err(RoutingError::RejectedClientMessage)
    }
}
/// Checks, acks, filters, relays and possibly handles a received signed
/// message.
///
/// Steps: verify integrity; ack if we are a recipient (group messages only
/// once they already accumulated, i.e. appear in `grp_msg_filter`); drop if
/// seen more than `GROUP_SIZE` times; as a node, security-check messages
/// whose destination is close to us and relay along `route`; finally handle
/// the message ourselves on first sight if we are a recipient.
fn handle_signed_message(&mut self,
                         signed_msg: &SignedMessage,
                         route: u8,
                         hop_name: &XorName,
                         sent_to: &[XorName])
                         -> Result<(), RoutingError> {
    try!(signed_msg.check_integrity());
    let routing_msg = signed_msg.routing_message();
    if (self.grp_msg_filter.contains(routing_msg) || !routing_msg.src.is_group()) &&
       self.is_recipient(&routing_msg.dst) {
        self.send_ack(routing_msg, route);
    }
    // Count this sighting; too many repeats means the message is circulating.
    if self.signed_message_filter.insert(signed_msg) > GROUP_SIZE {
        return Err(RoutingError::FilterCheckFailed);
    }
    if self.state == State::Node {
        if self.routing_table.is_close(routing_msg.dst.name(), GROUP_SIZE) {
            // `signed_msg` is already a `&SignedMessage`; the previous extra
            // `&` created a needless double borrow (clippy: `needless_borrow`).
            try!(self.signed_msg_security_check(signed_msg));
        }
        if let Err(error) = self.send(signed_msg, route, hop_name, sent_to) {
            debug!("{:?} Failed to send {:?}: {:?}", self, signed_msg, error);
        }
    } else if self.state != State::Client {
        return Err(RoutingError::InvalidStateForOperation);
    }
    // Handle the message ourselves only on first receipt and only if it is
    // addressed to us.
    if self.signed_message_filter.count(signed_msg) == 1 &&
       self.is_recipient(&routing_msg.dst) {
        self.handle_routing_message(routing_msg, *signed_msg.public_id())
    } else {
        Ok(())
    }
}
/// Rejects a client-originated message whose claimed client key does not
/// match the signing key of the message's actual signer.
fn signed_msg_security_check(&self, signed_msg: &SignedMessage) -> Result<(), RoutingError> {
    match signed_msg.routing_message().src {
        Authority::Client { ref client_key, .. } if client_key !=
                                                    signed_msg.public_id()
                                                        .signing_public_key() => {
            Err(RoutingError::FailedSignature)
        }
        _ => Ok(()),
    }
}
/// Handles a routing message addressed to us. Group messages are passed
/// through the accumulator and only dispatched (and acked) once a quorum of
/// group members sent them; individual messages are dispatched directly.
fn handle_routing_message(&mut self,
                          routing_msg: &RoutingMessage,
                          public_id: PublicId)
                          -> Result<(), RoutingError> {
    if routing_msg.src.is_group() {
        // Already accumulated and handled earlier.
        if self.grp_msg_filter.contains(routing_msg) {
            return Err(RoutingError::FilterCheckFailed);
        }
        if let Some(group_msg) = self.accumulate(routing_msg, &public_id) {
            // Remember both the full message and its hashed form so
            // further copies are filtered out.
            let _ = self.grp_msg_filter.insert(&group_msg);
            let _ = self.grp_msg_filter.insert(&try!(routing_msg.to_grp_msg_hash()));
            self.send_ack(&group_msg, 0);
            self.dispatch_routing_message(&group_msg)
        } else {
            Ok(())
        }
    } else {
        self.dispatch_routing_message(routing_msg)
    }
}
/// Adds the sender's vote for `message` to the accumulator. Returns the full
/// message once a quorum of distinct signers has been reached, `None` before
/// that. Group messages are accumulated under their hashed form; the full
/// message is cached so it can be recovered when the hash reaches quorum.
fn accumulate(&mut self,
              message: &RoutingMessage,
              public_id: &PublicId)
              -> Option<RoutingMessage> {
    // As a node the quorum scales with the (estimated) network size; as a
    // client it stays at the value the proxy told us.
    if self.state == State::Node {
        let dynamic_quorum_size = self.dynamic_quorum_size();
        self.message_accumulator.set_quorum_size(dynamic_quorum_size);
    }
    let key = *public_id.signing_public_key();
    let hash_msg = if let Ok(hash_msg) = message.to_grp_msg_hash() {
        hash_msg
    } else {
        error!("{:?} Failed to hash message {:?}", self, message);
        return None;
    };
    if let MessageContent::GroupMessageHash(hash, _) = hash_msg.content {
        // Cache the full message under its hash (unless the message *is*
        // its own hash form), so it can be returned once quorum is reached.
        if hash_msg != *message {
            let _ = self.grp_msg_cache.insert(hash, message.clone());
        }
        if self.message_accumulator.add(hash_msg, key).is_some() {
            self.grp_msg_cache.remove(&hash)
        } else {
            None
        }
    } else {
        // Not a group-hash message: accumulate the message itself.
        self.message_accumulator.add(hash_msg, key).map(|_| message.clone())
    }
}
/// Returns the quorum size appropriate for the current network size: the
/// fixed `QUORUM_SIZE` once at least `GROUP_SIZE` nodes are known, otherwise
/// a proportionally scaled value that is at least a simple majority.
fn dynamic_quorum_size(&self) -> usize {
    // Routing-table entries plus ourselves.
    let network_size = self.routing_table.len() + 1;
    if network_size < GROUP_SIZE {
        let scaled = network_size * QUORUM_SIZE / GROUP_SIZE;
        let majority = network_size / 2 + 1;
        cmp::max(scaled, majority)
    } else {
        QUORUM_SIZE
    }
}
/// Matches a fully verified (and, for group messages, accumulated) routing
/// message on (content, source authority, destination authority) and invokes
/// the corresponding handler. Unexpected combinations are rejected with
/// `BadAuthority`.
fn dispatch_routing_message(&mut self,
                            routing_msg: &RoutingMessage)
                            -> Result<(), RoutingError> {
    let msg_content = routing_msg.content.clone();
    let msg_src = routing_msg.src.clone();
    let msg_dst = routing_msg.dst.clone();
    // Acks are too frequent to be worth tracing.
    match msg_content {
        MessageContent::Ack(..) => (),
        _ => {
            trace!("{:?} Got routing message {:?} from {:?} to {:?}.",
                   self,
                   msg_content,
                   msg_src,
                   msg_dst)
        }
    }
    match (msg_content, msg_src, msg_dst) {
        // A joining client asks the group managing its chosen name for a
        // relocated name.
        (MessageContent::GetNodeName { current_id, message_id },
         Authority::Client { client_key, proxy_node_name, peer_id },
         Authority::NaeManager(dst_name)) => {
            self.handle_get_node_name_request(current_id,
                                              client_key,
                                              proxy_node_name,
                                              dst_name,
                                              peer_id,
                                              message_id)
        }
        // Group-to-group notification that a relocated node will join near us.
        (MessageContent::ExpectCloseNode { expect_id, client_auth, message_id },
         Authority::NaeManager(_),
         Authority::NaeManager(_)) => {
            self.handle_expect_close_node_request(expect_id, client_auth, message_id)
        }
        (MessageContent::GetCloseGroup(message_id), src, Authority::NaeManager(dst_name)) => {
            self.handle_get_close_group_request(src, dst_name, message_id)
        }
        // Connection info exchange, client -> node.
        (MessageContent::ConnectionInfo { encrypted_connection_info, nonce_bytes, public_id },
         src @ Authority::Client { .. },
         Authority::ManagedNode(dst_name)) => {
            self.handle_connection_info_from_client(encrypted_connection_info,
                                                    nonce_bytes,
                                                    src,
                                                    dst_name,
                                                    public_id)
        }
        // Connection info exchange, node -> client or node -> node.
        (MessageContent::ConnectionInfo { encrypted_connection_info, nonce_bytes, public_id },
         Authority::ManagedNode(src_name),
         Authority::Client { .. }) |
        (MessageContent::ConnectionInfo { encrypted_connection_info, nonce_bytes, public_id },
         Authority::ManagedNode(src_name),
         Authority::ManagedNode(_)) => {
            self.handle_connection_info_from_node(encrypted_connection_info,
                                                  nonce_bytes,
                                                  src_name,
                                                  routing_msg.dst.clone(),
                                                  public_id)
        }
        (MessageContent::GetNodeNameResponse { relocated_id, close_group_ids, .. },
         Authority::NodeManager(_),
         dst) => self.handle_get_node_name_response(relocated_id, close_group_ids, dst),
        (MessageContent::GetCloseGroupResponse { close_group_ids, .. },
         Authority::ManagedNode(_),
         dst) => self.handle_get_close_group_response(close_group_ids, dst),
        (MessageContent::Ack(ack, _), _, _) => self.handle_ack_response(ack),
        // A part of a (possibly multi-part) user message; emit an event to
        // the API user once all parts have arrived.
        (MessageContent::UserMessagePart { hash, part_count, part_index, payload, .. },
         src,
         dst) => {
            let event = match self.add_user_msg_part(hash, part_count, part_index, payload) {
                Some(UserMessage::Request(request)) => {
                    self.stats.count_request(&request);
                    Event::Request {
                        request: request,
                        src: src,
                        dst: dst,
                    }
                }
                Some(UserMessage::Response(response)) => {
                    self.stats.count_response(&response);
                    Event::Response {
                        response: response,
                        src: src,
                        dst: dst,
                    }
                }
                None => return Ok(()),
            };
            let _ = self.event_sender.send(event);
            Ok(())
        }
        _ => {
            warn!("{:?} Unhandled message {:?}", self, routing_msg);
            Err(RoutingError::BadAuthority)
        }
    }
}
/// Handles a failed bootstrap attempt: if we never managed to connect at
/// all, terminate; otherwise the failure is ignored.
fn handle_bootstrap_failed(&mut self) {
    debug!("{:?} Failed to bootstrap.", self);
    if let State::Disconnected = self.state {
        let _ = self.event_sender.send(Event::Terminate);
    }
}
fn start_listening(&mut self) -> bool {
if !self.is_listening {
if let Err(error) = self.crust_service.start_listening_tcp() {
error!("{:?} Failed to start listening: {:?}", self, error);
} else {
info!("{:?} Running listener.", self);
self.is_listening = true;
}
}
self.is_listening
}
/// Handles a lost Crust connection: cleans up every role the peer may have
/// had for us (tunnel client, routing-table node, client, tunnel node,
/// bootstrap/proxy connection).
fn handle_lost_peer(&mut self, peer_id: PeerId) {
    if peer_id == self.crust_service.id() {
        error!("{:?} LostPeer fired with our crust peer id", self);
        return;
    }
    debug!("{:?} Received LostPeer - {:?}", self, peer_id);
    // Clients keep no routing or tunnel state; only a proxy connection.
    if self.role != Role::Client {
        self.dropped_tunnel_client(&peer_id);
        self.dropped_routing_node_connection(&peer_id);
        self.dropped_client_connection(&peer_id);
        self.dropped_tunnel_node(&peer_id);
    }
    self.dropped_bootstrap_connection(&peer_id);
}
/// Sends a `BootstrapIdentify` — our public ID plus the quorum size the new
/// client should use — to a peer that bootstrapped off us.
fn bootstrap_identify(&mut self, peer_id: PeerId) -> Result<(), RoutingError> {
    let quorum_size = self.dynamic_quorum_size();
    let direct_message = DirectMessage::BootstrapIdentify {
        public_id: *self.full_id.public_id(),
        current_quorum_size: quorum_size,
    };
    self.send_direct_message(&peer_id, direct_message)
}
/// Sends a signed `ClientIdentify` to the bootstrap peer, schedules the
/// bootstrap timeout, and moves to `State::Bootstrapping`. The
/// `client_restriction` flag tells the proxy whether we intend to stay a
/// client (`Role::Client`) or to join as a node.
fn client_identify(&mut self, peer_id: PeerId) -> Result<(), RoutingError> {
    debug!("{:?} - Sending ClientIdentify to {:?}.", self, peer_id);
    // The token lets the timeout handler identify this bootstrap attempt.
    let token = self.timer.schedule(Duration::from_secs(BOOTSTRAP_TIMEOUT_SECS));
    self.state = State::Bootstrapping(peer_id, token);
    // Sign our serialised public ID so the peer can verify it is really ours.
    let serialised_public_id = try!(serialisation::serialise(self.full_id.public_id()));
    let signature = sign::sign_detached(&serialised_public_id,
                                        self.full_id.signing_private_key());
    let direct_message = DirectMessage::ClientIdentify {
        serialised_public_id: serialised_public_id,
        signature: signature,
        client_restriction: self.role == Role::Client,
    };
    self.send_direct_message(&peer_id, direct_message)
}
/// Sends a signed `NodeIdentify` — our serialised public ID plus a signature
/// proving ownership — to a newly connected peer.
fn node_identify(&mut self, peer_id: PeerId) -> Result<(), RoutingError> {
    let serialised_public_id = try!(serialisation::serialise(self.full_id.public_id()));
    let signature = sign::sign_detached(&serialised_public_id,
                                        self.full_id.signing_private_key());
    let direct_message = DirectMessage::NodeIdentify {
        serialised_public_id: serialised_public_id,
        signature: signature,
    };
    self.send_direct_message(&peer_id, direct_message)
}
/// Serialises and sends a direct message to `dst_id`, transparently wrapping
/// it in a `TunnelDirect` envelope when the destination is only reachable
/// through one of our tunnels.
fn send_direct_message(&mut self,
                       dst_id: &PeerId,
                       direct_message: DirectMessage)
                       -> Result<(), RoutingError> {
    self.stats.count_direct_message(&direct_message);
    let priority = direct_message.priority();
    // Route via the tunnel node if one is registered for this destination.
    let (message, peer_id) = if let Some(&tunnel_id) = self.tunnels.tunnel_for(dst_id) {
        let message = Message::TunnelDirect {
            content: direct_message,
            src: self.crust_service.id(),
            dst: *dst_id,
        };
        (message, tunnel_id)
    } else {
        (Message::Direct(direct_message), *dst_id)
    };
    let raw_bytes = match serialisation::serialise(&message) {
        Err(error) => {
            error!("{:?} Failed to serialise message {:?}: {:?}",
                   self,
                   message,
                   error);
            return Err(error.into());
        }
        Ok(bytes) => bytes,
    };
    self.send_or_drop(&peer_id, raw_bytes, priority)
}
/// Sends `bytes` to the given peer via Crust. On failure, disconnects the
/// peer and treats it as lost before propagating the error.
fn send_or_drop(&mut self,
                peer_id: &PeerId,
                bytes: Vec<u8>,
                priority: u8)
                -> Result<(), RoutingError> {
    self.stats.count_bytes(bytes.len());
    // `bytes` is not used after this call, so it can be moved into `send`
    // directly instead of cloned (clippy: `redundant_clone`).
    if let Err(err) = self.crust_service.send(*peer_id, bytes, priority) {
        info!("{:?} Connection to {:?} failed. Calling crust::Service::disconnect.",
              self,
              peer_id);
        self.crust_service.disconnect(*peer_id);
        self.handle_lost_peer(*peer_id);
        return Err(err.into());
    }
    Ok(())
}
/// Records `msg` in the per-(message, peer, route) send filter. Returns
/// `true` if the exact combination was already present — i.e. this send
/// would be a duplicate. A first-time send is counted in the stats.
fn filter_signed_msg(&mut self, msg: &SignedMessage, peer_id: &PeerId, route: u8) -> bool {
    let hash = maidsafe_utilities::big_endian_sip_hash(msg);
    let already_sent = self.send_filter.insert((hash, *peer_id, route), ()).is_some();
    if !already_sent {
        self.stats.count_routing_message(msg.routing_message());
    }
    already_sent
}
/// Deserialises a `PublicId` from `serialised_public_id` and checks that
/// `signature` is a valid signature of those bytes under the ID's own
/// signing key; this proves the sender holds the matching private key.
fn verify_signed_public_id(serialised_public_id: &[u8],
                           signature: &sign::Signature)
                           -> Result<PublicId, RoutingError> {
    let public_id: PublicId = try!(serialisation::deserialise(serialised_public_id));
    if !sign::verify_detached(signature,
                              serialised_public_id,
                              public_id.signing_public_key()) {
        return Err(RoutingError::FailedSignature);
    }
    Ok(public_id)
}
/// Dispatches a direct (peer-to-peer, non-routed) message from `peer_id` to
/// the matching handler. Identify messages are signature-checked first; a
/// failed check drops the connection.
fn handle_direct_message(&mut self,
                         direct_message: DirectMessage,
                         peer_id: PeerId)
                         -> Result<(), RoutingError> {
    match direct_message {
        DirectMessage::BootstrapIdentify { public_id, current_quorum_size } => {
            self.handle_bootstrap_identify(public_id, peer_id, current_quorum_size)
        }
        DirectMessage::BootstrapDeny => {
            // The proxy rejected us; try bootstrapping off someone else.
            info!("{:?} Connection failed: Proxy node needs a larger routing table to accept \
                   clients.",
                  self);
            self.rebootstrap();
            Ok(())
        }
        DirectMessage::ClientToNode => {
            // The peer claims to have been our client and now joins as a node.
            if self.peer_mgr.remove_client(&peer_id).is_none() {
                warn!("{:?} Client requested ClientToNode, but is not in client map: {:?}",
                      self,
                      peer_id);
            }
            // Until the peer is in our routing table, this connection is of
            // no use to us: disconnect.
            if self.routing_table.iter().all(|node| node.peer_id != peer_id) {
                warn!("{:?} Client requested ClientToNode, but is not in routing table: {:?}",
                      self,
                      peer_id);
                self.disconnect_peer(&peer_id);
            }
            Ok(())
        }
        DirectMessage::ClientIdentify { ref serialised_public_id,
                                        ref signature,
                                        client_restriction } => {
            if let Ok(public_id) = Core::verify_signed_public_id(serialised_public_id,
                                                                 signature) {
                self.handle_client_identify(public_id, peer_id, client_restriction)
            } else {
                warn!("{:?} Signature check failed in ClientIdentify - \
                       Dropping connection {:?}",
                      self,
                      peer_id);
                self.disconnect_peer(&peer_id);
                Ok(())
            }
        }
        DirectMessage::NodeIdentify { ref serialised_public_id, ref signature } => {
            if let Ok(public_id) = Core::verify_signed_public_id(serialised_public_id,
                                                                 signature) {
                self.handle_node_identify(public_id, peer_id);
            } else {
                warn!("{:?} Signature check failed in NodeIdentify - Dropping peer {:?}",
                      self,
                      peer_id);
                self.disconnect_peer(&peer_id);
            }
            Ok(())
        }
        DirectMessage::NewNode(public_id) => {
            trace!("{:?} Received NewNode({:?}).", self, public_id);
            // Initiate a connection if our routing table wants this node.
            if self.routing_table.need_to_add(public_id.name()) {
                let our_name = *self.name();
                return self.send_connection_info(public_id,
                                                 Authority::ManagedNode(our_name),
                                                 Authority::ManagedNode(*public_id.name()));
            }
            Ok(())
        }
        DirectMessage::ConnectionUnneeded(ref name) => {
            // Refuse if the sender's peer ID doesn't match the name it claims.
            if let Some(node_info) = self.routing_table.get(name) {
                if node_info.peer_id != peer_id {
                    debug!("{:?} Received ConnectionUnneeded from {:?} with name {:?}, but \
                            that name actually belongs to {:?}.",
                           self,
                           peer_id,
                           name,
                           node_info.peer_id);
                    return Err(RoutingError::InvalidSource);
                }
            }
            debug!("{:?} Received ConnectionUnneeded from {:?}.", self, peer_id);
            if self.routing_table.remove_if_unneeded(name) {
                info!("{:?} Dropped {:?} from the routing table.", self, name);
                self.crust_service.disconnect(peer_id);
                self.handle_lost_peer(peer_id);
            }
            Ok(())
        }
        DirectMessage::TunnelRequest(dst_id) => self.handle_tunnel_request(peer_id, dst_id),
        DirectMessage::TunnelSuccess(dst_id) => self.handle_tunnel_success(peer_id, dst_id),
        DirectMessage::TunnelClosed(dst_id) => self.handle_tunnel_closed(peer_id, dst_id),
        DirectMessage::TunnelDisconnect(dst_id) => {
            self.handle_tunnel_disconnect(peer_id, dst_id)
        }
    }
}
/// Handles a `BootstrapIdentify` from the peer we bootstrapped against:
/// validates it, records it as our proxy and switches to the `Client` state.
fn handle_bootstrap_identify(&mut self,
public_id: PublicId,
peer_id: PeerId,
current_quorum_size: usize)
-> Result<(), RoutingError> {
// A name equal to the hash of the signing key is the initial, self-assigned
// client name (see `handle_client_identify`), so this peer apparently has
// not been relocated to a network name - reject it and bootstrap elsewhere.
if *public_id.name() == XorName(sha256::hash(&public_id.signing_public_key().0).0) {
warn!("{:?} Incoming Connection not validated as a proper node - dropping",
self);
self.rebootstrap();
return Ok(());
}
// `set_proxy` refused this peer (presumably a proxy is already registered -
// TODO confirm against PeerManager); drop the connection.
if !self.peer_mgr.set_proxy(peer_id, public_id) {
self.disconnect_peer(&peer_id);
return Ok(());
}
self.state = State::Client;
debug!("{:?} - State changed to client, quorum size: {}.",
self,
current_quorum_size);
// Adopt the quorum size reported by the proxy for message accumulation.
self.message_accumulator.set_quorum_size(current_quorum_size);
match self.role {
Role::Client => {
let _ = self.event_sender.send(Event::Connected);
}
Role::Node => {
// A full node now starts listening so it can join the network.
let _ = self.start_listening();
}
Role::FirstNode => debug!("{:?} Received BootstrapIdentify as the first node.", self),
};
Ok(())
}
/// Handles a `ClientIdentify` direct message: vets the connecting client's
/// identity and either accepts it as a bootstrap client or rejects it.
fn handle_client_identify(&mut self,
public_id: PublicId,
peer_id: PeerId,
client_restriction: bool)
-> Result<(), RoutingError> {
// A valid (un-relocated) client's name must equal the hash of its signing
// key; anything else is dropped.
if *public_id.name() != XorName(sha256::hash(&public_id.signing_public_key().0).0) {
warn!("{:?} Incoming Connection not validated as a proper client - dropping",
self);
self.disconnect_peer(&peer_id);
return Ok(());
}
// Evict joining nodes whose join attempts have gone stale.
for peer_id in self.peer_mgr.remove_stale_joining_nodes() {
debug!("{:?} Removing stale joining node with Crust ID {:?}",
self,
peer_id);
self.disconnect_peer(&peer_id);
}
// Refuse to serve clients while our own routing table is too small; the
// first node is exempt for unrestricted (joining) clients.
if (client_restriction || self.role != Role::FirstNode) &&
self.routing_table.len() < GROUP_SIZE - 1 {
debug!("{:?} Client {:?} rejected: Routing table has {} entries. {} required.",
self,
public_id.name(),
self.routing_table.len(),
GROUP_SIZE - 1);
return self.send_direct_message(&peer_id, DirectMessage::BootstrapDeny);
}
// A duplicate ClientIdentify is only logged; `insert_client` runs regardless.
if self.peer_mgr.get_client(&peer_id).is_some() {
debug!("{:?} Received two ClientInfo from the same peer ID {:?}.",
self,
peer_id);
}
self.peer_mgr.insert_client(peer_id, &public_id, client_restriction);
debug!("{:?} Accepted client {:?}.", self, public_id.name());
self.bootstrap_identify(peer_id)
}
/// Processes a `NodeIdentify` from a peer: unless we are running as a pure
/// client, the sender is offered to the routing table.
fn handle_node_identify(&mut self, public_id: PublicId, peer_id: PeerId) {
    // Clients keep no routing table, so there is nothing to do here.
    if self.role == Role::Client {
        debug!("{:?} Received node identify as a client.", self);
        return;
    }
    debug!("{:?} Handling NodeIdentify from {:?}.",
           self,
           public_id.name());
    // If this is the node we relocated, clear the outstanding marker so a
    // new relocation can be served.
    let is_relocated_node = self.sent_network_name_to
        .map_or(false, |(name, _)| name == *public_id.name());
    if is_relocated_node {
        self.sent_network_name_to = None;
    }
    self.add_to_routing_table(public_id, peer_id);
}
/// Adds `public_id`/`peer_id` to the routing table, notifies affected nodes
/// and the user as appropriate, and moves this node into the `Node` state.
fn add_to_routing_table(&mut self, public_id: PublicId, peer_id: PeerId) {
    let name = *public_id.name();
    if self.routing_table.contains(&name) {
        return;
    }
    let info = NodeInfo::new(public_id, peer_id);
    // Whether the new node shares a close group with us must be computed
    // *before* the table is modified.
    let bucket_index = self.name().bucket_index(&name);
    let common_groups = self.routing_table.is_in_any_close_group_with(bucket_index, GROUP_SIZE);
    match self.routing_table.add(info) {
        None => {
            debug!("{:?} Peer was not added to the routing table: {:?}",
                   self,
                   peer_id);
            self.disconnect_peer(&peer_id);
        }
        Some(AddedNodeDetails { must_notify, unneeded }) => {
            info!("{:?} Added {:?} to routing table.", self, name);
            if self.routing_table.len() == 1 {
                let _ = self.event_sender.send(Event::Connected);
            }
            // Inform the nodes that need to know about the newcomer.
            for notify_info in must_notify {
                let message = DirectMessage::NewNode(public_id);
                // FIX: this line was garbled as `¬ify_info.peer_id` -
                // mojibake for `&notify_info.peer_id` (`&not` -> `¬`).
                let _ = self.send_direct_message(&notify_info.peer_id, message);
            }
            // Tell nodes we no longer need that their connection can go.
            for node_info in unneeded {
                let message = DirectMessage::ConnectionUnneeded(*self.name());
                let _ = self.send_direct_message(&node_info.peer_id, message);
            }
            self.reset_bucket_refresh_timer();
            if common_groups {
                let event = Event::NodeAdded(name, self.routing_table.to_names());
                if let Err(err) = self.event_sender.send(event) {
                    error!("{:?} Error sending event to routing user - {:?}", self, err);
                }
            }
        }
    }
    if self.state != State::Node {
        self.state = State::Node;
        let tick_period = Duration::from_secs(TICK_TIMEOUT_SECS);
        self.tick_timer_token = Some(self.timer.schedule(tick_period));
    }
    if self.routing_table.len() == 1 {
        self.request_bucket_close_groups();
    }
    // Ask the new peer to tunnel to every peer we could not reach directly.
    for (dst_id, _) in self.peer_mgr.peers_with_state(ConnectState::Tunnel) {
        let tunnel_request = DirectMessage::TunnelRequest(dst_id);
        let _ = self.send_direct_message(&peer_id, tunnel_request);
    }
}
/// Schedules a bucket refresh at the base interval, unless one with exactly
/// the base interval is already pending.
fn reset_bucket_refresh_timer(&mut self) {
    // A pending refresh at the default delay makes this a no-op; pending
    // refreshes with a backed-off delay are superseded by a fresh one.
    let already_scheduled = self.bucket_refresh_token_and_delay
        .map_or(false, |(_, delay)| delay == REFRESH_BUCKET_GROUPS_SECS);
    if already_scheduled {
        return;
    }
    let new_token = self.timer.schedule(Duration::from_secs(REFRESH_BUCKET_GROUPS_SECS));
    self.bucket_refresh_token_and_delay = Some((new_token, REFRESH_BUCKET_GROUPS_SECS));
}
/// Sends a `GetCloseGroup` request to the address in bucket `bucket_index`
/// (our own name with that bit flipped); out-of-range indices are ignored.
fn request_bucket_ids(&mut self, bucket_index: usize) -> Result<(), RoutingError> {
    if bucket_index >= XOR_NAME_BITS {
        return Ok(());
    }
    trace!("{:?} Send GetCloseGroup to bucket {}.", self, bucket_index);
    let target = self.name().with_flipped_bit(bucket_index);
    self.request_close_group(target)
}
/// Requests the close group of `name` from its `NaeManager`.
fn request_close_group(&mut self, name: XorName) -> Result<(), RoutingError> {
    let our_name = *self.name();
    let msg = RoutingMessage {
        content: MessageContent::GetCloseGroup(MessageId::new()),
        src: Authority::ManagedNode(our_name),
        dst: Authority::NaeManager(name),
    };
    self.send_message(msg)
}
/// Handles a request to tunnel traffic between `peer_id` and `dst_id`; it is
/// considered only when both peers are entries in our routing table.
fn handle_tunnel_request(&mut self,
                         peer_id: PeerId,
                         dst_id: PeerId)
                         -> Result<(), RoutingError> {
    // Scope the immutable routing-table borrow before mutating `tunnels`.
    let both_in_table = {
        let known = |id: PeerId| self.routing_table.iter().any(|node| node.peer_id == id);
        known(peer_id) && known(dst_id)
    };
    if !both_in_table {
        debug!("{:?} Rejected tunnel request from {:?} for {:?}.",
               self,
               peer_id,
               dst_id);
        return Ok(());
    }
    if let Some((id0, id1)) = self.tunnels.consider_clients(peer_id, dst_id) {
        debug!("{:?} Accepted tunnel request from {:?} for {:?}.",
               self,
               peer_id,
               dst_id);
        return self.send_direct_message(&id0, DirectMessage::TunnelSuccess(id1));
    }
    Ok(())
}
/// Handles a peer's confirmation that it will tunnel for us to `dst_id`:
/// records the tunnel and sends our `NodeIdentify` through it.
fn handle_tunnel_success(&mut self,
                         peer_id: PeerId,
                         dst_id: PeerId)
                         -> Result<(), RoutingError> {
    let connecting = self.peer_mgr.remove_connecting_peer(&dst_id);
    if let Some((peer_name, _)) = connecting {
        if self.tunnels.add(dst_id, peer_id) {
            debug!("{:?} Adding {:?} as a tunnel node for {:?}.",
                   self,
                   peer_id,
                   peer_name);
            return self.node_identify(dst_id);
        }
    }
    Ok(())
}
/// Handles notification that the tunnel to `dst_id` via `peer_id` closed,
/// treating the destination as lost if we have no direct connection to it.
fn handle_tunnel_closed(&mut self,
                        peer_id: PeerId,
                        dst_id: PeerId)
                        -> Result<(), RoutingError> {
    if !self.tunnels.remove(dst_id, peer_id) {
        return Ok(());
    }
    debug!("{:?} Tunnel to {:?} via {:?} closed.",
           self,
           dst_id,
           peer_id);
    if !self.crust_service.is_connected(&dst_id) {
        self.dropped_routing_node_connection(&dst_id);
    }
    Ok(())
}
/// Handles a client's request to tear down its tunnel to `dst_id`; the other
/// endpoint is informed only if the pair was actually registered.
fn handle_tunnel_disconnect(&mut self,
                            peer_id: PeerId,
                            dst_id: PeerId)
                            -> Result<(), RoutingError> {
    debug!("{:?} Closing tunnel connecting {:?} and {:?}.",
           self,
           dst_id,
           peer_id);
    if !self.tunnels.drop_client_pair(dst_id, peer_id) {
        return Ok(());
    }
    self.send_direct_message(&dst_id, DirectMessage::TunnelClosed(peer_id))
}
/// Drops the connection to `peer_id` unless the peer is still needed as a
/// routing-table entry, proxy or client; tunnelled peers are dropped by
/// asking the tunnel node to close the pairing instead.
fn disconnect_peer(&mut self, peer_id: &PeerId) {
// Check order matters: routing-table entries, the proxy and clients take
// precedence and are never disconnected here.
if let Some(&node) = self.routing_table.iter().find(|node| node.peer_id == *peer_id) {
debug!("{:?} Not disconnecting routing table entry {:?} ({:?}).",
self,
node.name(),
peer_id);
} else if let Some(&public_id) = self.peer_mgr.get_proxy_public_id(peer_id) {
debug!("{:?} Not disconnecting proxy node {:?} ({:?}).",
self,
public_id.name(),
peer_id);
} else if self.peer_mgr.get_client(peer_id).is_some() {
debug!("{:?} Not disconnecting client {:?}.", self, peer_id);
} else if let Some(tunnel_id) = self.tunnels.remove_tunnel_for(peer_id) {
// No direct connection: ask the tunnel node to drop the pair.
debug!("{:?} Disconnecting {:?} (indirect).", self, peer_id);
let message = DirectMessage::TunnelDisconnect(*peer_id);
let _ = self.send_direct_message(&tunnel_id, message);
} else {
// Expendable direct connection: disconnect at the Crust level.
debug!("{:?} Disconnecting {:?}. Calling crust::Service::disconnect.",
self,
peer_id);
let _ = self.crust_service.disconnect(*peer_id);
}
}
/// Starts the relocation process: asks the network, via our proxy, for a new
/// node name, with a timer guarding against a missing response.
fn relocate(&mut self) -> Result<(), RoutingError> {
    let timeout = Duration::from_secs(GET_NODE_NAME_TIMEOUT_SECS);
    self.get_node_name_timer_token = Some(self.timer.schedule(timeout));
    let content = MessageContent::GetNodeName {
        current_id: *self.full_id.public_id(),
        message_id: MessageId::new(),
    };
    // Sending as a client requires an established proxy.
    let src = try!(self.get_client_authority());
    let msg = RoutingMessage {
        src: src,
        dst: Authority::NaeManager(*self.name()),
        content: content,
    };
    info!("{:?} Sending GetNodeName request with: {:?}. This can take a while.",
          self,
          self.full_id.public_id());
    self.send_message(msg)
}
/// Handles a client's `GetNodeName` request (we act as the `NaeManager` of
/// the client's current, hash-derived name): computes the relocated name from
/// our close group and forwards an `ExpectCloseNode` to the new name's group.
fn handle_get_node_name_request(&mut self,
mut their_public_id: PublicId,
client_key: sign::PublicKey,
proxy_name: XorName,
dst_name: XorName,
peer_id: PeerId,
message_id: MessageId)
-> Result<(), RoutingError> {
// The request must be addressed to the name derived from the client's key.
let hashed_key = sha256::hash(&client_key.0);
let close_group_to_client = XorName(hashed_key.0);
if close_group_to_client != dst_name {
return Err(RoutingError::InvalidDestination);
}
// Our close group to `dst_name` is the input for the relocation function.
let close_group = match self.routing_table.close_nodes(&dst_name, GROUP_SIZE) {
Some(close_group) => close_group.iter().map(NodeInfo::name).cloned().collect(),
None => return Err(RoutingError::InvalidDestination),
};
let relocated_name = try!(utils::calculate_relocated_name(close_group,
&their_public_id.name()));
their_public_id.set_name(relocated_name);
{
// Tell the group at the relocated name to expect the joining node; that
// group replies to the client with `GetNodeNameResponse` (see
// `handle_expect_close_node_request`).
let request_content = MessageContent::ExpectCloseNode {
expect_id: their_public_id,
client_auth: Authority::Client {
client_key: client_key,
proxy_node_name: proxy_name,
peer_id: peer_id,
},
message_id: message_id,
};
let request_msg = RoutingMessage {
src: Authority::NaeManager(dst_name),
dst: Authority::NaeManager(relocated_name),
content: request_content,
};
self.send_message(request_msg)
}
}
/// Handles `ExpectCloseNode` (sent by the group managing a joining node's
/// old name): replies to the client with its relocated ID and the public IDs
/// of our close group.
fn handle_expect_close_node_request(&mut self,
expect_id: PublicId,
client_auth: Authority,
message_id: MessageId)
-> Result<(), RoutingError> {
// Ignore the message if it is about ourselves.
if expect_id == *self.full_id.public_id() {
return Ok(());
}
let now = Instant::now();
// Throttle: while a recently issued network name is still outstanding,
// do not hand out another one.
if let Some((_, timestamp)) = self.sent_network_name_to {
if (now - timestamp).as_secs() <= SENT_NETWORK_NAME_TIMEOUT_SECS {
return Ok(()); }
// The previous grant has timed out; forget it.
self.sent_network_name_to = None;
}
let close_group = match self.routing_table.close_nodes(expect_id.name(), GROUP_SIZE) {
Some(close_group) => close_group,
None => return Err(RoutingError::InvalidDestination),
};
let public_ids = close_group.into_iter().map(|info| info.public_id).collect_vec();
// Record who we issued a name to, for the throttling check above.
self.sent_network_name_to = Some((*expect_id.name(), now));
let response_content = MessageContent::GetNodeNameResponse {
relocated_id: expect_id,
close_group_ids: public_ids,
message_id: message_id,
};
debug!("{:?} Responding to client {:?}: {:?}.",
self,
client_auth,
response_content);
let response_msg = RoutingMessage {
src: Authority::NodeManager(*expect_id.name()),
dst: client_auth,
content: response_content,
};
self.send_message(response_msg)
}
fn handle_get_node_name_response(&mut self,
relocated_id: PublicId,
mut close_group_ids: Vec<PublicId>,
dst: Authority)
-> Result<(), RoutingError> {
self.get_node_name_timer_token = None;
self.set_self_node_name(*relocated_id.name());
close_group_ids.truncate(GROUP_SIZE / 2);
for close_node_id in close_group_ids {
debug!("{:?} Sending connection info to {:?} on GetNodeName response.",
self,
close_node_id);
try!(self.send_connection_info(close_node_id,
dst.clone(),
Authority::ManagedNode(*close_node_id.name())));
}
Ok(())
}
fn handle_get_close_group_request(&mut self,
src: Authority,
dst_name: XorName,
message_id: MessageId)
-> Result<(), RoutingError> {
if src.name() == self.name() {
return Ok(());
}
let close_group = match self.routing_table.close_nodes(&dst_name, GROUP_SIZE) {
Some(close_group) => close_group,
None => return Err(RoutingError::InvalidDestination),
};
let public_ids = close_group.into_iter().map(|info| info.public_id).collect_vec();
trace!("{:?} Sending GetCloseGroup response with {:?} to client {:?}.",
self,
public_ids.iter().map(PublicId::name).collect_vec(),
src);
let response_content = MessageContent::GetCloseGroupResponse {
close_group_ids: public_ids,
message_id: message_id,
};
let response_msg = RoutingMessage {
src: Authority::ManagedNode(*self.name()),
dst: src,
content: response_content,
};
self.send_message(response_msg)
}
/// Handles a `GetCloseGroupResponse` by sending connection info to every
/// returned node that would improve our routing table.
fn handle_get_close_group_response(&mut self,
                                   close_group_ids: Vec<PublicId>,
                                   dst: Authority)
                                   -> Result<(), RoutingError> {
    for node_id in close_group_ids {
        if !self.routing_table.need_to_add(node_id.name()) {
            continue;
        }
        debug!("{:?} Sending connection info to {:?} on GetCloseGroup response.",
               self,
               node_id);
        let ci_dst = Authority::ManagedNode(*node_id.name());
        try!(self.send_connection_info(node_id, dst.clone(), ci_dst));
    }
    Ok(())
}
/// Records receipt of ack `ack`: clears the matching pending entry, or - if
/// the ack arrived before the message was registered - remembers it.
fn handle_ack_response(&mut self, ack: u64) -> Result<(), RoutingError> {
    let was_pending = self.pending_acks.remove(&ack).is_some();
    if !was_pending {
        // Early ack: store it so the later send can be skipped.
        let _ = self.received_acks.insert(&ack);
    }
    Ok(())
}
/// Handles encrypted connection info sent by a joining client, provided the
/// sender would be accepted into our routing table.
fn handle_connection_info_from_client(&mut self,
                                      encrypted_connection_info: Vec<u8>,
                                      nonce_bytes: [u8; box_::NONCEBYTES],
                                      src: Authority,
                                      dst_name: XorName,
                                      their_public_id: PublicId)
                                      -> Result<(), RoutingError> {
    try!(self.check_address_for_routing_table(their_public_id.name()));
    let our_authority = Authority::ManagedNode(dst_name);
    self.connect(encrypted_connection_info,
                 nonce_bytes,
                 their_public_id,
                 our_authority,
                 src)
}
/// Handles encrypted connection info sent by another node, provided the
/// sender would be accepted into our routing table.
fn handle_connection_info_from_node(&mut self,
                                    encrypted_connection_info: Vec<u8>,
                                    nonce_bytes: [u8; box_::NONCEBYTES],
                                    src_name: XorName,
                                    dst: Authority,
                                    their_public_id: PublicId)
                                    -> Result<(), RoutingError> {
    try!(self.check_address_for_routing_table(&src_name));
    let their_authority = Authority::ManagedNode(src_name);
    self.connect(encrypted_connection_info,
                 nonce_bytes,
                 their_public_id,
                 dst,
                 their_authority)
}
/// Initiates a connection to `their_public_id`: exchanges `NodeIdentify`
/// directly if the peer is already our proxy or client, otherwise asks Crust
/// to prepare connection info - unless an attempt is already in progress.
fn send_connection_info(&mut self,
their_public_id: PublicId,
src: Authority,
dst: Authority)
-> Result<(), RoutingError> {
// Already connected as proxy or client: skip Crust connection setup.
if let Some(peer_id) = self.peer_mgr.get_proxy_or_client_peer_id(&their_public_id) {
try!(self.node_identify(peer_id));
self.handle_node_identify(their_public_id, peer_id);
} else if !self.routing_table.contains(their_public_id.name()) &&
self.routing_table.allow_connection(their_public_id.name()) {
// Deduplicate: an attempt is in flight if a connection token, prepared
// connection info, or a Crust-level connecting state already exists.
if self.peer_mgr
.connection_token_map
.peek_iter()
.any(|(_, &(ref public_id, _, _))| *public_id == their_public_id) ||
self.peer_mgr.our_connection_info_map.contains_key(&their_public_id) ||
self.peer_mgr.connecting_peer_state(their_public_id.name()) ==
Some(ConnectState::Crust) {
debug!("{:?} Already sent connection info to {:?}!",
self,
their_public_id.name());
} else {
// Ask Crust to prepare our connection info; the result is
// correlated back to this request via the random token
// (presumably via a `ConnectionInfoResult` event - confirm).
let token = rand::random();
self.crust_service.prepare_connection_info(token);
let _ =
self.peer_mgr.connection_token_map.insert(token, (their_public_id, src, dst));
}
}
Ok(())
}
/// Dispatches an expired timer `token` to whichever pending operation it
/// belongs to: bootstrap, GetNodeName, the periodic tick, the bucket refresh,
/// or an unacknowledged message awaiting a retry.
fn handle_timeout(&mut self, token: u64) {
if let State::Bootstrapping(peer_id, bootstrap_token) = self.state {
if bootstrap_token == token {
debug!("{:?} Timeout when trying to bootstrap against {:?}.",
self,
peer_id);
self.rebootstrap();
return;
}
}
if self.get_node_name_timer_token == Some(token) {
info!("{:?} Failed to get GetNodeName response.", self);
let _ = self.event_sender.send(Event::RestartRequired);
return;
}
// The tick timer re-arms itself each time it fires.
if self.tick_timer_token == Some(token) {
let _ = self.event_sender.send(Event::Tick);
let tick_period = Duration::from_secs(TICK_TIMEOUT_SECS);
self.tick_timer_token = Some(self.timer.schedule(tick_period));
return;
}
// Bucket refresh: re-request close groups, then back off exponentially.
if let Some((bucket_token, delay)) = self.bucket_refresh_token_and_delay {
if bucket_token == token {
self.request_bucket_close_groups();
let new_delay = delay.saturating_mul(2);
let new_token = self.timer.schedule(Duration::from_secs(new_delay));
self.bucket_refresh_token_and_delay = Some((new_token, new_delay));
return;
}
}
// Find the unacknowledged message owning this token; the lookup is kept
// separate from the removal to avoid borrowing `pending_acks` twice.
let timed_out_ack = if let Some((sip_hash, _)) = self.pending_acks
.iter()
.find(|&(_, ref unacked_msg)| unacked_msg.timer_token == token) {
Some(*sip_hash)
} else {
None
};
if let Some(timed_out) = timed_out_ack {
let mut unacked_msg = self.pending_acks.remove(&timed_out).expect("Bug in HashMap.");
trace!("{:?} - Timed out waiting for ack({}) {:?}",
self,
timed_out,
unacked_msg);
unacked_msg.route += 1;
// Give up once every route (one per group member) has been tried.
if unacked_msg.route as usize == GROUP_SIZE {
debug!("{:?} - Message unable to be acknowledged - giving up. {:?}",
self,
unacked_msg);
self.stats.count_unacked();
} else {
// Retry the message on the next route, starting from ourselves.
let hop = *self.name();
let _ = self.send(&unacked_msg.signed_msg, unacked_msg.route, &hop, &[hop]);
}
}
}
/// Requests the close group of our own name and of every bucket address whose
/// bucket is not yet full, skipping groups filtered as recently requested.
fn request_bucket_close_groups(&mut self) {
    // `XOR_NAME_BITS` serves as the filter key for our own close group.
    if !self.bucket_filter.contains(&XOR_NAME_BITS) {
        let _ = self.bucket_filter.insert(&XOR_NAME_BITS);
        let our_name = *self.name();
        if let Err(err) = self.request_close_group(our_name) {
            error!("{:?} Failed to request our own close group: {:?}",
                   self,
                   err);
        }
    }
    for index in 0..self.routing_table.bucket_count() {
        let bucket_full = self.routing_table.bucket_len(index) >= GROUP_SIZE;
        if bucket_full || self.bucket_filter.contains(&index) {
            continue;
        }
        let _ = self.bucket_filter.insert(&index);
        if let Err(err) = self.request_bucket_ids(index) {
            error!("{:?} Failed to request public IDs from bucket {}: {:?}.",
                   self,
                   index,
                   err);
        }
    }
}
/// Decrypts the peer's connection info and either connects right away (if our
/// own info was already prepared) or stores theirs and starts preparing ours.
fn connect(&mut self,
encrypted_connection_info: Vec<u8>,
nonce_bytes: [u8; box_::NONCEBYTES],
their_public_id: PublicId,
src: Authority,
dst: Authority)
-> Result<(), RoutingError> {
// Authenticated decryption with their public and our private key.
let decipher_result = box_::open(&encrypted_connection_info,
&box_::Nonce(nonce_bytes),
their_public_id.encrypting_public_key(),
self.full_id.encrypting_private_key());
let serialised_connection_info =
try!(decipher_result.map_err(|()| RoutingError::AsymmetricDecryptionFailure));
let their_connection_info: PubConnectionInfo =
try!(serialisation::deserialise(&serialised_connection_info));
if let Some(our_connection_info) = self.peer_mgr
.our_connection_info_map
.remove(&their_public_id) {
// Both sides' info is available: attempt the Crust connection.
let peer_id = their_connection_info.id();
let their_name = *their_public_id.name();
if let Some((name, _)) = self.peer_mgr
.insert_connecting_peer(peer_id, their_name, ConnectState::Crust) {
warn!("{:?} Prepared connection info for {:?} as {:?}, but already tried as {:?}.",
self,
peer_id,
their_name,
name);
}
debug!("{:?} Received connection info. Trying to connect to {:?} as {:?}.",
self,
peer_id,
their_public_id.name());
let _ = self.crust_service.connect(our_connection_info, their_connection_info);
Ok(())
} else {
// Ours isn't ready yet: remember theirs and start preparing ours.
let _ = self.peer_mgr
.their_connection_info_map
.insert(their_public_id, their_connection_info);
self.send_connection_info(their_public_id, src, dst)
}
}
fn send_user_message(&mut self,
src: Authority,
dst: Authority,
user_msg: UserMessage,
priority: u8)
-> Result<(), RoutingError> {
match user_msg {
UserMessage::Request(ref request) => self.stats.count_request(request),
UserMessage::Response(ref response) => self.stats.count_response(response),
}
for part in try!(user_msg.to_parts(priority)) {
try!(self.send_message(RoutingMessage {
src: src.clone(),
dst: dst.clone(),
content: part,
}));
}
Ok(())
}
/// Caches one part of a multipart user message and, once all `part_count`
/// parts for `hash` are present, reassembles and returns the whole message.
fn add_user_msg_part(&mut self,
                     hash: u64,
                     part_count: u32,
                     part_index: u32,
                     payload: Vec<u8>)
                     -> Option<UserMessage> {
    let key = (hash, part_count);
    // Scope the cache-entry borrow so the key can be removed afterwards.
    let complete = {
        let parts = self.user_msg_cache.entry(key).or_insert_with(BTreeMap::new);
        let _ = parts.insert(part_index, payload);
        parts.len() == part_count as usize
    };
    if !complete {
        return None;
    }
    self.user_msg_cache
        .remove(&key)
        .and_then(|part_map| UserMessage::from_parts(hash, part_map.values()).ok())
}
/// Sends `routing_msg` over the first route (route 0).
fn send_message(&mut self, routing_msg: RoutingMessage) -> Result<(), RoutingError> {
self.send_message_via_route(routing_msg, 0)
}
/// Signs `routing_msg` and sends it along `route`. If we are ourselves a
/// recipient, the message is additionally handled locally - once only,
/// guarded by the signed-message filter.
fn send_message_via_route(&mut self,
routing_msg: RoutingMessage,
route: u8)
-> Result<(), RoutingError> {
let signed_msg = try!(SignedMessage::new(routing_msg, &self.full_id));
let hop = *self.name();
try!(self.send(&signed_msg, route, &hop, &[hop]));
// `message_to_send` may substitute the group-hashed variant.
let sent_msg = try!(self.message_to_send(&signed_msg, route, &hop));
// Handle locally only on the first filter insertion, so a message sent
// over several routes is processed exactly once.
if self.is_recipient(&sent_msg.routing_message().dst) &&
self.signed_message_filter.insert(&sent_msg) == 1 {
self.handle_routing_message(sent_msg.routing_message(), *sent_msg.public_id())
} else {
Ok(())
}
}
/// Delivers `signed_msg` to a directly connected client. If the client has
/// disconnected, an ack is sent on its behalf and an error is returned.
fn relay_to_client(&mut self,
signed_msg: SignedMessage,
peer_id: &PeerId)
-> Result<(), RoutingError> {
let priority = signed_msg.priority();
if self.peer_mgr.get_client(peer_id).is_some() {
// Don't send the same message to the same client twice.
if self.filter_signed_msg(&signed_msg, peer_id, 0) {
return Ok(());
}
let hop_msg =
try!(HopMessage::new(signed_msg, 0, vec![], self.full_id.signing_private_key()));
let message = Message::Hop(hop_msg);
let raw_bytes = try!(serialisation::serialise(&message));
self.send_or_drop(peer_id, raw_bytes, priority)
} else {
// Client is gone: ack the message ourselves (NOTE(review): presumably
// so upstream senders stop retrying - confirm) and report the failure.
let hop = *self.name();
self.send_ack_from(signed_msg.routing_message(), 0, Authority::ManagedNode(hop));
debug!("{:?} Client connection not found for message {:?}.",
self,
signed_msg);
Err(RoutingError::ClientConnectionNotFound)
}
}
fn to_hop_bytes(&self,
signed_msg: SignedMessage,
route: u8,
sent_to: Vec<XorName>)
-> Result<Vec<u8>, RoutingError> {
let hop_msg = try!(HopMessage::new(signed_msg,
route,
sent_to,
self.full_id.signing_private_key()));
let message = Message::Hop(hop_msg);
Ok(try!(serialisation::serialise(&message)))
}
/// Wraps `signed_msg` in a `HopMessage` and serialises it as a `TunnelHop`
/// message routed from Crust peer `src` to `dst`.
fn to_tunnel_hop_bytes(&self,
                       signed_msg: SignedMessage,
                       route: u8,
                       sent_to: Vec<XorName>,
                       src: PeerId,
                       dst: PeerId)
                       -> Result<Vec<u8>, RoutingError> {
    // `signed_msg` is owned and used exactly once, so it can be moved
    // directly into the hop message; the previous `.clone()` was redundant.
    let hop_msg = try!(HopMessage::new(signed_msg,
                                       route,
                                       sent_to,
                                       self.full_id.signing_private_key()));
    let message = Message::TunnelHop {
        content: hop_msg,
        src: src,
        dst: dst,
    };
    Ok(try!(serialisation::serialise(&message)))
}
/// Sends `signed_msg` along `route` towards its destination: resolves the
/// target peers via the routing table, registers the message for
/// acknowledgement and transmits it directly or through a tunnel node.
fn send(&mut self,
signed_msg: &SignedMessage,
route: u8,
hop: &XorName,
sent_to: &[XorName])
-> Result<(), RoutingError> {
// Count the route only for messages that originate here.
if signed_msg.public_id() == self.full_id.public_id() && hop == self.name() {
self.stats.count_route(route);
}
let routing_msg = signed_msg.routing_message();
// Messages for a directly connected client are relayed; if we are the
// client recipient ourselves there is nothing to forward.
if let Authority::Client { ref peer_id, .. } = routing_msg.dst {
if self.name() == routing_msg.dst.name() {
return self.relay_to_client(signed_msg.clone(), peer_id);
} else if self.is_recipient(&routing_msg.dst) {
return Ok(()); }
}
let (new_sent_to, target_peer_ids) =
try!(self.get_targets(routing_msg, route, hop, sent_to));
// Returns false if the ack already arrived - skip sending in that case.
if !self.add_to_pending_acks(signed_msg, route) {
return Ok(());
}
let send_msg = try!(self.message_to_send(signed_msg, route, hop));
let raw_bytes = try!(self.to_hop_bytes(send_msg.clone(), route, new_sent_to.clone()));
for target_peer_id in target_peer_ids {
// Prefer the direct connection; fall back to a tunnel node.
let (peer_id, bytes) = if self.crust_service.is_connected(&target_peer_id) {
(target_peer_id, raw_bytes.clone())
} else if let Some(&tunnel_id) = self.tunnels
.tunnel_for(&target_peer_id) {
let bytes = try!(self.to_tunnel_hop_bytes(send_msg.clone(),
route,
new_sent_to.clone(),
self.crust_service.id(),
target_peer_id));
(tunnel_id, bytes)
} else {
trace!("{:?} Not connected or tunneling to {:?}. Dropping peer.",
self,
target_peer_id);
self.disconnect_peer(&target_peer_id);
continue;
};
// The filter suppresses re-sending the same message to the same peer
// on the same route.
if !self.filter_signed_msg(signed_msg, &target_peer_id, route) {
if let Err(err) = self.send_or_drop(&peer_id, bytes, signed_msg.priority()) {
info!("{:?} Error sending message to {:?}: {:?}.",
self,
target_peer_id,
err);
}
}
}
Ok(())
}
/// Returns the message to actually send on `route`: the original signed
/// message when we are not its author, when it is not a group message, or
/// when this route makes us the designated sender; otherwise a re-signed
/// copy of the routing message's group hash.
fn message_to_send(&self,
                   signed_msg: &SignedMessage,
                   route: u8,
                   hop: &XorName)
                   -> Result<SignedMessage, RoutingError> {
    let src = &signed_msg.routing_message().src;
    if signed_msg.public_id() != self.full_id.public_id() || hop != self.name() ||
       !src.is_group() {
        return Ok(signed_msg.clone());
    }
    let group = self.routing_table.closest_nodes_to(src.name(), GROUP_SIZE, true);
    // Guard against an empty group: `route % group.len()` would otherwise
    // panic with a remainder-by-zero. In that case fall back to sending the
    // full message, as the designated-sender check cannot be made.
    if group.is_empty() || hop == group[route as usize % group.len()].name() {
        return Ok(signed_msg.clone());
    }
    SignedMessage::new(try!(signed_msg.routing_message().to_grp_msg_hash()),
                       &self.full_id)
}
/// Returns whether this node is a valid recipient of messages sent to `dst`.
fn is_recipient(&self, dst: &Authority) -> bool {
    match *dst {
        // For client authorities, match against our own signing key.
        Authority::Client { ref client_key, .. } => {
            let connected = self.state == State::Node || self.state == State::Client;
            connected && client_key == self.full_id.public_id().signing_public_key()
        }
        // For all other authorities, defer to the routing table.
        _ => self.state == State::Node && self.routing_table.is_recipient(dst.to_destination()),
    }
}
/// Computes the peers to send a message to on the given `route`, together
/// with the updated list of names the message has already been sent to.
fn get_targets(&self,
routing_msg: &RoutingMessage,
route: u8,
hop: &XorName,
sent_to: &[XorName])
-> Result<(Vec<XorName>, Vec<PeerId>), RoutingError> {
match self.state {
// Nothing can be routed before bootstrapping completes.
State::Disconnected |
State::Bootstrapping(..) => {
error!("{:?} - Tried to send message in state {:?}",
self,
self.state);
Err(RoutingError::NotBootstrapped)
}
// As a client, everything goes through the proxy node.
State::Client => {
if let Authority::Client { ref proxy_node_name, .. } = routing_msg.src {
if let Some(&peer_id) = self.peer_mgr.get_proxy_peer_id(proxy_node_name) {
Ok((vec![], vec![peer_id]))
} else {
error!("{:?} - Unable to find connection to proxy node in proxy map",
self);
Err(RoutingError::ProxyConnectionNotFound)
}
} else {
error!("{:?} - Source should be client if our state is a Client",
self);
Err(RoutingError::InvalidSource)
}
}
// As a full node, ask the routing table for this route's targets,
// excluding peers that already received the message.
State::Node => {
let destination = routing_msg.dst.to_destination();
let targets = self.routing_table
.target_nodes(destination, hop, route as usize)
.into_iter()
.filter(|target| !sent_to.contains(target.name()))
.collect_vec();
// Record the new targets as "sent to" for downstream hops.
let new_sent_to = sent_to.iter()
.chain(targets.iter().map(NodeInfo::name))
.cloned()
.collect_vec();
Ok((new_sent_to, targets.into_iter().map(|target| target.peer_id).collect()))
}
}
}
/// Sends an ack for `routing_msg` back to its source, using the message's
/// own destination as the ack's source authority.
fn send_ack(&mut self, routing_msg: &RoutingMessage, route: u8) {
self.send_ack_from(routing_msg, route, routing_msg.dst.clone());
}
/// Sends an acknowledgement of `routing_msg` from `src` back to the
/// message's source authority; acks themselves are never acknowledged.
fn send_ack_from(&mut self, routing_msg: &RoutingMessage, route: u8, src: Authority) {
    if let MessageContent::Ack(..) = routing_msg.content {
        return;
    }
    // The ack value is the sip hash of the group-hashed message.
    let hash_msg = match routing_msg.to_grp_msg_hash() {
        Ok(msg) => msg,
        Err(error) => {
            error!("{:?} Failed to create hash message: {:?}", self, error);
            return;
        }
    };
    let ack = maidsafe_utilities::big_endian_sip_hash(&hash_msg);
    let response = RoutingMessage {
        src: src,
        dst: routing_msg.src.clone(),
        content: MessageContent::Ack(ack, routing_msg.priority()),
    };
    if let Err(error) = self.send_message_via_route(response, route) {
        error!("{:?} Failed to send ack: {:?}", self, error);
    }
}
/// Registers `signed_msg` as awaiting an ack and schedules a retry timeout.
/// Returns `false` if the ack has already arrived (the send can be skipped).
/// Acks themselves and messages authored by other nodes always return `true`
/// without being registered.
fn add_to_pending_acks(&mut self, signed_msg: &SignedMessage, route: u8) -> bool {
// Acks are not themselves acknowledged.
if let MessageContent::Ack(..) = signed_msg.routing_message().content {
return true;
}
// Only the original author tracks acknowledgements.
if *signed_msg.public_id() != *self.full_id.public_id() {
return true;
}
// The expected ack value is the sip hash of the group-hashed message.
let hash_msg = match signed_msg.routing_message().to_grp_msg_hash() {
Ok(hash_msg) => hash_msg,
Err(error) => {
error!("{:?} Failed to create hash message: {:?}", self, error);
return true;
}
};
let ack = maidsafe_utilities::big_endian_sip_hash(&hash_msg);
// The ack may have arrived before we finished sending (early ack).
if self.received_acks.contains(&ack) {
return false;
}
let token = self.timer.schedule(Duration::from_secs(ACK_TIMEOUT_SECS));
let unacked_msg = UnacknowledgedMessage {
signed_msg: signed_msg.clone(),
route: route,
timer_token: token,
};
if let Some(ejected) = self.pending_acks.insert(ack, unacked_msg) {
trace!("{:?} Ejected pending ack: {:?} - {:?}", self, ack, ejected);
}
true
}
/// Builds this node's `Client` authority from the current proxy; fails with
/// `NotBootstrapped` if no proxy is registered.
fn get_client_authority(&self) -> Result<Authority, RoutingError> {
    if let Some((_, ref proxy_pub_id)) = *self.peer_mgr.proxy() {
        Ok(Authority::Client {
            client_key: *self.full_id.public_id().signing_public_key(),
            proxy_node_name: *proxy_pub_id.name(),
            peer_id: self.crust_service.id(),
        })
    } else {
        Err(RoutingError::NotBootstrapped)
    }
}
/// Adopts `new_name` as our node name and resets the routing table under it.
/// The name must be a relocated one, i.e. differ from the hash of our key.
fn set_self_node_name(&mut self, new_name: XorName) {
    let initial_name =
        XorName(sha256::hash(&self.full_id.public_id().signing_public_key().0).0);
    assert!(initial_name != new_name);
    self.full_id.public_id_mut().set_name(new_name);
    let our_info = NodeInfo::new(*self.full_id.public_id(), self.crust_service.id());
    self.routing_table = RoutingTable::new(our_info, GROUP_SIZE, EXTRA_BUCKET_ENTRIES);
}
fn dropped_client_connection(&mut self, peer_id: &PeerId) {
if let Some(info) = self.peer_mgr.remove_client(peer_id) {
if info.client_restriction {
debug!("{:?} Client disconnected: {:?}", self, peer_id);
} else {
debug!("{:?} Joining node {:?} dropped. {} remaining.",
self,
peer_id,
self.peer_mgr.joining_nodes_num());
}
}
}
/// Handles loss of the connection to our bootstrap/proxy node; a pure client
/// cannot continue without its proxy and is terminated.
fn dropped_bootstrap_connection(&mut self, peer_id: &PeerId) {
    if self.peer_mgr.get_proxy_public_id(peer_id).is_none() {
        return;
    }
    if let Some((_, public_id)) = self.peer_mgr.remove_proxy() {
        debug!("{:?} Lost bootstrap connection to {:?} ({:?}).",
               self,
               public_id.name(),
               peer_id);
        if self.role == Role::Client {
            let _ = self.event_sender.send(Event::Terminate);
        }
    }
}
/// Tells the other endpoint of each tunnel pair that included `peer_id` that
/// the tunnel is now closed.
fn dropped_tunnel_client(&mut self, peer_id: &PeerId) {
    let partners = self.tunnels.drop_client(peer_id);
    for other_id in partners {
        let notification = DirectMessage::TunnelClosed(*peer_id);
        let _ = self.send_direct_message(&other_id, notification);
    }
}
/// Handles loss of a node that was tunnelling for us: every routing-table
/// peer reached through it is treated as dropped and a new tunnel is sought.
fn dropped_tunnel_node(&mut self, peer_id: &PeerId) {
// Collect the affected routing-table entries first, since handling them
// below mutates `self`.
let peers = self.tunnels
.remove_tunnel(peer_id)
.into_iter()
.filter_map(|dst_id| {
self.routing_table
.iter()
.find(|node| node.peer_id == dst_id)
.map(|&node| (dst_id, node))
})
.collect_vec();
for (dst_id, node) in peers {
self.dropped_routing_node_connection(&dst_id);
debug!("{:?} Lost tunnel for peer {:?} ({:?}). Requesting new tunnel.",
self,
dst_id,
node.name());
self.find_tunnel_for_peer(dst_id, *node.name());
}
}
/// Removes a disconnected peer from the routing table, notifies the user if
/// a shared close group changed, tries to refill the affected bucket, and
/// signals termination/restart if too few peers remain.
fn dropped_routing_node_connection(&mut self, peer_id: &PeerId) {
if let Some(&node) = self.routing_table.iter().find(|node| node.peer_id == *peer_id) {
if let Some(DroppedNodeDetails { incomplete_bucket }) = self.routing_table
.remove(node.name()) {
info!("{:?} Dropped {:?} from the routing table.",
self,
node.name());
// Only notify the user if we shared a close group with the node.
let common_groups = self.routing_table
.is_in_any_close_group_with(self.name().bucket_index(node.name()), GROUP_SIZE);
if common_groups {
let event = Event::NodeLost(*node.name(), self.routing_table.to_names());
if let Err(err) = self.event_sender.send(event) {
error!("{:?} Error sending event to routing user - {:?}", self, err);
}
}
// Try to refill the bucket the removal left short.
if let Some(bucket_index) = incomplete_bucket {
if let Err(e) = self.request_bucket_ids(bucket_index) {
debug!("{:?} Failed to request replacement connection_info from bucket \
{}: {:?}.",
self,
bucket_index,
e);
}
}
// Too few peers left to keep functioning as a routing node.
if self.routing_table.len() < GROUP_SIZE - 1 {
debug!("{:?} Lost connection, less than {} remaining.",
self,
GROUP_SIZE - 1);
let _ = self.event_sender.send(if self.role == Role::FirstNode {
Event::Terminate
} else {
Event::RestartRequired
});
}
self.reset_bucket_refresh_timer();
}
};
}
/// Succeeds only if `name` is not yet in the routing table but would be
/// accepted into it.
fn check_address_for_routing_table(&self, name: &XorName) -> Result<(), RoutingError> {
    let acceptable = !self.routing_table.contains(name) &&
                     self.routing_table.allow_connection(name);
    if acceptable {
        Ok(())
    } else {
        Err(RoutingError::RefusedFromRoutingTable)
    }
}
/// Drops the current proxy (when bootstrapping) and restarts the bootstrap
/// process from the `Disconnected` state.
fn rebootstrap(&mut self) {
    if let State::Bootstrapping(..) = self.state {
        if let Some((peer_id, _)) = self.peer_mgr.remove_proxy() {
            self.crust_service.disconnect(peer_id);
        }
    } else {
        warn!("Should only be called while in Bootstrapping state");
    }
    self.state = State::Disconnected;
    let _ = self.crust_service.start_bootstrap(self.bootstrap_blacklist.clone());
}
}
impl Debug for Core {
// Renders as "<state>(<name>)" - used as the log-line prefix throughout.
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "{:?}({})", self.state, self.name())
}
}