use crust::CrustUser;
use error::RoutingError;
#[cfg(feature = "use-mock-crust")]
use fake_clock::FakeClock as Instant;
use id::PublicId;
use itertools::Itertools;
use log::Level;
use messages::MessageContent;
use rand;
use resource_proof::ResourceProof;
use resource_prover::RESOURCE_PROOF_DURATION_SECS;
use routing_table::Error as RoutingTableError;
use routing_table::{
Authority, OwnMergeState, Prefix, RemovalDetails, RoutingTable, VersionedPrefix,
};
use signature_accumulator::ACCUMULATION_TIMEOUT_SECS;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::net::IpAddr;
use std::time::Duration;
#[cfg(not(feature = "use-mock-crust"))]
use std::time::Instant;
use std::{error, fmt, iter, mem};
use types::MessageId;
use xor_name::XorName;
use {PrivConnectionInfo, PubConnectionInfo};
const JOINING_NODE_TIMEOUT_SECS: u64 = 900;
const CONNECTING_PEER_TIMEOUT_SECS: u64 = 90;
const CONNECTED_PEER_TIMEOUT_SECS: u64 = 60;
const CANDIDATE_ACCEPT_TIMEOUT_SECS: u64 = 60;
/// Re-exports the timeout constants used by this module (and a few from sibling
/// modules) under one public path. Only compiled with the `use-mock-crust` feature.
#[cfg(feature = "use-mock-crust")]
#[doc(hidden)]
pub mod test_consts {
/// Same value as the `ACCUMULATION_TIMEOUT_SECS` imported by the parent module.
pub const ACCUMULATION_TIMEOUT_SECS: u64 = super::ACCUMULATION_TIMEOUT_SECS;
/// Same value as `ack_manager::ACK_TIMEOUT_SECS`.
pub const ACK_TIMEOUT_SECS: u64 = ::ack_manager::ACK_TIMEOUT_SECS;
/// Same value as the parent module's `CANDIDATE_ACCEPT_TIMEOUT_SECS`.
pub const CANDIDATE_ACCEPT_TIMEOUT_SECS: u64 = super::CANDIDATE_ACCEPT_TIMEOUT_SECS;
/// Same value as the `RESOURCE_PROOF_DURATION_SECS` imported by the parent module.
pub const RESOURCE_PROOF_DURATION_SECS: u64 = super::RESOURCE_PROOF_DURATION_SECS;
/// Same value as the parent module's `CONNECTING_PEER_TIMEOUT_SECS`.
pub const CONNECTING_PEER_TIMEOUT_SECS: u64 = super::CONNECTING_PEER_TIMEOUT_SECS;
/// Same value as the parent module's `CONNECTED_PEER_TIMEOUT_SECS`.
pub const CONNECTED_PEER_TIMEOUT_SECS: u64 = super::CONNECTED_PEER_TIMEOUT_SECS;
/// Same value as the parent module's `JOINING_NODE_TIMEOUT_SECS`.
pub const JOINING_NODE_TIMEOUT_SECS: u64 = super::JOINING_NODE_TIMEOUT_SECS;
/// Same value as `states::RATE_EXCEED_RETRY_MS`.
pub const RATE_EXCEED_RETRY_MS: u64 = ::states::RATE_EXCEED_RETRY_MS;
}
/// Maps each versioned section prefix to the public IDs of that section's members.
pub type SectionMap = BTreeMap<VersionedPrefix<XorName>, BTreeSet<PublicId>>;
/// Result of `PeerManager::get_routing_peer_details`: a snapshot of connected peers
/// plus the inconsistencies found between the peer map and the routing table.
#[derive(Default)]
pub struct PeerDetails {
/// Peers in `Routing`, `Candidate` or `Connected` state, each paired with a flag
/// that is `true` when the connection is a tunnel.
pub routing_peer_details: Vec<(PublicId, bool)>,
/// Peers whose state disagrees with the routing table (in the table but not in
/// `Routing` state, or in `Routing` state but missing from the table).
pub out_of_sync_peers: Vec<PublicId>,
/// Removal details for names that were in the routing table but had no peer entry
/// and were therefore removed from the table.
pub removal_details: Vec<RemovalDetails<XorName>>,
}
/// Errors returned by the connection-info handling methods of `PeerManager`.
#[derive(Debug)]
pub enum Error {
/// No peer (or no connection token) matching the request was found.
PeerNotFound,
/// The peer exists but its current state does not allow the operation.
UnexpectedState,
}
impl fmt::Display for Error {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::PeerNotFound => write!(formatter, "Peer not found"),
Error::UnexpectedState => write!(formatter, "Peer state does not allow operation"),
}
}
}
impl error::Error for Error {
    /// Returns the same fixed message that `Display` prints for this variant.
    fn description(&self) -> &str {
        let message = match *self {
            Error::UnexpectedState => "Peer state does not allow operation",
            Error::PeerNotFound => "Peer not found",
        };
        message
    }
}
/// How a peer that has (or may get) a routing-table entry is connected to us.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RoutingConnection {
/// The peer was in `PeerState::JoiningNode`; carries the peer's timestamp from that state.
JoiningNode(Instant),
/// The peer was in `PeerState::Proxy`; carries the peer's timestamp from that state.
Proxy(Instant),
/// Direct connection.
Direct,
/// Connection relayed through a tunnel node.
Tunnel,
}
impl RoutingConnection {
    /// Returns `true` only for the `Tunnel` variant.
    fn is_tunnel(&self) -> bool {
        *self == RoutingConnection::Tunnel
    }
}
/// The connection state of a peer tracked by `PeerManager`.
#[derive(Debug)]
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum PeerState {
/// A peer that has bootstrapped to us; `handle_bootstrap_request` turns it into
/// `JoiningNode` or `Client` depending on `peer_kind`.
Bootstrapper {
/// Whether the bootstrapper is a node or a client.
peer_kind: CrustUser,
/// The bootstrapper's IP address.
ip: IpAddr,
},
/// Our connection info for this peer is being prepared.
ConnectionInfoPreparing {
/// Our authority, used as the source of the connection-info exchange.
us_as_src: Authority<XorName>,
/// The peer's authority, used as the destination of the exchange.
them_as_dst: Authority<XorName>,
/// The peer's connection info and message ID, if already received.
their_info: Option<(PubConnectionInfo, MessageId)>,
},
/// Our connection info is prepared; the peer's has not been received yet.
ConnectionInfoReady(PrivConnectionInfo),
/// Both sides' connection info is available and the connection is being established.
CrustConnecting,
/// No connection yet; a tunnel node is being looked for.
SearchingForTunnel,
/// Connected but not (yet) a routing peer. The flag is `true` for a tunnel connection.
Connected(bool),
/// A connected client.
Client {
/// The client's IP address.
ip: IpAddr,
/// Running total of bytes recorded by `add_client_traffic` (wraps on overflow).
traffic: u64,
},
/// A bootstrapped node (as opposed to a client).
JoiningNode,
/// A peer in the routing table, with the kind of connection we have to it.
Routing(RoutingConnection),
/// The current candidate, set by `handle_candidate_info`.
Candidate(RoutingConnection),
/// A peer acting as our proxy.
Proxy,
}
impl PeerState {
    /// Returns `true` if this state is `Routing` or `Candidate` over a direct connection.
    pub fn can_tunnel_for(&self) -> bool {
        match *self {
            PeerState::Routing(conn) | PeerState::Candidate(conn) => {
                conn == RoutingConnection::Direct
            }
            _ => false,
        }
    }
}
/// Outcome of `PeerManager::connection_info_received`.
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Debug)]
pub enum ConnectionInfoReceivedResult {
/// Our info was already prepared; both infos are returned and the peer is now `CrustConnecting`.
Ready(PrivConnectionInfo, PubConnectionInfo),
/// Our info still has to be prepared; carries the token registered for this peer.
Prepare(u32),
/// Nothing to do right now.
Waiting,
/// The peer is in `Proxy` state.
IsProxy,
/// The peer is in `Client` state.
IsClient,
/// The peer is in `JoiningNode` state.
IsJoiningNode,
/// The peer is already in `Routing` or `Candidate` state.
IsConnected,
}
/// Outcome of `PeerManager::connection_info_prepared`.
#[derive(Debug)]
pub struct ConnectionInfoPreparedResult {
/// The peer the prepared connection info belongs to.
pub pub_id: PublicId,
/// The `us_as_src` authority stored while the info was being prepared.
pub src: Authority<XorName>,
/// The `them_as_dst` authority stored while the info was being prepared.
pub dst: Authority<XorName>,
/// Our info, the peer's info and its message ID, if the peer's info had already arrived.
pub infos: Option<(PrivConnectionInfo, PubConnectionInfo, MessageId)>,
}
/// Flag recording whether a peer entry is marked as reconnecting.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ReconnectingPeer {
/// The peer is marked as reconnecting.
True,
/// The peer is not marked as reconnecting.
False,
}
/// A peer tracked by `PeerManager`, together with its connection state.
#[derive(Debug)]
pub struct Peer {
// The peer's public ID; also the key under which it is stored in the peer map.
pub_id: PublicId,
// Current connection state.
state: PeerState,
// Set to `Instant::now()` by `Peer::new`; used to decide expiry.
timestamp: Instant,
// Validity flag; `add_to_routing_table` refuses peers for which this is `false`.
valid: bool,
// Whether the peer is marked as reconnecting.
reconnecting: ReconnectingPeer,
}
impl Peer {
    /// Creates a peer entry whose timestamp is the current time.
    pub fn new(
        pub_id: PublicId,
        state: PeerState,
        valid: bool,
        reconnecting: ReconnectingPeer,
    ) -> Self {
        let timestamp = Instant::now();
        Self {
            pub_id,
            state,
            timestamp,
            valid,
            reconnecting,
        }
    }

