use crust::{PeerId, PrivConnectionInfo, PubConnectionInfo};
use error::RoutingError;
use id::PublicId;
use itertools::Itertools;
use rand;
use resource_proof::ResourceProof;
use routing_table::{Authority, OtherMergeDetails, OwnMergeDetails, OwnMergeState, Prefix,
RemovalDetails, RoutingTable};
use routing_table::Error as RoutingTableError;
use rust_sodium::crypto::hash::sha256;
use rust_sodium::crypto::sign;
use signature_accumulator::ACCUMULATION_TIMEOUT_SECS;
use std::{error, fmt, mem};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::collections::hash_map::Values;
use std::time::{Duration, Instant};
use types::MessageId;
use xor_name::XorName;
const JOINING_NODE_TIMEOUT_SECS: u64 = 900;
const CONNECTION_TIMEOUT_SECS: u64 = 90;
const NODE_IDENTIFY_TIMEOUT_SECS: u64 = 60;
pub const RESOURCE_PROOF_DURATION_SECS: u64 = 300;
const CANDIDATE_ACCEPT_TIMEOUT_SECS: u64 = 60;
const NODE_CONNECT_TIMEOUT_SECS: u64 = 60;
pub type SectionMap = BTreeMap<Prefix<XorName>, BTreeSet<PublicId>>;
#[derive(Debug)]
pub enum Error {
PeerNotFound,
UnexpectedState,
}
impl fmt::Display for Error {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::PeerNotFound => write!(formatter, "Peer not found"),
Error::UnexpectedState => write!(formatter, "Peer state does not allow operation"),
}
}
}
impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::PeerNotFound => "Peer not found",
Error::UnexpectedState => "Peer state does not allow operation",
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RoutingConnection {
JoiningNode(Instant),
Proxy(Instant),
Direct,
Tunnel,
}
#[derive(Debug)]
#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))]
pub enum PeerState {
ConnectionInfoPreparing {
us_as_src: Authority<XorName>,
them_as_dst: Authority<XorName>,
their_info: Option<(PubConnectionInfo, MessageId)>,
},
ConnectionInfoReady(PrivConnectionInfo),
CrustConnecting,
SearchingForTunnel,
AwaitingNodeIdentify(bool),
Client,
JoiningNode,
Routing(RoutingConnection),
Candidate(RoutingConnection),
Proxy,
}
impl PeerState {
pub fn can_tunnel_for(&self) -> bool {
match *self {
PeerState::Routing(RoutingConnection::Direct) |
PeerState::Candidate(RoutingConnection::Direct) => true,
_ => false,
}
}
}
#[derive(Debug)]
pub enum ConnectionInfoReceivedResult {
Ready(PrivConnectionInfo, PubConnectionInfo),
Prepare(u32),
Waiting,
IsProxy,
IsClient,
IsJoiningNode,
IsConnected,
}
#[derive(Debug)]
pub struct ConnectionInfoPreparedResult {
pub pub_id: PublicId,
pub src: Authority<XorName>,
pub dst: Authority<XorName>,
pub infos: Option<(PrivConnectionInfo, PubConnectionInfo, MessageId)>,
}
pub struct Peer {
pub_id: PublicId,
peer_id: Option<PeerId>,
state: PeerState,
timestamp: Instant,
}
impl Peer {
fn new(pub_id: PublicId, peer_id: Option<PeerId>, state: PeerState) -> Self {
Peer {
pub_id: pub_id,
peer_id: peer_id,
state: state,
timestamp: Instant::now(),
}
}
pub fn peer_id(&self) -> Option<&PeerId> {
self.peer_id.as_ref()
}
pub fn pub_id(&self) -> &PublicId {
&self.pub_id
}
pub fn name(&self) -> &XorName {
self.pub_id.name()
}
pub fn state(&self) -> &PeerState {
&self.state
}
fn is_expired(&self) -> bool {
match self.state {
PeerState::ConnectionInfoPreparing { .. } |
PeerState::ConnectionInfoReady(_) |
PeerState::CrustConnecting |
PeerState::SearchingForTunnel => {
self.timestamp.elapsed() >= Duration::from_secs(CONNECTION_TIMEOUT_SECS)
}
PeerState::JoiningNode |
PeerState::Proxy |
PeerState::Candidate(_) |
PeerState::Client |
PeerState::Routing(_) |
PeerState::AwaitingNodeIdentify(_) => false,
}
}
fn to_routing_connection(&self) -> RoutingConnection {
match self.state {
PeerState::SearchingForTunnel |
PeerState::AwaitingNodeIdentify(true) => RoutingConnection::Tunnel,
PeerState::Candidate(conn) |
PeerState::Routing(conn) => conn,
PeerState::Proxy => RoutingConnection::Proxy(self.timestamp),
PeerState::JoiningNode => RoutingConnection::JoiningNode(self.timestamp),
PeerState::AwaitingNodeIdentify(false) |
PeerState::ConnectionInfoPreparing { .. } |
PeerState::ConnectionInfoReady(_) |
PeerState::Client |
PeerState::CrustConnecting => RoutingConnection::Direct,
}
}
}
struct PeerMap {
peers: HashMap<XorName, Peer>,
names: HashMap<PeerId, XorName>,
}
impl PeerMap {
fn new() -> Self {
PeerMap {
peers: HashMap::new(),
names: HashMap::new(),
}
}
fn get(&self, peer_id: &PeerId) -> Option<&Peer> {
if let Some(name) = self.names.get(peer_id) {
self.peers.get(name)
} else {
None
}
}
fn get_mut(&mut self, peer_id: &PeerId) -> Option<&mut Peer> {
if let Some(name) = self.names.get(peer_id) {
self.peers.get_mut(name)
} else {
None
}
}
fn get_by_name(&self, name: &XorName) -> Option<&Peer> {
self.peers.get(name)
}
fn peers(&self) -> Values<XorName, Peer> {
self.peers.values()
}
fn insert(&mut self, peer: Peer) -> Option<Peer> {
let old_peer = peer.peer_id
.and_then(|peer_id| self.names.insert(peer_id, *peer.name()))
.and_then(|old_name| self.peers.remove(&old_name));
self.peers.insert(*peer.name(), peer).or(old_peer)
}
fn remove(&mut self, peer_id: &PeerId) -> Option<Peer> {
if let Some(name) = self.names.remove(peer_id) {
self.peers.remove(&name)
} else {
None
}
}
fn remove_by_name(&mut self, name: &XorName) -> Option<Peer> {
if let Some(peer) = self.peers.remove(name) {
if let Some(peer_id) = peer.peer_id {
let _ = self.names.remove(&peer_id);
}
Some(peer)
} else {
None
}
}
}
#[derive(Debug)]
enum CandidateState {
VotedFor,
AcceptedAsCandidate,
Approved,
}
#[derive(Debug)]
struct ChallengeResponse {
target_size: usize,
difficulty: u8,
seed: Vec<u8>,
proof: VecDeque<u8>,
}
#[derive(Debug)]
struct Candidate {
insertion_time: Instant,
challenge_response: Option<ChallengeResponse>,
client_auth: Authority<XorName>,
state: CandidateState,
passed_our_challenge: bool,
}
impl Candidate {
fn new(client_auth: Authority<XorName>) -> Candidate {
Candidate {
insertion_time: Instant::now(),
challenge_response: None,
client_auth: client_auth,
state: CandidateState::VotedFor,
passed_our_challenge: false,
}
}
fn is_expired(&self) -> bool {
let timeout_duration = match self.state {
CandidateState::VotedFor => Duration::from_secs(CANDIDATE_ACCEPT_TIMEOUT_SECS),
CandidateState::AcceptedAsCandidate |
CandidateState::Approved => {
Duration::from_secs(RESOURCE_PROOF_DURATION_SECS + ACCUMULATION_TIMEOUT_SECS)
}
};
self.insertion_time.elapsed() > timeout_duration
}
fn is_approved(&self) -> bool {
match self.state {
CandidateState::VotedFor |
CandidateState::AcceptedAsCandidate => false,
CandidateState::Approved => true,
}
}
}
pub struct PeerManager {
connection_token_map: HashMap<u32, PublicId>,
peer_map: PeerMap,
unknown_peers: HashMap<PeerId, (bool, Instant)>,
expected_peers: HashMap<XorName, Instant>,
proxy_peer_id: Option<PeerId>,
routing_table: RoutingTable<XorName>,
our_public_id: PublicId,
candidates: HashMap<XorName, Candidate>,
}
impl PeerManager {
pub fn new(min_section_size: usize, our_public_id: PublicId) -> PeerManager {
PeerManager {
connection_token_map: HashMap::new(),
peer_map: PeerMap::new(),
unknown_peers: HashMap::new(),
expected_peers: HashMap::new(),
proxy_peer_id: None,
routing_table: RoutingTable::<XorName>::new(*our_public_id.name(), min_section_size),
our_public_id: our_public_id,
candidates: HashMap::new(),
}
}
pub fn reset_routing_table(&mut self, our_public_id: PublicId) {
if !self.routing_table.is_empty() {
warn!("{:?} Reset to {:?} from non-empty routing table {:?}.",
self,
our_public_id.name(),
self.routing_table)
}
let min_section_size = self.routing_table.min_section_size();
self.our_public_id = our_public_id;
let new_rt = RoutingTable::new(*our_public_id.name(), min_section_size);
self.routing_table = new_rt;
}
pub fn add_prefixes(&mut self, prefixes: Vec<Prefix<XorName>>) -> Result<(), RoutingError> {
Ok(self.routing_table.add_prefixes(prefixes)?)
}
pub fn routing_table(&self) -> &RoutingTable<XorName> {
&self.routing_table
}
pub fn expect_peer(&mut self, id: &PublicId) {
let _ = self.expected_peers.insert(*id.name(), Instant::now());
}
pub fn expect_candidate(&mut self,
candidate_name: XorName,
client_auth: Authority<XorName>)
-> Result<(), RoutingError> {
if let Some((ongoing_name, _)) =
self.candidates.iter().find(|&(_, candidate)| !candidate.is_approved()) {
trace!("{:?} Rejected {} as a new candidate: still handling attempt by {}.",
self,
candidate_name,
ongoing_name);
return Err(RoutingError::AlreadyHandlingJoinRequest);
}
self.routing_table.should_join_our_section(&candidate_name)?;
let _ = self.candidates.insert(candidate_name, Candidate::new(client_auth));
Ok(())
}
pub fn accept_as_candidate(&mut self,
candidate_name: XorName,
client_auth: Authority<XorName>)
-> BTreeSet<PublicId> {
self.remove_unapproved_candidates(&candidate_name);
self.candidates
.entry(candidate_name)
.or_insert_with(|| Candidate::new(client_auth))
.state = CandidateState::AcceptedAsCandidate;
let our_section = self.routing_table.our_section();
self.get_pub_ids(our_section)
}
pub fn verify_candidate(&mut self,
candidate_name: &XorName,
part_index: usize,
part_count: usize,
proof_part: Vec<u8>,
leading_zero_bytes: u64)
-> Result<Option<(usize, u8, Duration)>, RoutingError> {
let candidate = if let Some(candidate) = self.candidates.get_mut(candidate_name) {
candidate
} else {
return Err(RoutingError::UnknownCandidate);
};
let challenge_response = &mut (if let Some(ref mut rp) = candidate.challenge_response {
rp
} else {
return Err(RoutingError::FailedResourceProofValidation);
});
challenge_response.proof.extend(proof_part);
if part_index + 1 != part_count {
return Ok(None);
}
let rp_object = ResourceProof::new(challenge_response.target_size,
challenge_response.difficulty);
if rp_object.validate_all(&challenge_response.seed,
&challenge_response.proof,
leading_zero_bytes) {
candidate.passed_our_challenge = true;
Ok(Some((challenge_response.target_size,
challenge_response.difficulty,
candidate.insertion_time.elapsed())))
} else {
Err(RoutingError::FailedResourceProofValidation)
}
}
pub fn verified_candidate_info
(&self)
-> Result<(PublicId, Authority<XorName>, SectionMap), RoutingError> {
if let Some((name, candidate)) =
self.candidates.iter().find(|&(_, cand)| {
cand.passed_our_challenge && !cand.is_approved()
}) {
return if let Some(peer) = self.peer_map.get_by_name(name) {
Ok((*peer.pub_id(), candidate.client_auth, self.pub_ids_by_section()))
} else {
Err(RoutingError::UnknownCandidate)
};
}
if let Some((name, _)) = self.candidates.iter().find(|&(_, cand)| !cand.is_approved()) {
info!("{:?} Candidate {} has not passed our resource proof challenge in time. Not \
sending approval vote to our section with {:?}",
self,
name,
self.routing_table.our_prefix());
}
Err(RoutingError::UnknownCandidate)
}
pub fn handle_candidate_approval(&mut self,
candidate_name: XorName,
client_auth: Authority<XorName>)
-> Result<Option<PeerId>, RoutingError> {
if let Some(candidate) = self.candidates.get_mut(&candidate_name) {
candidate.state = CandidateState::Approved;
if let Some(peer) = self.peer_map.get_by_name(&candidate_name) {
if let Some(peer_id) = peer.peer_id() {
if let PeerState::Candidate(_) = *peer.state() {
return Ok(Some(*peer_id));
} else {
trace!("Node({:?}) Candidate {:?} not yet connected to us.",
self.routing_table.our_name(),
candidate_name);
return Ok(None);
};
} else {
trace!("Node({:?}) No peer ID with name {:?}",
self.routing_table.our_name(),
candidate_name);
}
} else {
trace!("Node({:?}) No peer with name {:?}",
self.routing_table.our_name(),
candidate_name);
}
return Err(RoutingError::InvalidStateForOperation);
}
self.remove_unapproved_candidates(&candidate_name);
let mut candidate = Candidate::new(client_auth);
candidate.state = CandidateState::Approved;
let _ = self.candidates.insert(candidate_name, candidate);
trace!("{:?} No candidate with name {:?}", self, candidate_name);
Err(RoutingError::InvalidStateForOperation)
}
pub fn handle_candidate_identify(&mut self,
pub_id: &PublicId,
peer_id: &PeerId,
target_size: usize,
difficulty: u8,
seed: Vec<u8>)
-> Result<bool, RoutingError> {
if let Some(candidate) = self.candidates.get_mut(pub_id.name()) {
if candidate.is_approved() {
Ok(false)
} else {
let conn =
self.peer_map.get(peer_id).map_or(RoutingConnection::Direct,
Peer::to_routing_connection);
let state = PeerState::Candidate(conn);
let _ = self.peer_map.insert(Peer::new(*pub_id, Some(*peer_id), state));
if conn == RoutingConnection::Tunnel {
Err(RoutingError::CandidateIsTunnelling)
} else {
candidate.challenge_response = Some(ChallengeResponse {
target_size: target_size,
difficulty: difficulty,
seed: seed,
proof: VecDeque::new(),
});
Ok(true)
}
}
} else {
Err(RoutingError::UnknownCandidate)
}
}
pub fn show_candidate_status(&self) {
let mut have_candidate = false;
let log_prefix = format!("{:?} Candidate Status - ", self);
for (name, candidate) in self.candidates.iter().filter(|&(_, cand)| !cand.is_expired()) {
have_candidate = true;
let mut log_msg = format!("{}{} ", log_prefix, name);
match candidate.challenge_response {
Some(ChallengeResponse { ref target_size, ref proof, .. }) => {
if candidate.passed_our_challenge {
log_msg = format!("{}has passed our challenge ", log_msg);
} else if proof.is_empty() {
log_msg = format!("{}hasn't responded to our challenge yet ", log_msg);
} else {
log_msg = format!("{}has sent {}% of resource proof ",
log_msg,
(proof.len() * 100) / target_size);
}
if candidate.is_approved() {
log_msg = format!("{}and is approved by our section.", log_msg);
} else {
log_msg = format!("{}and is not yet approved by our section.", log_msg);
}
}
None => {
log_msg = format!("{}has not sent CandidateIdentify yet.", log_msg);
}
}
trace!("{}", log_msg);
}
if have_candidate {
return;
}
trace!("{}No candidate is currently being handled.", log_prefix);
}
pub fn add_to_routing_table(&mut self,
pub_id: &PublicId,
peer_id: &PeerId,
want_to_merge: bool)
-> Result<bool, RoutingTableError> {
if let Some(peer) = self.peer_map.get(peer_id) {
match peer.state {
PeerState::ConnectionInfoPreparing { .. } |
PeerState::ConnectionInfoReady(_) |
PeerState::SearchingForTunnel |
PeerState::Routing(RoutingConnection::Proxy(_)) |
PeerState::Routing(RoutingConnection::Direct) |
PeerState::Routing(RoutingConnection::Tunnel) => {
trace!("{:?} Unexpected peer state {:?} while adding {:?} into routing table.",
self,
peer.state,
peer_id)
}
PeerState::CrustConnecting if !self.unknown_peers.contains_key(peer_id) => {
trace!("{:?} Unexpected peer state {:?} while adding {:?} into routing table.",
self,
peer.state,
peer_id)
}
PeerState::CrustConnecting |
PeerState::AwaitingNodeIdentify(_) |
PeerState::Client |
PeerState::JoiningNode |
PeerState::Routing(RoutingConnection::JoiningNode(_)) |
PeerState::Candidate(_) |
PeerState::Proxy => (),
}
} else if !self.unknown_peers.contains_key(peer_id) {
trace!("{:?} Add to routing table called for {:?} not found in peer_map/unknown_peers",
self,
peer_id);
}
let unknown_connection = if let Some((is_tunnel, _)) = self.unknown_peers.remove(peer_id) {
if is_tunnel {
RoutingConnection::Tunnel
} else {
if let Some(peer @ &mut Peer {
state: PeerState::Routing(RoutingConnection::Tunnel), .. }) =
self.peer_map.get_mut(peer_id) {
peer.state = PeerState::Routing(RoutingConnection::Direct);
}
RoutingConnection::Direct
}
} else {
RoutingConnection::Direct
};
let _ = self.expected_peers.remove(pub_id.name());
let should_split = self.routing_table.add(*pub_id.name(), want_to_merge)?;
let conn =
self.peer_map.remove(peer_id).map_or(unknown_connection,
|peer| peer.to_routing_connection());
let _ = self.peer_map.insert(Peer::new(*pub_id, Some(*peer_id), PeerState::Routing(conn)));
Ok(should_split)
}
pub fn split_section(&mut self,
prefix: Prefix<XorName>)
-> (Vec<(XorName, PeerId)>, Option<Prefix<XorName>>) {
let (names_to_drop, our_new_prefix) = self.routing_table.split(prefix);
for name in &names_to_drop {
info!("{:?} Dropped {:?} from the routing table.", self, name);
}
let removal_keys = self.candidates
.iter()
.find(|&(name, candidate)| {
!candidate.is_approved() && !self.routing_table.our_prefix().matches(name)
})
.map(|(name, _)| *name);
let ids_to_drop = names_to_drop.iter()
.chain(removal_keys.iter())
.filter_map(|name| {
self.peer_map.remove_by_name(name).and_then(|peer| match peer {
Peer {
state: PeerState::Routing(RoutingConnection::JoiningNode(timestamp)),
..
} |
Peer {
state: PeerState::Candidate(RoutingConnection::JoiningNode(timestamp)),
..
} => {
debug!("{:?} still acts as proxy of {:?}, re-insert peer as JoiningNode",
self,
name);
let _ = self.peer_map.insert(Peer {
timestamp: timestamp,
state: PeerState::JoiningNode,
..peer
});
None
}
Peer { state: PeerState::Routing(RoutingConnection::Proxy(timestamp)), .. } => {
let _ = self.peer_map.insert(Peer {
timestamp: timestamp,
state: PeerState::Proxy,
..peer
});
None
}
Peer { peer_id: Some(id), .. } => Some((*name, id)),
Peer { peer_id: None, .. } => None,
})
})
.collect_vec();
self.cleanup_proxy_peer_id();
for name in removal_keys.iter() {
let _ = self.candidates.remove(name);
trace!("{:?} Removed unapproved candidate {:?} after split.",
self,
name);
}
let old_expected_peers = mem::replace(&mut self.expected_peers, HashMap::new());
self.expected_peers = old_expected_peers.into_iter()
.filter(|&(ref name, _)| self.routing_table.need_to_add(name) == Ok(()))
.collect();
(ids_to_drop, our_new_prefix)
}
pub fn add_prefix(&mut self, prefix: Prefix<XorName>) -> Vec<(XorName, PeerId)> {
let names_to_drop = self.routing_table.add_prefix(prefix);
let old_expected_peers = mem::replace(&mut self.expected_peers, HashMap::new());
self.expected_peers = old_expected_peers.into_iter()
.filter(|&(ref name, _)| self.routing_table.need_to_add(name) == Ok(()))
.collect();
names_to_drop.into_iter()
.filter_map(|name| if let Some(peer) = self.peer_map.remove_by_name(&name) {
self.cleanup_proxy_peer_id();
peer.peer_id.map(|peer_id| (name, peer_id))
} else {
None
})
.collect()
}
pub fn should_merge(&self,
we_want_to_merge: bool,
they_want_to_merge: bool)
-> Option<(Prefix<XorName>, Prefix<XorName>, SectionMap)> {
if !they_want_to_merge && !self.expected_peers.is_empty() {
return None;
}
self.routing_table
.should_merge(we_want_to_merge, they_want_to_merge)
.map(|merge_details| {
let sections = merge_details.sections
.into_iter()
.map(|(prefix, members)| {
(prefix, self.get_pub_ids(&members).into_iter().collect())
})
.collect();
(merge_details.sender_prefix, merge_details.merge_prefix, sections)
})
}
pub fn merge_own_section(&mut self,
sender_prefix: Prefix<XorName>,
merge_prefix: Prefix<XorName>,
sections: SectionMap)
-> (OwnMergeState<XorName>, Vec<PublicId>) {
self.remove_expired();
let needed = sections.iter()
.flat_map(|(_, pub_ids)| pub_ids)
.filter(|pub_id| !self.routing_table.has(pub_id.name()))
.cloned()
.collect();
let sections_as_names = sections.into_iter()
.map(|(prefix, members)| {
(prefix,
members.into_iter().map(|pub_id| *pub_id.name()).collect::<BTreeSet<_>>())
})
.collect();
let own_merge_details = OwnMergeDetails {
sender_prefix: sender_prefix,
merge_prefix: merge_prefix,
sections: sections_as_names,
};
let mut expected_peers = mem::replace(&mut self.expected_peers, HashMap::new());
expected_peers.extend(own_merge_details.sections
.values()
.flat_map(|section| section.iter())
.filter_map(|name| if self.routing_table.has(name) {
None
} else {
Some((*name, Instant::now()))
}));
self.expected_peers = expected_peers;
(self.routing_table.merge_own_section(own_merge_details), needed)
}
pub fn merge_other_section(&mut self,
prefix: Prefix<XorName>,
section: BTreeSet<PublicId>)
-> HashSet<PublicId> {
self.remove_expired();
let merge_details = OtherMergeDetails {
prefix: prefix,
section: section.iter().map(|public_id| *public_id.name()).collect(),
};
let needed_names = self.routing_table.merge_other_section(merge_details);
self.expected_peers.extend(needed_names.iter().map(|name| (*name, Instant::now())));
section.into_iter().filter(|pub_id| needed_names.contains(pub_id.name())).collect()
}
pub fn can_tunnel_for(&self, peer_id: &PeerId, dst_id: &PeerId) -> bool {
let peer_state = self.get_state(peer_id);
let dst_state = self.get_state(dst_id);
match (peer_state, dst_state) {
(Some(peer1), Some(peer2)) => peer1.can_tunnel_for() && peer2.can_tunnel_for(),
_ => false,
}
}
pub fn get_routing_peer(&self, peer_id: &PeerId) -> Option<&PublicId> {
self.peer_map.get(peer_id).and_then(|peer| if let PeerState::Routing(_) = peer.state {
Some(&peer.pub_id)
} else {
None
})
}
pub fn proxy(&self) -> Option<(&PeerId, &PublicId)> {
if let Some(peer_id) = self.proxy_peer_id.as_ref() {
if let Some(peer) = self.peer_map.get(peer_id) {
return Some((peer_id, &peer.pub_id));
}
}
None
}
pub fn get_proxy_public_id(&self, peer_id: &PeerId) -> Option<&PublicId> {
if Some(*peer_id) == self.proxy_peer_id {
self.peer_map.get(peer_id).map(Peer::pub_id)
} else {
None
}
}
pub fn get_proxy_peer_id(&self, name: &XorName) -> Option<&PeerId> {
if let Some(ref peer_id) = self.proxy_peer_id {
if self.peer_map.get(peer_id).map(Peer::name) == Some(name) {
return Some(peer_id);
}
}
None
}
pub fn set_proxy(&mut self, peer_id: PeerId, pub_id: PublicId) -> bool {
if let Some(proxy_peer_id) = self.proxy_peer_id {
debug!("{:?} Not accepting further bootstrap connections.", self);
proxy_peer_id == peer_id
} else {
let _ = self.insert_peer(pub_id, Some(peer_id), PeerState::Proxy);
self.proxy_peer_id = Some(peer_id);
true
}
}
pub fn insert_client(&mut self, peer_id: PeerId, pub_id: PublicId) -> bool {
self.insert_peer(pub_id, Some(peer_id), PeerState::Client)
}
pub fn get_client(&self, peer_id: &PeerId) -> Option<&sign::PublicKey> {
self.peer_map.get(peer_id).and_then(|peer| match peer.state {
PeerState::Client => {
Some(peer.pub_id.signing_public_key())
}
_ => None,
})
}
pub fn insert_joining_node(&mut self, peer_id: PeerId, pub_id: PublicId) -> bool {
self.insert_peer(pub_id, Some(peer_id), PeerState::JoiningNode)
}
pub fn get_joining_node(&self, peer_id: &PeerId) -> Option<&sign::PublicKey> {
self.peer_map.get(peer_id).and_then(|peer| match peer.state {
PeerState::JoiningNode => {
Some(peer.pub_id.signing_public_key())
}
_ => None,
})
}
pub fn remove_expired_joining_nodes(&mut self) -> Vec<PeerId> {
let expired_ids = self.peer_map
.peers()
.filter(|peer| match peer.state {
PeerState::JoiningNode | PeerState::Proxy => {
peer.timestamp.elapsed() >=
Duration::from_secs(JOINING_NODE_TIMEOUT_SECS)
}
_ => false,
})
.filter_map(|peer| peer.peer_id)
.collect_vec();
for peer_id in &expired_ids {
let _ = self.remove_peer(peer_id);
}
self.cleanup_proxy_peer_id();
expired_ids
}
pub fn remove_expired_connections(&mut self) -> Vec<PeerId> {
self.remove_expired();
let mut expired_connections = Vec::new();
for (peer_id, xor_name) in &self.peer_map.names {
if let Some(peer) = self.peer_map.peers.get(xor_name) {
if let PeerState::AwaitingNodeIdentify(_) = peer.state {
if peer.timestamp.elapsed() >= Duration::from_secs(NODE_IDENTIFY_TIMEOUT_SECS) {
expired_connections.push(*peer_id);
}
}
}
}
for peer_id in &expired_connections {
let _ = self.peer_map.remove(peer_id);
}
let mut expired_unknown_peers = Vec::new();
for (peer_id, &(_, timestamp)) in &self.unknown_peers {
if timestamp.elapsed() >= Duration::from_secs(NODE_IDENTIFY_TIMEOUT_SECS) {
expired_unknown_peers.push(*peer_id);
}
}
for peer_id in expired_unknown_peers {
expired_connections.push(peer_id);
let _ = self.unknown_peers.remove(&peer_id);
}
let mut expired_expected = Vec::new();
for (name, timestamp) in &self.expected_peers {
if timestamp.elapsed() >= Duration::from_secs(NODE_CONNECT_TIMEOUT_SECS) {
expired_expected.push(*name);
}
}
for name in expired_expected {
let _ = self.expected_peers.remove(&name);
}
expired_connections
}
pub fn get_proxy_or_client_or_joining_node_peer_id(&self, pub_id: &PublicId) -> Option<PeerId> {
if let Some(peer) = self.peer_map.get_by_name(pub_id.name()) {
match peer.state {
PeerState::Client | PeerState::JoiningNode | PeerState::Proxy => peer.peer_id,
_ => None,
}
} else if let Some(join_peer) =
self.peer_map.get_by_name(&XorName(sha256::hash(&pub_id.signing_public_key().0).0)) {
match join_peer.state {
PeerState::JoiningNode => join_peer.peer_id,
_ => None,
}
} else {
None
}
}
pub fn joining_nodes_num(&self) -> usize {
self.peer_map
.peers()
.filter(|&peer| match peer.state {
PeerState::JoiningNode => true,
_ => false,
})
.count()
}
pub fn client_num(&self) -> usize {
self.peer_map
.peers()
.filter(|&peer| match peer.state {
PeerState::Client => true,
_ => false,
})
.count()
}
pub fn connected_to(&mut self, peer_id: &PeerId) {
if !self.set_state(peer_id, PeerState::AwaitingNodeIdentify(false)) {
let _ = self.unknown_peers.insert(*peer_id, (false, Instant::now()));
}
}
pub fn tunnelling_to(&mut self, peer_id: &PeerId) -> bool {
match self.get_state(peer_id) {
Some(&PeerState::AwaitingNodeIdentify(false)) |
Some(&PeerState::Routing(_)) => {
return false;
}
_ => (),
}
if !self.set_state(peer_id, PeerState::AwaitingNodeIdentify(true)) {
let _ = self.unknown_peers.insert(*peer_id, (true, Instant::now()));
}
true
}
pub fn get_connecting_peer(&self, peer_id: &PeerId) -> Option<&PublicId> {
self.peer_map.get(peer_id).and_then(|peer| if let PeerState::CrustConnecting = peer.state {
return Some(&peer.pub_id);
} else {
None
})
}
pub fn get_peer_name(&self, peer_id: &PeerId) -> Option<&XorName> {
self.peer_map.get(peer_id).map(Peer::name)
}
pub fn get_connected_peer(&self, peer_id: &PeerId) -> Option<&Peer> {
self.peer_map.get(peer_id).and_then(|peer| match peer.state {
PeerState::Client |
PeerState::JoiningNode |
PeerState::Proxy |
PeerState::Candidate(_) |
PeerState::Routing(_) => Some(peer),
_ => None,
})
}
pub fn is_expected(&self, name: &XorName) -> bool {
self.expected_peers.contains_key(name)
}
pub fn get_peer_id(&self, name: &XorName) -> Option<&PeerId> {
self.peer_map.get_by_name(name).and_then(Peer::peer_id)
}
pub fn get_peer_ids(&self, names: &HashSet<XorName>) -> Vec<PeerId> {
names.iter()
.filter_map(|name| self.get_peer_id(name))
.cloned()
.collect()
}
pub fn get_pub_ids(&self, names: &BTreeSet<XorName>) -> BTreeSet<PublicId> {
names.into_iter()
.filter_map(|name| if name == self.our_public_id.name() {
Some(self.our_public_id)
} else if let Some(peer) = self.peer_map.get_by_name(name) {
Some(*peer.pub_id())
} else {
error!("{:?} Missing public ID for peer {:?}.", self, name);
None
})
.collect()
}
pub fn get_routing_peer_details(&self) -> Vec<(PeerId, XorName, bool)> {
self.routing_table
.iter()
.filter_map(|name| -> Option<(PeerId, XorName, bool)> {
let peer = match self.peer_map.get_by_name(name) {
Some(peer) => peer,
None => {
error!("{:?} has {} in RT, but has no entry in peer_map for it.",
self,
name);
return None;
}
};
let peer_id = match peer.peer_id {
Some(peer_id) => peer_id,
None => {
error!("{:?} has {} in RT, but has no peer ID for it.", self, name);
return None;
}
};
let is_tunnel = match peer.state {
PeerState::Routing(RoutingConnection::Tunnel) => true,
PeerState::Routing(_) => false,
_ => {
error!("{:?} has {} in RT, but has state {:?} for it.",
self,
name,
peer.state);
return None;
}
};
Some((peer_id, *name, is_tunnel))
})
.collect()
}
pub fn correct_routing_state_to_direct(&mut self, peer_id: &PeerId) {
let _ = self.set_state(peer_id, PeerState::Routing(RoutingConnection::Direct));
}
pub fn potential_tunnel_nodes(&self, client_name: &XorName) -> Vec<(XorName, PeerId)> {
self.routing_table
.iter()
.filter(|tunnel_name| self.is_potential_tunnel_node(tunnel_name, client_name))
.filter_map(|name| {
self.peer_map
.get_by_name(name)
.and_then(|peer| peer.peer_id().and_then(|peer_id| Some((*name, *peer_id))))
})
.collect()
}
pub fn is_potential_tunnel_node(&self, tunnel_name: &XorName, client_name: &XorName) -> bool {
if self.our_public_id.name() == tunnel_name || self.our_public_id.name() == client_name ||
!self.get_state_by_name(tunnel_name).map_or(false, PeerState::can_tunnel_for) {
return false;
}
let our_prefix = self.routing_table.our_prefix();
if our_prefix.matches(client_name) {
our_prefix.popped().matches(tunnel_name)
} else {
self.routing_table.find_section_prefix(tunnel_name).map_or(false, |pfx| {
pfx.matches(client_name) || pfx == *our_prefix
})
}
}
pub fn set_searching_for_tunnel(&mut self,
peer_id: PeerId,
pub_id: PublicId)
-> Vec<(XorName, PeerId)> {
match self.get_state_by_name(pub_id.name()) {
Some(&PeerState::Client) |
Some(&PeerState::JoiningNode) |
Some(&PeerState::Proxy) |
Some(&PeerState::Routing(_)) |
Some(&PeerState::AwaitingNodeIdentify(_)) => return vec![],
_ => (),
}
let _ = self.insert_peer(pub_id, Some(peer_id), PeerState::SearchingForTunnel);
self.potential_tunnel_nodes(pub_id.name())
}
pub fn connection_info_prepared(&mut self,
token: u32,
our_info: PrivConnectionInfo)
-> Result<ConnectionInfoPreparedResult, Error> {
let pub_id = self.connection_token_map
.remove(&token)
.ok_or(Error::PeerNotFound)?;
let (us_as_src, them_as_dst, opt_their_info) =
match self.peer_map.remove_by_name(pub_id.name()) {
Some(Peer { state: PeerState::ConnectionInfoPreparing { us_as_src,
them_as_dst,
their_info },
.. }) => (us_as_src, them_as_dst, their_info),
Some(peer) => {
let _ = self.peer_map.insert(peer);
return Err(Error::UnexpectedState);
}
None => return Err(Error::PeerNotFound),
};
Ok(ConnectionInfoPreparedResult {
pub_id: pub_id,
src: us_as_src,
dst: them_as_dst,
infos: match opt_their_info {
Some((their_info, msg_id)) => {
let state = PeerState::CrustConnecting;
self.insert_peer(pub_id, Some(their_info.id()), state);
Some((our_info, their_info, msg_id))
}
None => {
let state = PeerState::ConnectionInfoReady(our_info);
self.insert_peer(pub_id, None, state);
None
}
},
})
}
pub fn connection_info_received(&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
pub_id: PublicId,
peer_info: PubConnectionInfo,
msg_id: MessageId)
-> Result<ConnectionInfoReceivedResult, Error> {
let peer_id = peer_info.id();
match self.peer_map.remove_by_name(pub_id.name()) {
Some(Peer { state: PeerState::ConnectionInfoReady(our_info), .. }) => {
let state = PeerState::CrustConnecting;
self.insert_peer(pub_id, Some(peer_id), state);
Ok(ConnectionInfoReceivedResult::Ready(our_info, peer_info))
}
Some(Peer { state: PeerState::ConnectionInfoPreparing { us_as_src,
them_as_dst,
their_info: None },
.. }) => {
let state = PeerState::ConnectionInfoPreparing {
us_as_src: us_as_src,
them_as_dst: them_as_dst,
their_info: Some((peer_info, msg_id)),
};
self.insert_peer(pub_id, Some(peer_id), state);
Ok(ConnectionInfoReceivedResult::Waiting)
}
Some(peer @ Peer { state: PeerState::ConnectionInfoPreparing { .. }, .. }) |
Some(peer @ Peer { state: PeerState::CrustConnecting, .. }) |
Some(peer @ Peer { state: PeerState::AwaitingNodeIdentify(_), .. }) => {
let _ = self.peer_map.insert(peer);
Ok(ConnectionInfoReceivedResult::Waiting)
}
Some(peer @ Peer { state: PeerState::Client, .. }) => {
let _ = self.peer_map.insert(peer);
Ok(ConnectionInfoReceivedResult::IsClient)
}
Some(peer @ Peer { state: PeerState::JoiningNode, .. }) => {
let _ = self.peer_map.insert(peer);
Ok(ConnectionInfoReceivedResult::IsJoiningNode)
}
Some(peer @ Peer { state: PeerState::Proxy, .. }) => {
let _ = self.peer_map.insert(peer);
Ok(ConnectionInfoReceivedResult::IsProxy)
}
Some(peer @ Peer { state: PeerState::Routing(_), .. }) |
Some(peer @ Peer { state: PeerState::Candidate(_), .. }) => {
let _ = self.peer_map.insert(peer);
Ok(ConnectionInfoReceivedResult::IsConnected)
}
Some(Peer { state: PeerState::SearchingForTunnel, .. }) |
None => {
let state = PeerState::ConnectionInfoPreparing {
us_as_src: dst,
them_as_dst: src,
their_info: Some((peer_info, msg_id)),
};
self.insert_peer(pub_id, Some(peer_id), state);
let token = rand::random();
let _ = self.connection_token_map.insert(token, pub_id);
Ok(ConnectionInfoReceivedResult::Prepare(token))
}
}
}
pub fn get_connection_token(&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
pub_id: PublicId)
-> Option<u32> {
match self.get_state_by_name(pub_id.name()) {
Some(&PeerState::AwaitingNodeIdentify(_)) |
Some(&PeerState::Client) |
Some(&PeerState::ConnectionInfoPreparing { .. }) |
Some(&PeerState::ConnectionInfoReady(..)) |
Some(&PeerState::CrustConnecting) |
Some(&PeerState::JoiningNode) |
Some(&PeerState::Proxy) |
Some(&PeerState::Candidate(_)) |
Some(&PeerState::Routing(_)) => return None,
Some(&PeerState::SearchingForTunnel) |
None => (),
}
let token = rand::random();
let _ = self.connection_token_map.insert(token, pub_id);
self.insert_peer(pub_id,
None,
PeerState::ConnectionInfoPreparing {
us_as_src: src,
them_as_dst: dst,
their_info: None,
});
Some(token)
}
pub fn get_new_connection_info_token(&mut self, token: u32) -> Result<u32, Error> {
let pub_id = self.connection_token_map
.remove(&token)
.ok_or(Error::PeerNotFound)?;
let new_token = rand::random();
let _ = self.connection_token_map.insert(new_token, pub_id);
Ok(new_token)
}
pub fn peers_needing_tunnel(&self) -> Vec<(PeerId, XorName)> {
self.peer_map
.peers()
.filter_map(|peer| match peer.state {
PeerState::SearchingForTunnel => {
peer.peer_id.and_then(|peer_id| Some((peer_id, *peer.name())))
}
_ => None,
})
.collect()
}
pub fn allow_connect(&self, name: &XorName) -> Result<(), RoutingTableError> {
self.routing_table.need_to_add(name)
}
pub fn remove_peer(&mut self,
peer_id: &PeerId)
-> Option<(Peer, Result<RemovalDetails<XorName>, RoutingTableError>)> {
if let Some(peer) = self.peer_map.remove(peer_id) {
self.cleanup_proxy_peer_id();
let removal_details = self.routing_table.remove(peer.name());
Some((peer, removal_details))
} else {
None
}
}
pub fn get_state_by_name(&self, name: &XorName) -> Option<&PeerState> {
self.peer_map.get_by_name(name).map(Peer::state)
}
fn get_state(&self, peer_id: &PeerId) -> Option<&PeerState> {
self.peer_map.get(peer_id).map(Peer::state)
}
fn set_state(&mut self, peer_id: &PeerId, state: PeerState) -> bool {
if let Some(peer) = self.peer_map.get_mut(peer_id) {
peer.state = state;
return true;
}
trace!("{:?}: {:?} not found. Cannot set state {:?}.",
self,
peer_id,
state);
false
}
fn insert_peer(&mut self, pub_id: PublicId, peer_id: Option<PeerId>, state: PeerState) -> bool {
let result = self.peer_map.insert(Peer::new(pub_id, peer_id, state)).is_some();
self.remove_expired();
result
}
fn remove_expired(&mut self) {
let expired_names = self.peer_map
.peers()
.filter(|peer| peer.is_expired())
.map(|peer| *peer.name())
.collect_vec();
for name in expired_names {
let _ = self.peer_map.remove_by_name(&name);
}
self.cleanup_proxy_peer_id();
}
fn cleanup_proxy_peer_id(&mut self) {
if let Some(peer_id) = self.proxy_peer_id {
if self.peer_map.get(&peer_id).is_none() {
self.proxy_peer_id = None;
}
}
}
fn remove_unapproved_candidates(&mut self, candidate_name: &XorName) {
let old_candidates = mem::replace(&mut self.candidates, HashMap::new());
self.candidates = old_candidates.into_iter()
.filter(|&(name, ref candidate)| name == *candidate_name || candidate.is_approved())
.collect();
}
pub fn remove_expired_candidates(&mut self) -> Vec<PeerId> {
let candidates = mem::replace(&mut self.candidates, HashMap::new());
let (to_prune, to_keep) =
candidates.into_iter().partition(|&(_, ref candidate)| candidate.is_expired());
self.candidates = to_keep;
to_prune.into_iter().filter_map(|(name, _)| self.get_peer_id(&name).cloned()).collect()
}
pub fn pub_ids_by_section(&self) -> SectionMap {
self.routing_table
.all_sections()
.into_iter()
.map(|(prefix, names)| (prefix, self.get_pub_ids(&names)))
.collect()
}
}
impl fmt::Debug for PeerManager {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter,
"Node({}({:b}))",
self.routing_table.our_name(),
self.routing_table.our_prefix())
}
}
#[cfg(feature = "use-mock-crust")]
impl PeerManager {
pub fn remove_connecting_peers(&mut self) -> bool {
let remove_names = self.peer_map
.peers()
.filter(|peer| match peer.state {
PeerState::ConnectionInfoPreparing { .. } |
PeerState::ConnectionInfoReady(_) |
PeerState::CrustConnecting |
PeerState::SearchingForTunnel => true,
_ => false,
})
.map(|peer| *peer.name())
.collect_vec();
if remove_names.is_empty() && self.expected_peers.is_empty() && self.candidates.is_empty() {
return false;
}
for name in remove_names {
let _ = self.peer_map.remove_by_name(&name);
}
self.expected_peers.clear();
self.candidates.clear();
true
}
}
#[cfg(all(test, feature = "use-mock-crust"))]
mod tests {
use super::*;
use id::FullId;
use mock_crust::Endpoint;
use mock_crust::crust::{PeerId, PrivConnectionInfo, PubConnectionInfo};
use routing_table::Authority;
use types::MessageId;
use xor_name::{XOR_NAME_LEN, XorName};
fn node_auth(byte: u8) -> Authority<XorName> {
Authority::ManagedNode(XorName([byte; XOR_NAME_LEN]))
}
#[test]
pub fn connection_info_prepare_receive() {
let min_section_size = 8;
let orig_pub_id = *FullId::new().public_id();
let mut peer_mgr = PeerManager::new(min_section_size, orig_pub_id);
let our_connection_info = PrivConnectionInfo(PeerId(0), Endpoint(0));
let their_connection_info = PubConnectionInfo(PeerId(1), Endpoint(1));
let token = unwrap!(peer_mgr.get_connection_token(node_auth(0), node_auth(1), orig_pub_id));
match peer_mgr.connection_info_prepared(token, our_connection_info.clone()) {
Ok(ConnectionInfoPreparedResult { pub_id, src, dst, infos: None }) => {
assert_eq!(orig_pub_id, pub_id);
assert_eq!(node_auth(0), src);
assert_eq!(node_auth(1), dst);
}
result => panic!("Unexpected result: {:?}", result),
}
match peer_mgr.connection_info_received(node_auth(0),
node_auth(1),
orig_pub_id,
their_connection_info.clone(),
MessageId::new()) {
Ok(ConnectionInfoReceivedResult::Ready(our_info, their_info)) => {
assert_eq!(our_connection_info, our_info);
assert_eq!(their_connection_info, their_info);
}
result => panic!("Unexpected result: {:?}", result),
}
match peer_mgr.get_state_by_name(orig_pub_id.name()) {
Some(&PeerState::CrustConnecting) => (),
state => panic!("Unexpected state: {:?}", state),
}
}
#[test]
pub fn connection_info_receive_prepare() {
let min_section_size = 8;
let orig_pub_id = *FullId::new().public_id();
let mut peer_mgr = PeerManager::new(min_section_size, orig_pub_id);
let our_connection_info = PrivConnectionInfo(PeerId(0), Endpoint(0));
let their_connection_info = PubConnectionInfo(PeerId(1), Endpoint(1));
let original_msg_id = MessageId::new();
let token = match peer_mgr.connection_info_received(node_auth(0),
node_auth(1),
orig_pub_id,
their_connection_info.clone(),
original_msg_id) {
Ok(ConnectionInfoReceivedResult::Prepare(token)) => token,
result => panic!("Unexpected result: {:?}", result),
};
match peer_mgr.connection_info_prepared(token, our_connection_info.clone()) {
Ok(ConnectionInfoPreparedResult { pub_id,
src,
dst,
infos: Some((our_info, their_info, msg_id)) }) => {
assert_eq!(orig_pub_id, pub_id);
assert_eq!(node_auth(1), src);
assert_eq!(node_auth(0), dst);
assert_eq!(our_connection_info, our_info);
assert_eq!(their_connection_info, their_info);
assert_eq!(original_msg_id, msg_id);
}
result => panic!("Unexpected result: {:?}", result),
}
match peer_mgr.get_state_by_name(orig_pub_id.name()) {
Some(&PeerState::CrustConnecting) => (),
state => panic!("Unexpected state: {:?}", state),
}
}
}