    /// Returns the peer's public ID.
    pub fn pub_id(&self) -> &PublicId {
        &self.pub_id
    }

    /// Returns the name derived from the peer's public ID.
    pub fn name(&self) -> &XorName {
        self.pub_id.name()
    }

    /// Returns the peer's current state.
    pub fn state(&self) -> &PeerState {
        &self.state
    }

    /// Returns the peer's validity flag.
    pub fn valid(&self) -> bool {
        self.valid
    }

    /// Returns `true` if the peer is marked as reconnecting.
    pub fn is_reconnecting(&self) -> bool {
        match self.reconnecting {
            ReconnectingPeer::True => true,
            ReconnectingPeer::False => false,
        }
    }

    /// Returns `None` while the connection is still being set up, otherwise
    /// `Some(is_tunnel)` for an established connection.
    fn is_connected(&self) -> Option<bool> {
        match self.state {
            PeerState::Connected(is_tunnel) => Some(is_tunnel),
            PeerState::Routing(conn) | PeerState::Candidate(conn) => Some(conn.is_tunnel()),
            PeerState::Client { .. }
            | PeerState::Proxy
            | PeerState::JoiningNode
            | PeerState::Bootstrapper { .. } => Some(false),
            PeerState::SearchingForTunnel
            | PeerState::CrustConnecting
            | PeerState::ConnectionInfoReady(_)
            | PeerState::ConnectionInfoPreparing { .. } => None,
        }
    }

    /// Returns `true` if the peer has stayed in its current state for at least that
    /// state's timeout. `Candidate`, `Client` and `Routing` peers never expire here.
    fn is_expired(&self) -> bool {
        let timeout_secs = match self.state {
            PeerState::Candidate(_) | PeerState::Client { .. } | PeerState::Routing(_) => {
                return false;
            }
            PeerState::Bootstrapper { .. } | PeerState::Connected(_) => CONNECTED_PEER_TIMEOUT_SECS,
            PeerState::JoiningNode | PeerState::Proxy => JOINING_NODE_TIMEOUT_SECS,
            PeerState::ConnectionInfoPreparing { .. }
            | PeerState::ConnectionInfoReady(_)
            | PeerState::CrustConnecting
            | PeerState::SearchingForTunnel => CONNECTING_PEER_TIMEOUT_SECS,
        };
        let age = self.timestamp.elapsed();
        age >= Duration::from_secs(timeout_secs)
    }

    /// Converts the peer's state into the `RoutingConnection` it would have as a
    /// routing peer, or `InvalidPeer` if the state has no such equivalent.
    fn to_routing_connection(&self) -> Result<RoutingConnection, RoutingError> {
        let conn = match self.state {
            PeerState::Routing(conn) | PeerState::Candidate(conn) => conn,
            PeerState::Connected(is_tunnel) => {
                if is_tunnel {
                    RoutingConnection::Tunnel
                } else {
                    RoutingConnection::Direct
                }
            }
            PeerState::JoiningNode => RoutingConnection::JoiningNode(self.timestamp),
            PeerState::Proxy => RoutingConnection::Proxy(self.timestamp),
            PeerState::Bootstrapper { .. }
            | PeerState::ConnectionInfoPreparing { .. }
            | PeerState::ConnectionInfoReady(_)
            | PeerState::CrustConnecting
            | PeerState::SearchingForTunnel
            | PeerState::Client { .. } => return Err(RoutingError::InvalidPeer),
        };
        Ok(conn)
    }

    /// Returns `true` if the peer is in `Routing` state.
    fn is_routing(&self) -> bool {
        if let PeerState::Routing(_) = self.state {
            true
        } else {
            false
        }
    }

    /// Returns `true` if the peer is our proxy, in any of `Proxy`, `Candidate` or
    /// `Routing` state.
    fn is_proxy(&self) -> bool {
        match self.state {
            PeerState::Proxy => true,
            PeerState::Candidate(conn) | PeerState::Routing(conn) => match conn {
                RoutingConnection::Proxy(_) => true,
                _ => false,
            },
            _ => false,
        }
    }

    /// Returns `true` if the peer is in `Client` state.
    fn is_client(&self) -> bool {
        match self.state {
            PeerState::Client { .. } => true,
            _ => false,
        }
    }

    /// Returns `true` if the peer is a joining node, in any of `JoiningNode`,
    /// `Candidate` or `Routing` state.
    fn is_joining_node(&self) -> bool {
        match self.state {
            PeerState::JoiningNode => true,
            PeerState::Candidate(conn) | PeerState::Routing(conn) => match conn {
                RoutingConnection::JoiningNode(_) => true,
                _ => false,
            },
            _ => false,
        }
    }
}
// Progress of the single candidate a `PeerManager` can handle at a time.
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Debug, Eq, PartialEq)]
enum Candidate {
// No candidate is being handled.
None,
// Set by `expect_candidate`.
Expecting {
// When `expect_candidate` was called.
timestamp: Instant,
// The candidate's ID as passed to `expect_candidate`.
old_pub_id: PublicId,
},
// Set by `accept_as_candidate`; waiting for the candidate's info.
AcceptedForResourceProof {
// When `accept_as_candidate` was called; start of the expiry window.
res_proof_start: Instant,
// Inclusive name range the candidate's new ID must fall into.
target_interval: (XorName, XorName),
// The candidate's ID as passed to `accept_as_candidate`.
old_pub_id: PublicId,
},
// Set by `handle_candidate_info`; the candidate is working on its resource proof.
ResourceProof {
// Carried over from `AcceptedForResourceProof`.
res_proof_start: Instant,
// The candidate's new ID.
new_pub_id: PublicId,
// The candidate's client authority.
new_client_auth: Authority<XorName>,
// The challenge we issued; `None` when the candidate is connected via a tunnel.
challenge: Option<ResourceProofChallenge>,
// Set to `true` by `verify_candidate` once the full proof validates.
passed_our_challenge: bool,
},
}
impl Candidate {
    /// Returns `true` once the candidate has stayed in its current stage for longer
    /// than that stage's timeout. `Candidate::None` never expires.
    fn is_expired(&self) -> bool {
        let (since, limit_secs) = match *self {
            Candidate::None => return false,
            Candidate::Expecting { timestamp, .. } => (timestamp, CANDIDATE_ACCEPT_TIMEOUT_SECS),
            Candidate::AcceptedForResourceProof {
                res_proof_start, ..
            }
            | Candidate::ResourceProof {
                res_proof_start, ..
            } => (
                res_proof_start,
                RESOURCE_PROOF_DURATION_SECS + ACCUMULATION_TIMEOUT_SECS,
            ),
        };
        since.elapsed() > Duration::from_secs(limit_secs)
    }
}
// The resource-proof challenge issued to a candidate, plus the proof received so far.
#[derive(Debug, Eq, PartialEq)]
struct ResourceProofChallenge {
// Size parameter passed to `ResourceProof::new`.
target_size: usize,
// Difficulty parameter passed to `ResourceProof::new`.
difficulty: u8,
// Seed the proof is validated against.
seed: Vec<u8>,
// Proof bytes accumulated from the parts passed to `verify_candidate`.
proof: VecDeque<u8>,
}
/// Tracks every known peer and its connection state, owns the routing table, and
/// handles at most one joining candidate at a time.
pub struct PeerManager {
// Connection-info tokens mapped to the peer they were issued for.
connection_token_map: HashMap<u32, PublicId>,
// All tracked peers, keyed by public ID.
peers: HashMap<PublicId, Peer>,
// The routing table, created from our own name and the minimum section size.
routing_table: RoutingTable<XorName>,
// Our own public ID.
our_public_id: PublicId,
// The candidate currently being handled, if any.
candidate: Candidate,
// When `true`, `can_accept_client` accepts every client regardless of IP.
disable_client_rate_limiter: bool,
}
impl PeerManager {
/// Creates a peer manager with no peers, no candidate, and a routing table built
/// from our own name and `min_section_size`.
pub fn new(
    min_section_size: usize,
    our_public_id: PublicId,
    disable_client_rate_limiter: bool,
) -> PeerManager {
    let routing_table = RoutingTable::new(*our_public_id.name(), min_section_size);
    let peers = HashMap::new();
    let connection_token_map = HashMap::new();
    PeerManager {
        connection_token_map,
        peers,
        routing_table,
        our_public_id,
        candidate: Candidate::None,
        disable_client_rate_limiter,
    }
}
/// Adds the given versioned prefixes to the routing table, converting any
/// routing-table error into a `RoutingError`.
pub fn add_prefixes(
    &mut self,
    prefixes: Vec<VersionedPrefix<XorName>>,
) -> Result<(), RoutingError> {
    match self.routing_table.add_prefixes(prefixes) {
        Ok(_) => Ok(()),
        Err(error) => Err(error.into()),
    }
}
/// Returns a shared reference to the routing table.
pub fn routing_table(&self) -> &RoutingTable<XorName> {
&self.routing_table
}
pub fn handle_bootstrap_request(&mut self, pub_id: &PublicId) {
if let Some(peer) = self.peers.get_mut(pub_id) {
if let PeerState::Bootstrapper { peer_kind, ip } = peer.state {
match peer_kind {
CrustUser::Node => peer.state = PeerState::JoiningNode,
CrustUser::Client => peer.state = PeerState::Client { ip, traffic: 0 },
}
return;
}
}
log_or_panic!(
Level::Error,
"{:?} does not have {:?} as a bootstrapper.",
self,
pub_id
);
}
/// Starts expecting the given candidate, recording the current time.
///
/// Fails with `AlreadyHandlingJoinRequest` if any candidate is already being handled.
pub fn expect_candidate(&mut self, old_pub_id: PublicId) -> Result<(), RoutingError> {
    if let Candidate::None = self.candidate {
        let timestamp = Instant::now();
        self.candidate = Candidate::Expecting {
            timestamp,
            old_pub_id,
        };
        Ok(())
    } else {
        Err(RoutingError::AlreadyHandlingJoinRequest)
    }
}
/// Moves the candidate slot to `AcceptedForResourceProof` (overwriting whatever was
/// there) and returns our prefix together with the public IDs of our section.
pub fn accept_as_candidate(
    &mut self,
    old_pub_id: PublicId,
    target_interval: (XorName, XorName),
) -> (Prefix<XorName>, BTreeSet<PublicId>) {
    let res_proof_start = Instant::now();
    self.candidate = Candidate::AcceptedForResourceProof {
        res_proof_start,
        old_pub_id,
        target_interval,
    };
    let section_names: BTreeSet<XorName> =
        self.routing_table.our_section().iter().cloned().collect();
    let our_prefix = *self.routing_table.our_prefix();
    let section_ids = self.get_pub_ids(&section_names);
    (our_prefix, section_ids)
}
/// Appends one part of the candidate's resource proof and, once the final part has
/// arrived, validates the whole proof.
///
/// Returns:
/// * `Ok(None)` if `part_index` is not the last part (`part_index + 1 != part_count`);
/// * `Ok(Some((target_size, difficulty, elapsed)))` if the complete proof validates,
///   where `elapsed` is the time since the resource proof started;
/// * `Err(FailedResourceProofValidation)` if the complete proof does not validate;
/// * `Err(UnknownCandidate)` if the current candidate is not `new_pub_id` in the
///   `ResourceProof` stage with a challenge issued.
pub fn verify_candidate(
    &mut self,
    new_pub_id: &PublicId,
    part_index: usize,
    part_count: usize,
    proof_part: Vec<u8>,
    leading_zero_bytes: u64,
) -> Result<Option<(usize, u8, Duration)>, RoutingError> {
    let (challenge, passed_our_challenge, res_proof_start) = match self.candidate {
        Candidate::ResourceProof {
            new_pub_id: ref pub_id,
            challenge: Some(ref mut challenge),
            ref mut passed_our_challenge,
            ref res_proof_start,
            ..
        }
            if new_pub_id == pub_id =>
        {
            (challenge, passed_our_challenge, res_proof_start)
        }
        _ => return Err(RoutingError::UnknownCandidate),
    };
    challenge.proof.extend(proof_part);
    // `part_index` is supplied by the caller; `checked_add` keeps a value of
    // `usize::MAX` from overflowing (a panic in debug builds, a wrap to 0 in release
    // builds that would treat `part_count == 0` as "final part").
    if part_index.checked_add(1) != Some(part_count) {
        return Ok(None);
    }
    let rp_object = ResourceProof::new(challenge.target_size, challenge.difficulty);
    if rp_object.validate_all(&challenge.seed, &challenge.proof, leading_zero_bytes) {
        *passed_our_challenge = true;
        Ok(Some((
            challenge.target_size,
            challenge.difficulty,
            res_proof_start.elapsed(),
        )))
    } else {
        Err(RoutingError::FailedResourceProofValidation)
    }
}
/// Builds the `CandidateApproval` message content for a candidate that has passed
/// our resource-proof challenge, together with the candidate's name.
///
/// Returns `UnknownCandidate` if there is no candidate in the `ResourceProof` stage,
/// if it has not passed the challenge, or if we have no established connection to it.
pub fn verified_candidate_info(&self) -> Result<(MessageContent, XorName), RoutingError> {
    let (new_pub_id, new_client_auth) = match self.candidate {
        Candidate::ResourceProof {
            ref new_pub_id,
            ref new_client_auth,
            passed_our_challenge,
            ..
        } => {
            if !passed_our_challenge {
                info!(
                    "{:?} Candidate {} has not passed our resource proof challenge in time. Not \
                     sending approval vote to our section with {:?}",
                    self,
                    new_pub_id.name(),
                    self.routing_table.our_prefix()
                );
                return Err(RoutingError::UnknownCandidate);
            }
            (new_pub_id, new_client_auth)
        }
        _ => return Err(RoutingError::UnknownCandidate),
    };
    // `is_connected` yields `None` for a peer whose connection is still being set up.
    let connection = self.peers.get(new_pub_id).and_then(Peer::is_connected);
    if connection.is_none() {
        log_or_panic!(
            Level::Error,
            "{:?} Not connected to {}.",
            self,
            new_pub_id.name()
        );
        return Err(RoutingError::UnknownCandidate);
    }
    let content = MessageContent::CandidateApproval {
        new_public_id: *new_pub_id,
        new_client_auth: *new_client_auth,
        sections: self.ideal_rt(),
    };
    Ok((content, *new_pub_id.name()))
}
/// Handles the section's approval of `new_pub_id`. The candidate slot is always
/// cleared, whether or not it matched.
///
/// Returns `UnknownCandidate` if the cleared candidate was not `new_pub_id` in the
/// `ResourceProof` stage, and `InvalidStateForOperation` if no peer entry exists.
/// Otherwise marks the peer valid and returns its connection status: `None` if not
/// yet connected, else `Some(is_tunnel)`.
pub fn handle_candidate_approval(
    &mut self,
    new_pub_id: &PublicId,
) -> Result<Option<bool>, RoutingError> {
    let was_our_candidate = match mem::replace(&mut self.candidate, Candidate::None) {
        Candidate::ResourceProof {
            new_pub_id: pub_id, ..
        } => pub_id == *new_pub_id,
        _ => false,
    };
    if !was_our_candidate {
        return Err(RoutingError::UnknownCandidate);
    }
    let debug_id = format!("{:?}", self);
    let peer = match self.peers.get_mut(new_pub_id) {
        Some(peer) => peer,
        None => {
            trace!("{} No peer with name {}", debug_id, new_pub_id.name());
            return Err(RoutingError::InvalidStateForOperation);
        }
    };
    peer.valid = true;
    let is_connected = peer.is_connected();
    if is_connected.is_none() {
        trace!(
            "{} Candidate {} not yet connected to us.",
            debug_id,
            new_pub_id.name()
        );
    }
    Ok(is_connected)
}
/// Handles the info sent by an accepted candidate and moves it to the
/// `ResourceProof` stage.
///
/// Returns:
/// * `Ok(true)` if the candidate is connected other than by tunnel; a challenge
///   built from `target_size`, `difficulty` and `seed` is stored for it;
/// * `Err(CandidateIsTunnelling)` if the candidate is connected via a tunnel; it is
///   still stored as the `ResourceProof` candidate, but without a challenge;
/// * `Ok(false)` if the current candidate is not `old_pub_id` awaiting info but a
///   valid peer entry for `new_pub_id` exists;
/// * `Err(UnknownCandidate)` in that same case when no valid peer entry exists;
/// * `Err(InvalidRelocationTargetRange)` if the new name is outside the target interval;
/// * `Err(UnknownConnection)` if there is no peer entry for `new_pub_id`;
/// * the error from `Peer::to_routing_connection` if the peer's state has no
///   routing-connection equivalent.
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn handle_candidate_info(
&mut self,
old_pub_id: &PublicId,
new_pub_id: &PublicId,
new_client_auth: &Authority<XorName>,
target_size: usize,
difficulty: u8,
seed: Vec<u8>,
) -> Result<bool, RoutingError> {
let debug_prefix = format!(
"{:?} Candidate {}->{}",
self,
old_pub_id.name(),
new_pub_id.name()
);
// Take the candidate out of the slot; it is put back only in the mismatch arm below.
let candidate = mem::replace(&mut self.candidate, Candidate::None);
let (res_proof_start, target_interval) = match candidate {
Candidate::AcceptedForResourceProof {
old_pub_id: old_id,
res_proof_start,
target_interval,
}
if old_id == *old_pub_id =>
{
(res_proof_start, target_interval)
}
candidate => {
// Not the candidate we accepted: restore the slot unchanged.
self.candidate = candidate;
return if self.peers.get(new_pub_id).map_or(false, Peer::valid) {
Ok(false)
} else {
Err(RoutingError::UnknownCandidate)
};
}
};
// From here on every early return leaves the candidate slot as `Candidate::None`.
// The target interval is inclusive at both ends.
if *new_pub_id.name() < target_interval.0 || *new_pub_id.name() > target_interval.1 {
warn!(
"{} has used a new ID which is not within the required target range.",
debug_prefix
);
return Err(RoutingError::InvalidRelocationTargetRange);
}
let peer = match self.peers.get_mut(new_pub_id) {
Some(peer) => peer,
None => {
log_or_panic!(Level::Error, "{} is not connected to us.", debug_prefix);
return Err(RoutingError::UnknownConnection(*new_pub_id));
}
};
let conn = peer.to_routing_connection()?;
peer.state = PeerState::Candidate(conn);
// A tunnelling candidate gets no challenge and an error result, but is still
// recorded as the current candidate below.
let (res, challenge) = if conn == RoutingConnection::Tunnel {
(Err(RoutingError::CandidateIsTunnelling), None)
} else {
(
Ok(true),
Some(ResourceProofChallenge {
target_size,
difficulty,
seed,
proof: VecDeque::new(),
}),
)
};
self.candidate = Candidate::ResourceProof {
res_proof_start,
new_pub_id: *new_pub_id,
new_client_auth: *new_client_auth,
challenge,
passed_our_challenge: false,
};
res
}
/// Logs, at trace level, the progress of the candidate currently being handled.
///
/// Nothing is logged while the candidate is in the `Expecting` stage.
pub fn show_candidate_status(&self) {
    let log_prefix = format!("{:?} Candidate Status - ", self);
    match self.candidate {
        Candidate::None => trace!("{}No candidate is currently being handled.", log_prefix),
        Candidate::Expecting { .. } => (),
        Candidate::AcceptedForResourceProof { ref old_pub_id, .. } => trace!(
            "{}{} has not sent CandidateInfo yet.",
            log_prefix,
            old_pub_id.name()
        ),
        Candidate::ResourceProof {
            ref new_pub_id,
            challenge: None,
            ..
        } => trace!("{}{} is tunneling to us.", log_prefix, new_pub_id.name()),
        Candidate::ResourceProof {
            ref new_pub_id,
            challenge: Some(ref challenge),
            passed_our_challenge,
            ..
        } => {
            let progress = if passed_our_challenge {
                "has passed our challenge".to_string()
            } else if challenge.proof.is_empty() {
                "hasn't responded to our challenge yet".to_string()
            } else {
                // `target_size` comes from the challenge parameters and may be zero;
                // `checked_div` avoids a divide-by-zero panic, and any received data
                // then counts as 100%.
                let percent_done = challenge
                    .proof
                    .len()
                    .saturating_mul(100)
                    .checked_div(challenge.target_size)
                    .unwrap_or(100);
                format!("has sent {}% of resource proof", percent_done)
            };
            // The name and the progress text are separated by a space so the message
            // does not run together as "<name>has ...".
            trace!(
                "{}{} {} and is not yet approved by our section.",
                log_prefix,
                new_pub_id.name(),
                progress
            );
        }
    }
}
/// Adds a known, valid, connected peer to the routing table and sets its state to
/// `Routing`.
///
/// Returns `UnknownConnection` if the peer is unknown, `InvalidPeer` if it is not
/// marked valid, or the error from `to_routing_connection` if its state has no
/// routing-connection equivalent. If the routing table reports `AlreadyExists`, the
/// state is still updated before that error is returned; any other routing-table
/// error is returned without touching the state.
pub fn add_to_routing_table(&mut self, pub_id: &PublicId) -> Result<(), RoutingError> {
    let self_debug = format!("{:?}", self);
    let peer = match self.peers.get_mut(pub_id) {
        Some(peer) => peer,
        None => {
            log_or_panic!(Level::Error, "{} Peer {} not found.", self_debug, pub_id);
            return Err(RoutingError::UnknownConnection(*pub_id));
        }
    };
    if !peer.valid() {
        log_or_panic!(
            Level::Error,
            "{} Not adding invalid Peer {} to RT.",
            self_debug,
            pub_id
        );
        return Err(RoutingError::InvalidPeer);
    }
    let conn = match peer.to_routing_connection() {
        Err(error) => {
            log_or_panic!(
                Level::Error,
                "{} Not adding Peer {} to RT - not connected.",
                self_debug,
                pub_id
            );
            return Err(error);
        }
        Ok(conn) => conn,
    };
    let outcome: Result<(), RoutingError> = match self.routing_table.add(*pub_id.name()) {
        Ok(_) => Ok(()),
        Err(RoutingTableError::AlreadyExists) => Err(RoutingTableError::AlreadyExists.into()),
        Err(other) => return Err(other.into()),
    };
    peer.state = PeerState::Routing(conn);
    trace!("{} Set {} to {:?}", self_debug, pub_id, peer.state);
    outcome
}
/// Splits the routing table at `ver_pfx` and removes the peers that no longer belong.
///
/// A candidate in the `ResourceProof` stage whose name no longer matches our prefix
/// is removed as well. Returns the IDs reported by `remove_split_peers` and our new
/// prefix, if the routing table returned one.
pub fn split_section(
    &mut self,
    ver_pfx: VersionedPrefix<XorName>,
) -> (Vec<PublicId>, Option<Prefix<XorName>>) {
    let (names_to_drop, our_new_prefix) = self.routing_table.split(ver_pfx);
    for name in &names_to_drop {
        info!("{:?} Dropped {} from the routing table.", self, name);
    }
    let mut ids_to_drop: Vec<PublicId> = Vec::new();
    for name in &names_to_drop {
        if let Some(peer) = self.get_peer_by_name(name) {
            ids_to_drop.push(*peer.pub_id());
        }
    }
    if let Candidate::ResourceProof { ref new_pub_id, .. } = self.candidate {
        if !self.routing_table.our_prefix().matches(new_pub_id.name()) {
            ids_to_drop.push(*new_pub_id);
        }
    }
    let dropped = self.remove_split_peers(ids_to_drop);
    (dropped, our_new_prefix)
}
pub fn add_prefix(&mut self, ver_pfx: VersionedPrefix<XorName>) -> Vec<PublicId> {
let names_to_drop = self.routing_table.add_prefix(ver_pfx);
for name in &names_to_drop {
info!("{:?} Dropped {} from the routing table.", self, name);
}
let ids_to_drop = names_to_drop
.iter()
.filter_map(|name| self.get_peer_by_name(name))
.map(Peer::pub_id)
.cloned()
.collect_vec();
self.remove_split_peers(ids_to_drop)
}
/// Returns `true` if the routing table says we should merge and no valid,
/// non-reconnecting peer is still waiting to become a routing peer.
pub fn should_merge(&mut self) -> bool {
    let nobody_pending = self
        .peers
        .values()
        .all(|peer| !peer.valid() || peer.is_routing() || peer.is_reconnecting());
    if nobody_pending {
        self.routing_table.should_merge()
    } else {
        false
    }
}
pub fn merge_details(&self) -> (Prefix<XorName>, SectionMap) {
let sections = self
.routing_table
.all_sections_iter()
.map(|(prefix, (v, members))| (prefix.with_version(v), self.get_pub_ids(members)))
.collect();
(*self.routing_table.our_prefix(), sections)
}
/// Merges our own section in the routing table.
///
/// Returns the routing table's merge state and the members of `sections` that were
/// not in the routing table before the merge.
pub fn merge_own_section(
    &mut self,
    sender_prefix: Prefix<XorName>,
    merge_version: u64,
    sections: SectionMap,
) -> (OwnMergeState<XorName>, Vec<PublicId>) {
    // Must be computed before the merge below changes the routing table.
    let mut needed = Vec::new();
    for pub_ids in sections.values() {
        for pub_id in pub_ids {
            if !self.routing_table.has(pub_id.name()) {
                needed.push(*pub_id);
            }
        }
    }
    let merged_pfx = sender_prefix.popped().with_version(merge_version);
    let merge_state = self
        .routing_table
        .merge_own_section(merged_pfx, sections.keys().cloned());
    (merge_state, needed)
}
pub fn merge_other_section(
&mut self,
ver_pfx: VersionedPrefix<XorName>,
section: BTreeSet<PublicId>,
) -> BTreeSet<PublicId> {
let needed_names = self
.routing_table
.merge_other_section(ver_pfx, section.iter().map(PublicId::name).cloned());
section
.iter()
.filter(|id| needed_names.contains(id.name()))
.cloned()
.collect()
}
pub fn can_tunnel_for(&self, pub_id: &PublicId, dst_id: &PublicId) -> bool {
let peer_state = self.get_peer(pub_id).map(Peer::state);
let dst_state = self.get_peer(dst_id).map(Peer::state);
let result = match (peer_state, dst_state) {
(Some(peer1), Some(peer2)) => peer1.can_tunnel_for() && peer2.can_tunnel_for(),
_ => false,
};
if !result {
trace!(
"{:?} Can't tunnel from {} with state {:?} to {} with state {:?}.",
self,
pub_id,
peer_state,
dst_id,
dst_state
);
}
result
}
pub fn is_routing_peer(&self, pub_id: &PublicId) -> bool {
self.peers.get(pub_id).map_or(false, Peer::is_routing)
}
pub fn is_proxy(&self, pub_id: &PublicId) -> bool {
self.peers.get(pub_id).map_or(false, Peer::is_proxy)
}
pub fn is_client(&self, pub_id: &PublicId) -> bool {
self.peers.get(pub_id).map_or(false, Peer::is_client)
}
pub fn is_joining_node(&self, pub_id: &PublicId) -> bool {
self.peers.get(pub_id).map_or(false, Peer::is_joining_node)
}
/// Returns the name of the first peer found in `Proxy` state or in `Routing` state
/// over a proxy connection, if any.
pub fn get_proxy_name(&self) -> Option<&XorName> {
    for peer in self.peers.values() {
        match peer.state {
            PeerState::Proxy | PeerState::Routing(RoutingConnection::Proxy(_)) => {
                return Some(peer.name());
            }
            _ => {}
        }
    }
    None
}
/// Removes every expired peer, plus the candidate if it has expired, and returns
/// the removed IDs.
///
/// Routing peers that are still tagged as joining node or proxy are never removed
/// here; once their tag is older than `JOINING_NODE_TIMEOUT_SECS` they are turned
/// into ordinary direct routing peers instead.
pub fn remove_expired_peers(&mut self) -> Vec<PublicId> {
    let remove_candidate = if !self.candidate.is_expired() {
        None
    } else {
        match self.candidate {
            Candidate::ResourceProof { ref new_pub_id, .. } => Some(*new_pub_id),
            Candidate::Expecting { ref old_pub_id, .. }
            | Candidate::AcceptedForResourceProof { ref old_pub_id, .. } => Some(*old_pub_id),
            Candidate::None => None,
        }
    };
    let mut normalisable_conns = Vec::new();
    let mut expired_peers = Vec::new();
    for peer in self.peers.values() {
        match peer.state {
            PeerState::Routing(RoutingConnection::JoiningNode(timestamp))
            | PeerState::Routing(RoutingConnection::Proxy(timestamp)) => {
                if timestamp.elapsed() >= Duration::from_secs(JOINING_NODE_TIMEOUT_SECS) {
                    normalisable_conns.push(*peer.pub_id());
                }
            }
            _ => {
                if peer.is_expired() {
                    expired_peers.push(*peer.pub_id());
                }
            }
        }
    }
    // The candidate's ID goes last, after all expired peers.
    expired_peers.extend(remove_candidate);
    for id in &normalisable_conns {
        if let Some(peer) = self.peers.get_mut(id) {
            peer.state = PeerState::Routing(RoutingConnection::Direct);
        }
    }
    for id in &expired_peers {
        let _ = self.remove_peer(id);
    }
    expired_peers
}
/// Returns the number of peers currently in `Client` state.
pub fn client_num(&self) -> usize {
    let mut clients = 0;
    for peer in self.peers.values() {
        if peer.is_client() {
            clients += 1;
        }
    }
    clients
}
/// Adds `added_bytes` to the traffic counter of a client peer (wrapping on
/// overflow). Does nothing if the peer is unknown or not a client.
///
/// Logs the running total whenever the addition carries it past a multiple of 100 MiB.
pub fn add_client_traffic(&mut self, pub_id: &PublicId, added_bytes: u64) {
    let self_pfx = format!("{:?}", self);
    let peer = match self.peers.get_mut(pub_id) {
        Some(peer) => peer,
        None => return,
    };
    let (ip, old_traffic) = match peer.state {
        PeerState::Client { ip, traffic } => (ip, traffic),
        _ => return,
    };
    let new_traffic = old_traffic.wrapping_add(added_bytes);
    // A remainder smaller than the amount just added means a 100 MiB boundary was crossed.
    let crossed_boundary = new_traffic % (100 * 1024 * 1024) < added_bytes;
    if crossed_boundary {
        info!(
            "{} Stats - Client current session traffic from {:?} - {:?}",
            self_pfx, ip, new_traffic
        );
    }
    peer.state = PeerState::Client {
        ip,
        traffic: new_traffic,
    };
}
/// Returns `true` if a client from `client_ip` may be accepted: either the rate
/// limiter is disabled, or no current client peer has that IP address.
pub fn can_accept_client(&self, client_ip: IpAddr) -> bool {
    if self.disable_client_rate_limiter {
        return true;
    }
    self.peers.values().all(|peer| match *peer.state() {
        PeerState::Client { ip, .. } => ip != client_ip,
        _ => true,
    })
}
/// Records that we are now directly connected to `pub_id`.
///
/// A tunnelled routing peer becomes a direct routing peer. Any other existing peer
/// is reset to `Connected(false)` with a fresh timestamp. An unknown peer is
/// inserted as `Connected(false)` and not valid. The reconnecting flag is cleared
/// in every case.
pub fn connected_to(&mut self, pub_id: &PublicId) {
    match self.peers.get_mut(pub_id) {
        Some(peer) => {
            if let PeerState::Routing(RoutingConnection::Tunnel) = peer.state {
                peer.state = PeerState::Routing(RoutingConnection::Direct);
            } else {
                peer.timestamp = Instant::now();
                peer.state = PeerState::Connected(false);
            }
            peer.reconnecting = ReconnectingPeer::False;
            return;
        }
        None => {}
    }
    let new_peer = Peer::new(
        *pub_id,
        PeerState::Connected(false),
        false,
        ReconnectingPeer::False,
    );
    self.insert_peer(new_peer);
}
/// Records that we are now connected to `pub_id` via a tunnel.
///
/// Returns `false` and changes nothing if the peer is already in `Bootstrapper`,
/// `Connected`, `Candidate` or `Routing` state. Otherwise the peer is set (or
/// inserted, not valid) as `Connected(true)` and `true` is returned.
pub fn tunnelling_to(&mut self, pub_id: &PublicId) -> bool {
    match self.peers.get_mut(pub_id) {
        Some(peer) => {
            match peer.state {
                PeerState::Bootstrapper { .. }
                | PeerState::Connected(_)
                | PeerState::Candidate(_)
                | PeerState::Routing(_) => return false,
                _ => {}
            }
            peer.timestamp = Instant::now();
            peer.state = PeerState::Connected(true);
            peer.reconnecting = ReconnectingPeer::False;
            return true;
        }
        None => {}
    }
    let new_peer = Peer::new(
        *pub_id,
        PeerState::Connected(true),
        false,
        ReconnectingPeer::False,
    );
    self.insert_peer(new_peer);
    true
}
/// Returns the peer with the given public ID, if it is known.
pub fn get_peer(&self, pub_id: &PublicId) -> Option<&Peer> {
self.peers.get(pub_id)
}
/// Returns the peer whose public ID has the given name, if any. This is a linear
/// scan over all peers.
pub fn get_peer_by_name(&self, name: &XorName) -> Option<&Peer> {
    self.peers
        .iter()
        .find(|&(id, _)| id.name() == name)
        .map(|(_, peer)| peer)
}
/// Sets the validity flag of the given peer; does nothing if the peer is unknown.
pub fn set_peer_valid(&mut self, id: &PublicId, valid: bool) {
    match self.peers.get_mut(id) {
        Some(peer) => peer.valid = valid,
        None => {}
    }
}
pub fn get_pub_id(&self, name: &XorName) -> Option<&PublicId> {
self.get_peer_by_name(name).map(Peer::pub_id)
}
/// Maps names to public IDs. Our own name maps to our own ID; names with no known
/// peer are silently left out.
pub fn get_pub_ids(&self, names: &BTreeSet<XorName>) -> BTreeSet<PublicId> {
    let mut pub_ids = BTreeSet::new();
    for name in names {
        if name == self.our_public_id.name() {
            let _ = pub_ids.insert(self.our_public_id);
        } else if let Some(pub_id) = self.get_pub_id(name) {
            let _ = pub_ids.insert(*pub_id);
        }
    }
    pub_ids
}
/// Cross-checks the routing table against the peer map, repairs the mismatches it
/// can, and returns a summary.
///
/// * Names in the routing table with no peer entry are removed from the table;
///   their removal details are returned.
/// * Peers in the routing table whose state is not `Routing` are reported as out of sync.
/// * Peers in `Routing` state that are missing from the routing table are removed
///   from the peer map and reported as out of sync.
/// * Every remaining `Routing`, `Candidate` or `Connected` peer is listed with a
///   flag telling whether its connection is a tunnel.
///
/// Each inconsistency is also reported through `log_or_panic!`.
pub fn get_routing_peer_details(&mut self) -> PeerDetails {
let mut result = PeerDetails::default();
let mut dropped_routing_nodes = Vec::new();
// Pass 1: every name in the routing table should have a peer entry in `Routing` state.
for name in self.routing_table().iter() {
match self.get_peer_by_name(name) {
None => {
log_or_panic!(
Level::Error,
"{:?} Have {} in RT, but have no entry in peer_map for it.",
self,
name
);
dropped_routing_nodes.push(*name);
}
Some(peer) => {
if !peer.is_routing() {
log_or_panic!(
Level::Error,
"{:?} Have {} in RT, but have state {:?} for it.",
self,
name,
peer.state
);
result.out_of_sync_peers.push(peer.pub_id);
}
}
};
}
// Removal is deferred until the iteration over the routing table above has finished.
for name in dropped_routing_nodes {
if let Ok(removal_detail) = self.routing_table.remove(&name) {
result.removal_details.push(removal_detail);
}
}
// Pass 2: every peer in `Routing` state should be in the routing table.
let mut nodes_missing_from_rt = Vec::new();
for peer in self.peers.values() {
let is_tunnel = match peer.state {
PeerState::Routing(conn) => {
if !self.routing_table.has(peer.name()) {
nodes_missing_from_rt.push(peer.pub_id);
continue;
} else {
conn == RoutingConnection::Tunnel
}
}
PeerState::Candidate(conn) => conn == RoutingConnection::Tunnel,
PeerState::Connected(is_tunnel) => is_tunnel,
_ => continue,
};
result.routing_peer_details.push((peer.pub_id, is_tunnel));
}
// These peers are removed from the map directly, not via `remove_peer`.
for id in nodes_missing_from_rt {
if let Some(peer) = self.peers.remove(&id) {
log_or_panic!(
Level::Error,
"{:?} Peer {:?} with state {:?} is missing from RT.",
self,
peer.name(),
peer.state
);
result.out_of_sync_peers.push(peer.pub_id);
}
}
result
}
/// Marks the connection of a `Routing`, `Candidate` or `Connected` peer as direct.
/// For an unknown peer or any other state, reports an error via `log_or_panic!`
/// and changes nothing.
pub fn correct_state_to_direct(&mut self, pub_id: &PublicId) {
    let current = self.peers.get(pub_id).map(|peer| &peer.state);
    let direct_state = match current {
        Some(&PeerState::Connected(_)) => PeerState::Connected(false),
        Some(&PeerState::Candidate(_)) => PeerState::Candidate(RoutingConnection::Direct),
        Some(&PeerState::Routing(_)) => PeerState::Routing(RoutingConnection::Direct),
        state => {
            log_or_panic!(
                Level::Error,
                "{} Cannot set state {:?} to direct.",
                pub_id,
                state
            );
            return;
        }
    };
    match self.peers.get_mut(pub_id) {
        Some(peer) => peer.state = direct_state,
        None => {}
    }
}
/// Marks the connection of a `Routing`, `Candidate` or `Connected` peer as a tunnel.
/// For an unknown peer or any other state, reports an error via `log_or_panic!`
/// and changes nothing.
pub fn correct_state_to_tunnel(&mut self, pub_id: &PublicId) {
    let current = self.peers.get(pub_id).map(|peer| &peer.state);
    let tunnel_state = match current {
        Some(&PeerState::Connected(_)) => PeerState::Connected(true),
        Some(&PeerState::Candidate(_)) => PeerState::Candidate(RoutingConnection::Tunnel),
        Some(&PeerState::Routing(_)) => PeerState::Routing(RoutingConnection::Tunnel),
        state => {
            log_or_panic!(
                Level::Error,
                "{:?} Cannot set state {:?} to tunnel.",
                self,
                state
            );
            return;
        }
    };
    match self.peers.get_mut(pub_id) {
        Some(peer) => peer.state = tunnel_state,
        None => {}
    }
}
pub fn is_potential_tunnel_node(&self, tunnel_name: &XorName, client_name: &XorName) -> bool {
if self.our_public_id.name() == tunnel_name
|| self.our_public_id.name() == client_name
|| !self
.get_peer_by_name(tunnel_name)
.map(Peer::state)
.map_or(false, PeerState::can_tunnel_for)
{
return false;
}
let our_prefix = self.routing_table.our_prefix();
if our_prefix.matches(client_name) {
our_prefix.popped().matches(tunnel_name)
} else {
self.routing_table
.find_section_prefix(tunnel_name)
.map_or(false, |pfx| pfx.matches(client_name) || pfx == *our_prefix)
}
}
/// Puts the peer into `SearchingForTunnel` state and returns the routing peers
/// that could act as its tunnel.
///
/// Returns an empty list, changing nothing, if the peer is already in
/// `Bootstrapper`, `Client`, `JoiningNode`, `Proxy`, `Routing` or `Connected`
/// state. An existing peer's reconnecting flag is kept; the entry itself is
/// replaced, which gives it a fresh timestamp.
pub fn set_searching_for_tunnel(&mut self, pub_id: PublicId, valid: bool) -> Vec<PublicId> {
    let existing = self
        .get_peer(&pub_id)
        .map(|peer| (peer.state(), peer.reconnecting));
    let reconnecting = match existing {
        None => ReconnectingPeer::False,
        Some((state, reconnecting)) => match *state {
            PeerState::Bootstrapper { .. }
            | PeerState::Client { .. }
            | PeerState::JoiningNode
            | PeerState::Proxy
            | PeerState::Routing(_)
            | PeerState::Connected(_) => return vec![],
            _ => reconnecting,
        },
    };
    let searching_peer = Peer::new(pub_id, PeerState::SearchingForTunnel, valid, reconnecting);
    self.insert_peer(searching_peer);
    let mut tunnel_ids = Vec::new();
    for tunnel_name in self.routing_table.iter() {
        if !self.is_potential_tunnel_node(tunnel_name, pub_id.name()) {
            continue;
        }
        if let Some(peer) = self.get_peer_by_name(tunnel_name) {
            tunnel_ids.push(*peer.pub_id());
        }
    }
    tunnel_ids
}
/// Handles our freshly prepared connection info for the peer registered under `token`.
///
/// The token is consumed. If the peer's info was already received, the peer moves
/// to `CrustConnecting` and both infos are returned; otherwise it moves to
/// `ConnectionInfoReady` holding `our_info`. Either way the entry is re-created,
/// which gives it a fresh timestamp.
///
/// Returns `PeerNotFound` if the token or the peer is unknown, and
/// `UnexpectedState` (leaving the peer untouched) if the peer is not in
/// `ConnectionInfoPreparing` state.
pub fn connection_info_prepared(
    &mut self,
    token: u32,
    our_info: PrivConnectionInfo,
) -> Result<ConnectionInfoPreparedResult, Error> {
    let pub_id = match self.connection_token_map.remove(&token) {
        Some(pub_id) => pub_id,
        None => return Err(Error::PeerNotFound),
    };
    let (src, dst, opt_their_info, valid, reconnecting) = match self.peers.remove(&pub_id) {
        None => return Err(Error::PeerNotFound),
        Some(Peer {
            state:
                PeerState::ConnectionInfoPreparing {
                    us_as_src,
                    them_as_dst,
                    their_info,
                },
            valid,
            reconnecting,
            ..
        }) => (us_as_src, them_as_dst, their_info, valid, reconnecting),
        Some(other) => {
            // Wrong state: put the entry back exactly as it was.
            self.insert_peer(other);
            return Err(Error::UnexpectedState);
        }
    };
    let (next_state, infos) = match opt_their_info {
        Some((their_info, msg_id)) => (
            PeerState::CrustConnecting,
            Some((our_info, their_info, msg_id)),
        ),
        None => (PeerState::ConnectionInfoReady(our_info), None),
    };
    self.insert_peer(Peer::new(pub_id, next_state, valid, reconnecting));
    Ok(ConnectionInfoPreparedResult {
        pub_id,
        src,
        dst,
        infos,
    })
}
/// Handles connection info received from a peer and reports what the caller
/// should do next.
///
/// The peer entry is taken out of the map and, depending on its state:
/// * `ConnectionInfoReady`: becomes `CrustConnecting`; returns `Ready` with both infos.
/// * `ConnectionInfoPreparing` without their info: their info is stored; returns `Waiting`.
/// * `Bootstrapper`, `ConnectionInfoPreparing` (their info already stored),
///   `CrustConnecting`, `Connected`: re-inserted unchanged; returns `Waiting`.
/// * `Client`, `JoiningNode`, `Proxy`: re-inserted unchanged; returns `IsClient`,
///   `IsJoiningNode` or `IsProxy` respectively.
/// * `Routing`, `Candidate`: re-inserted unchanged; returns `IsConnected`.
/// * anything else (unknown peer, or `SearchingForTunnel`): if the message is not a
///   connection-info request the entry is restored and `Waiting` is returned;
///   otherwise the peer becomes `ConnectionInfoPreparing` with their info stored
///   and `Prepare(token)` is returned with a newly registered random token.
///
/// Entries re-created through `Peer::new` get a fresh timestamp; entries
/// re-inserted unchanged keep theirs. No arm of this function returns `Err`.
pub fn connection_info_received(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
peer_info: PubConnectionInfo,
msg_id: MessageId,
is_conn_info_req: bool,
) -> Result<ConnectionInfoReceivedResult, Error> {
let pub_id = peer_info.id();
match self.peers.remove(&pub_id) {
Some(Peer {
state: PeerState::ConnectionInfoReady(our_info),
valid,
reconnecting,
..
}) => {
let state = PeerState::CrustConnecting;
self.insert_peer(Peer::new(pub_id, state, valid, reconnecting));
Ok(ConnectionInfoReceivedResult::Ready(our_info, peer_info))
}
Some(Peer {
state:
PeerState::ConnectionInfoPreparing {
us_as_src,
them_as_dst,
their_info: None,
},
valid,
reconnecting,
..
}) => {
let state = PeerState::ConnectionInfoPreparing {
us_as_src,
them_as_dst,
their_info: Some((peer_info, msg_id)),
};
self.insert_peer(Peer::new(pub_id, state, valid, reconnecting));
Ok(ConnectionInfoReceivedResult::Waiting)
}
Some(
peer @ Peer {
state: PeerState::Bootstrapper { .. },
..
},
)
| Some(
peer @ Peer {
// Only reached when `their_info` is already `Some`; the `None` case matched above.
state: PeerState::ConnectionInfoPreparing { .. },
..
},
)
| Some(
peer @ Peer {
state: PeerState::CrustConnecting,
..
},
)
| Some(
peer @ Peer {
state: PeerState::Connected(_),
..
},
) => {
self.insert_peer(peer);
Ok(ConnectionInfoReceivedResult::Waiting)
}
Some(
peer @ Peer {
state: PeerState::Client { .. },
..
},
) => {
self.insert_peer(peer);
Ok(ConnectionInfoReceivedResult::IsClient)
}
Some(
peer @ Peer {
state: PeerState::JoiningNode,
..
},
) => {
self.insert_peer(peer);
Ok(ConnectionInfoReceivedResult::IsJoiningNode)
}
Some(
peer @ Peer {
state: PeerState::Proxy,
..
},
) => {
self.insert_peer(peer);
Ok(ConnectionInfoReceivedResult::IsProxy)
}
Some(
peer @ Peer {
state: PeerState::Routing(_),
..
},
)
| Some(
peer @ Peer {
state: PeerState::Candidate(_),
..
},
) => {
self.insert_peer(peer);
Ok(ConnectionInfoReceivedResult::IsConnected)
}
// Remaining cases: no entry at all, or an entry in `SearchingForTunnel` state.
x => {
if !is_conn_info_req {
let _ = x.map(|peer| self.insert_peer(peer));
return Ok(ConnectionInfoReceivedResult::Waiting);
}
let (valid, reconnecting) = x.map_or((false, ReconnectingPeer::False), |peer| {
(peer.valid, peer.reconnecting)
});
// We are answering their request, so source and destination are swapped.
let state = PeerState::ConnectionInfoPreparing {
us_as_src: dst,
them_as_dst: src,
their_info: Some((peer_info, msg_id)),
};
self.insert_peer(Peer::new(pub_id, state, valid, reconnecting));
let token = rand::random();
let _ = self.connection_token_map.insert(token, pub_id);
Ok(ConnectionInfoReceivedResult::Prepare(token))
}
}
}
/// Registers a new random token for preparing our connection info for `pub_id`
/// and puts the peer into `ConnectionInfoPreparing` state, marked valid.
///
/// Returns `None`, changing nothing, if the peer already exists in any state other
/// than `SearchingForTunnel`. For an existing peer its own reconnecting flag is
/// kept; for a new peer `reconnecting_in` is used.
pub fn get_connection_token(
    &mut self,
    src: Authority<XorName>,
    dst: Authority<XorName>,
    pub_id: PublicId,
    reconnecting_in: ReconnectingPeer,
) -> Option<u32> {
    let reconnecting = match self.get_peer(&pub_id) {
        None => reconnecting_in,
        Some(peer) => {
            if let PeerState::SearchingForTunnel = *peer.state() {
                peer.reconnecting
            } else {
                return None;
            }
        }
    };
    let token = rand::random();
    let _ = self.connection_token_map.insert(token, pub_id);
    let preparing = PeerState::ConnectionInfoPreparing {
        us_as_src: src,
        them_as_dst: dst,
        their_info: None,
    };
    self.insert_peer(Peer::new(pub_id, preparing, true, reconnecting));
    Some(token)
}
pub fn get_new_connection_info_token(&mut self, token: u32) -> Result<u32, Error> {
let pub_id = self
.connection_token_map
.remove(&token)
.ok_or(Error::PeerNotFound)?;
let new_token = rand::random();
let _ = self.connection_token_map.insert(new_token, pub_id);
Ok(new_token)
}
/// Returns the public ids of all peers currently in the `SearchingForTunnel`
/// state.
pub fn peers_needing_tunnel(&self) -> Vec<PublicId> {
    self.peers
        .values()
        .filter(|peer| match peer.state {
            PeerState::SearchingForTunnel => true,
            _ => false,
        })
        .map(|peer| peer.pub_id)
        .collect()
}
/// Asks the routing table whether `name` needs to be added to it.
///
/// This forwards directly to the routing table's `need_to_add` and returns
/// its result unchanged: `Ok(())` or the `RoutingTableError` it reported.
pub fn allow_connect(&self, name: &XorName) -> Result<(), RoutingTableError> {
self.routing_table.need_to_add(name)
}
/// Stores `peer` in the peer map, keyed by its public id.
///
/// Whatever the map returns for that key (i.e. a previously stored entry, if
/// any) is discarded.
pub fn insert_peer(&mut self, peer: Peer) {
    let key = peer.pub_id;
    let _ = self.peers.insert(key, peer);
}
/// Removes the peer with the given public id from the peer map and from the
/// routing table.
///
/// Before the peer is looked up, the pending candidate is reset to
/// `Candidate::None` if it refers to `pub_id`:
///
/// * `Expecting` / `AcceptedForResourceProof`: only when `old_pub_id` equals
///   `pub_id` *and* the candidate has expired;
/// * `ResourceProof`: whenever `new_pub_id` equals `pub_id`.
///
/// This reset happens regardless of whether the peer is actually stored.
///
/// Returns `None` if no peer with that id is stored. Otherwise returns the
/// removed peer together with the result of removing its name from the
/// routing table.
pub fn remove_peer(
    &mut self,
    pub_id: &PublicId,
) -> Option<(Peer, Result<RemovalDetails<XorName>, RoutingTableError>)> {
    let drop_candidate = match self.candidate {
        Candidate::ResourceProof { new_pub_id, .. } => new_pub_id == *pub_id,
        Candidate::Expecting { ref old_pub_id, .. }
        | Candidate::AcceptedForResourceProof { ref old_pub_id, .. } => {
            old_pub_id == pub_id && self.candidate.is_expired()
        }
        Candidate::None => false,
    };
    if drop_candidate {
        self.candidate = Candidate::None;
    }

    self.peers.remove(pub_id).map(|peer| {
        let removal_details = self.routing_table.remove(peer.name());
        (peer, removal_details)
    })
}
/// Removes the peers listed in `ids` and returns the ids of those that were
/// dropped for good.
///
/// First, every peer that is not a routing peer and for which the routing
/// table's `need_to_add` returns an error is flagged as not `valid`.
///
/// Then each id is passed to `remove_peer`:
///
/// * If nothing was stored, or the routing table reported an error other than
///   `NoSuchPeer`, the id is omitted from the result (a peer removed in the
///   latter case is simply discarded).
/// * If the removed peer was `Routing`/`Candidate` with a `JoiningNode`
///   connection, it is re-inserted in the plain `JoiningNode` state.
/// * If it was `Routing` with a `Proxy` connection, it is re-inserted in the
///   plain `Proxy` state.
/// * Otherwise its public id is added to the returned list.
///
/// A successful routing table removal is not expected here and is reported
/// through `log_or_panic!`.
fn remove_split_peers(&mut self, ids: Vec<PublicId>) -> Vec<PublicId> {
    for peer in self.peers.values_mut() {
        if !peer.is_routing() && self.routing_table.need_to_add(peer.pub_id.name()).is_err() {
            peer.valid = false;
        }
    }

    let mut dropped = Vec::new();
    for id in &ids {
        let mut peer = match self.remove_peer(id) {
            Some((peer, Err(RoutingTableError::NoSuchPeer))) => peer,
            Some((peer, Ok(_))) => {
                log_or_panic!(
                    Level::Error,
                    "{:?} RT split peer has returned removal detail.",
                    self
                );
                peer
            }
            _ => continue,
        };

        // Peers we still have a proxy/joining-node relationship with are kept,
        // downgraded to the corresponding non-routing state.
        let kept_state = match peer.state {
            PeerState::Routing(RoutingConnection::JoiningNode(_))
            | PeerState::Candidate(RoutingConnection::JoiningNode(_)) => {
                debug!(
                    "{:?} Still the Proxy of {}, re-insert peer as JoiningNode",
                    self,
                    id.name()
                );
                PeerState::JoiningNode
            }
            PeerState::Routing(RoutingConnection::Proxy(_)) => {
                debug!(
                    "{:?} Still the JoiningNode of {}, re-insert peer as Proxy",
                    self,
                    id.name()
                );
                PeerState::Proxy
            }
            _ => {
                dropped.push(peer.pub_id);
                continue;
            }
        };
        peer.state = kept_state;
        self.insert_peer(peer);
    }
    dropped
}
/// Builds a `SectionMap` from the peers currently flagged as `valid`, plus
/// our own public id.
///
/// Each id is filed under the first versioned prefix, taken from the routing
/// table's `all_sections()`, whose prefix matches the id's name. Ids that
/// match none of those prefixes are left out.
pub fn ideal_rt(&self) -> SectionMap {
    let versioned_prefixes = self
        .routing_table
        .all_sections()
        .into_iter()
        .map(|(prefix, (version, _))| prefix.with_version(version))
        .collect_vec();

    let valid_peer_ids = self
        .peers
        .values()
        .filter(|peer| peer.valid)
        .map(|peer| peer.pub_id());

    let mut sections = SectionMap::new();
    for member in valid_peer_ids.chain(iter::once(&self.our_public_id)) {
        let home = versioned_prefixes
            .iter()
            .find(|candidate| candidate.prefix().matches(member.name()));
        if let Some(versioned_prefix) = home {
            let _ = sections
                .entry(*versioned_prefix)
                .or_insert_with(BTreeSet::new)
                .insert(*member);
        }
    }
    sections
}
/// Returns `true` if some member of our section, other than those named in
/// `excludes`, is a known peer whose state is `Routing` with a `JoiningNode`
/// or `Proxy` connection.
#[cfg(feature = "use-mock-crust")]
pub fn has_unnormalised_routing_conn(&self, excludes: &BTreeSet<XorName>) -> bool {
    let unnormalised: BTreeSet<XorName> = self
        .routing_table
        .our_section()
        .iter()
        .filter(|name| {
            self.get_peer_by_name(name)
                .map_or(false, |peer| match peer.state {
                    PeerState::Routing(RoutingConnection::JoiningNode(_))
                    | PeerState::Routing(RoutingConnection::Proxy(_)) => true,
                    _ => false,
                })
        })
        .cloned()
        .collect();

    // Non-empty set difference: at least one offender is not excluded.
    unnormalised.difference(excludes).next().is_some()
}
}
/// Debug output identifies the node by its routing table: it is rendered as
/// `Node(<our name>(<our prefix, binary-formatted>))`.
impl fmt::Debug for PeerManager {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let table = &self.routing_table;
        write!(
            formatter,
            "Node({}({:b}))",
            table.our_name(),
            table.our_prefix()
        )
    }
}
// Unit tests for the connection-info handshake bookkeeping of `PeerManager`.
// They only build with the mock Crust implementation enabled.
#[cfg(all(test, feature = "use-mock-crust"))]
mod tests {
use super::*;
use id::FullId;
use mock_crust::crust::{PrivConnectionInfo, PubConnectionInfo};
use mock_crust::Endpoint;
use routing_table::Authority;
use types::MessageId;
use xor_name::{XorName, XOR_NAME_LEN};
// Builds a `ManagedNode` authority whose name consists of `byte` repeated
// `XOR_NAME_LEN` times.
fn node_auth(byte: u8) -> Authority<XorName> {
Authority::ManagedNode(XorName([byte; XOR_NAME_LEN]))
}
// We start the exchange: obtain a token, finish preparing our own connection
// info, and only afterwards receive theirs.
#[test]
pub fn connection_info_prepare_receive() {
let min_section_size = 8;
let our_pub_id = *FullId::new().public_id();
let their_pub_id = *FullId::new().public_id();
let mut peer_mgr = PeerManager::new(min_section_size, our_pub_id, false);
let our_connection_info = PrivConnectionInfo {
id: our_pub_id,
endpoint: Endpoint(0),
};
let their_connection_info = PubConnectionInfo {
id: their_pub_id,
endpoint: Endpoint(1),
};
// Request a token for connecting to them; the peer is unknown, so a token
// must be handed out.
let token = unwrap!(peer_mgr.get_connection_token(
node_auth(0),
node_auth(1),
their_pub_id,
ReconnectingPeer::False,
));
// Our info is ready but theirs has not arrived yet, so `infos` must be `None`
// and src/dst must be exactly what we passed to `get_connection_token`.
match peer_mgr.connection_info_prepared(token, our_connection_info.clone()) {
Ok(ConnectionInfoPreparedResult {
pub_id,
src,
dst,
infos: None,
}) => {
assert_eq!(their_pub_id, pub_id);
assert_eq!(node_auth(0), src);
assert_eq!(node_auth(1), dst);
}
result => panic!("Unexpected result: {:?}", result),
}
// Their info arrives; both halves are now available, so the result must be
// `Ready` carrying our info and theirs.
match peer_mgr.connection_info_received(
node_auth(0),
node_auth(1),
their_connection_info.clone(),
MessageId::new(),
false,
) {
Ok(ConnectionInfoReceivedResult::Ready(our_info, their_info)) => {
assert_eq!(our_connection_info, our_info);
assert_eq!(their_connection_info, their_info);
}
result => panic!("Unexpected result: {:?}", result),
}
// The peer must have moved on to `CrustConnecting`.
match peer_mgr.get_peer(&their_pub_id).map(Peer::state) {
Some(&PeerState::CrustConnecting) => (),
state => panic!("Unexpected state: {:?}", state),
}
}
// They start the exchange: their connection info arrives first, and we
// prepare ours in response.
#[test]
pub fn connection_info_receive_prepare() {
let min_section_size = 8;
let our_pub_id = *FullId::new().public_id();
let their_pub_id = *FullId::new().public_id();
let mut peer_mgr = PeerManager::new(min_section_size, our_pub_id, false);
let our_connection_info = PrivConnectionInfo {
id: our_pub_id,
endpoint: Endpoint(0),
};
let their_connection_info = PubConnectionInfo {
id: their_pub_id,
endpoint: Endpoint(1),
};
let original_msg_id = MessageId::new();
// Receiving info from an unknown peer must ask us to prepare our own info and
// hand back the token to do so. NOTE(review): the trailing `true` is
// presumably the `is_conn_info_req` flag - confirm against the signature.
let token = match peer_mgr.connection_info_received(
node_auth(0),
node_auth(1),
their_connection_info.clone(),
original_msg_id,
true,
) {
Ok(ConnectionInfoReceivedResult::Prepare(token)) => token,
result => panic!("Unexpected result: {:?}", result),
};
// With our info prepared, both infos and the original message id must come
// back, with src and dst swapped relative to the received message.
match peer_mgr.connection_info_prepared(token, our_connection_info.clone()) {
Ok(ConnectionInfoPreparedResult {
pub_id,
src,
dst,
infos: Some((our_info, their_info, msg_id)),
}) => {
assert_eq!(their_pub_id, pub_id);
assert_eq!(node_auth(1), src);
assert_eq!(node_auth(0), dst);
assert_eq!(our_connection_info, our_info);
assert_eq!(their_connection_info, their_info);
assert_eq!(original_msg_id, msg_id);
}
result => panic!("Unexpected result: {:?}", result),
}
// The peer must have moved on to `CrustConnecting`.
match peer_mgr.get_peer(&their_pub_id).map(Peer::state) {
Some(&PeerState::CrustConnecting) => (),
state => panic!("Unexpected state: {:?}", state),
}
}
}