use crust::{PeerId, PrivConnectionInfo, PubConnectionInfo};
use authority::Authority;
use sodiumoxide::crypto::sign;
use id::PublicId;
use itertools::Itertools;
use rand;
use std::collections::HashMap;
use std::{error, fmt, mem};
use std::time::{Duration, Instant};
use xor_name::XorName;
use kademlia_routing_table::{AddedNodeDetails, DroppedNodeDetails, RoutingTable};
pub const GROUP_SIZE: usize = 8;
pub const QUORUM_SIZE: usize = 5;
const JOINING_NODE_TIMEOUT_SECS: u64 = 300;
#[cfg(not(feature = "use-mock-crust"))]
const CONNECTION_TIMEOUT_SECS: u64 = 90;
#[cfg(feature = "use-mock-crust")]
const CONNECTION_TIMEOUT_SECS: u64 = 0;
const EXTRA_BUCKET_ENTRIES: usize = 2;
pub struct ClientInfo {
pub public_key: sign::PublicKey,
pub client_restriction: bool,
pub timestamp: Instant,
}
impl ClientInfo {
fn new(public_key: sign::PublicKey, client_restriction: bool) -> Self {
ClientInfo {
public_key: public_key,
client_restriction: client_restriction,
timestamp: Instant::now(),
}
}
fn is_stale(&self) -> bool {
!self.client_restriction &&
self.timestamp.elapsed() > Duration::from_secs(JOINING_NODE_TIMEOUT_SECS)
}
}
#[derive(Debug)]
pub enum Error {
PeerNotFound,
UnexpectedState,
}
impl fmt::Display for Error {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::PeerNotFound => write!(formatter, "Peer not found"),
Error::UnexpectedState => write!(formatter, "Peer state does not allow operation"),
}
}
}
impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::PeerNotFound => "Peer not found",
Error::UnexpectedState => "Peer state does not allow operation",
}
}
}
#[derive(Debug)]
pub enum PeerState {
ConnectionInfoPreparing(Authority, Authority, Option<PubConnectionInfo>),
ConnectionInfoReady(PrivConnectionInfo),
CrustConnecting,
SearchingForTunnel,
AwaitingNodeIdentify(bool),
Routing(bool),
}
#[derive(Debug)]
pub enum ConnectionInfoReceivedResult {
Ready(PrivConnectionInfo, PubConnectionInfo),
Prepare(u32),
Waiting,
IsProxy,
IsClient,
IsConnected,
}
#[derive(Debug)]
pub struct ConnectionInfoPreparedResult {
pub pub_id: PublicId,
pub src: Authority,
pub dst: Authority,
pub infos: Option<(PrivConnectionInfo, PubConnectionInfo)>,
}
pub struct PeerManager {
client_map: HashMap<PeerId, ClientInfo>,
connection_token_map: HashMap<u32, PublicId>,
node_map: HashMap<XorName, (Instant, PeerState)>,
proxy: Option<(Instant, PeerId, PublicId)>,
pub_id_map: HashMap<PeerId, PublicId>,
routing_table: RoutingTable<XorName>,
our_public_id: PublicId,
}
impl PeerManager {
pub fn new(our_public_id: PublicId) -> PeerManager {
PeerManager {
client_map: HashMap::new(),
connection_token_map: HashMap::new(),
node_map: HashMap::new(),
proxy: None,
pub_id_map: HashMap::new(),
routing_table: RoutingTable::<XorName>::new(*our_public_id.name(),
GROUP_SIZE,
EXTRA_BUCKET_ENTRIES),
our_public_id: our_public_id,
}
}
pub fn reset_routing_table(&mut self, our_public_id: PublicId) {
self.our_public_id = our_public_id;
let new_rt = RoutingTable::new(*our_public_id.name(), GROUP_SIZE, EXTRA_BUCKET_ENTRIES);
let old_rt = mem::replace(&mut self.routing_table, new_rt);
for name in old_rt.iter() {
let _ = self.node_map.remove(name);
}
self.cleanup_pub_id_map();
}
pub fn routing_table(&self) -> &RoutingTable<XorName> {
&self.routing_table
}
pub fn add_to_routing_table(&mut self,
public_id: PublicId,
peer_id: PeerId)
-> Option<AddedNodeDetails<XorName>> {
let result = self.routing_table.add(*public_id.name());
if result.is_some() {
let state = PeerState::Routing(match self.node_map.remove(public_id.name()) {
Some((_, PeerState::SearchingForTunnel)) |
Some((_, PeerState::AwaitingNodeIdentify(true))) => true,
Some((_, PeerState::Routing(tunnel))) => {
error!("Peer {:?} added to routing table, but already in state Routing.",
peer_id);
tunnel
}
_ => false,
});
let _ = self.node_map.insert(*public_id.name(), (Instant::now(), state));
let _ = self.pub_id_map.insert(peer_id, public_id);
}
result
}
pub fn remove_if_unneeded(&mut self, name: &XorName, peer_id: &PeerId) -> bool {
if self.get_proxy_public_id(peer_id).is_some() || self.get_client(peer_id).is_some() ||
Some(name) != self.pub_id_map.get(peer_id).map(PublicId::name) ||
!self.routing_table.remove_if_unneeded(name) {
return false;
}
let _ = self.pub_id_map.remove(peer_id);
let _ = self.node_map.remove(name);
true
}
pub fn can_tunnel_for(&self, peer_id: &PeerId, dst_id: &PeerId) -> bool {
let peer_state = self.pub_id_map.get(peer_id).and_then(|pub_id| self.get_state(pub_id));
let dst_state = self.pub_id_map.get(dst_id).and_then(|pub_id| self.get_state(pub_id));
match (peer_state, dst_state) {
(Some(&PeerState::Routing(false)), Some(&PeerState::Routing(false))) => true,
_ => false,
}
}
pub fn proxy(&self) -> &Option<(Instant, PeerId, PublicId)> {
&self.proxy
}
pub fn get_proxy_public_id(&self, peer_id: &PeerId) -> Option<&PublicId> {
match self.proxy {
Some((_, ref proxy_id, ref pub_id)) if proxy_id == peer_id => Some(pub_id),
_ => None,
}
}
pub fn set_proxy(&mut self, peer_id: PeerId, public_id: PublicId) -> bool {
if let Some((_, ref proxy_id, _)) = self.proxy {
debug!("Not accepting further bootstrap connections.");
*proxy_id == peer_id
} else {
self.proxy = Some((Instant::now(), peer_id, public_id));
true
}
}
pub fn insert_client(&mut self,
peer_id: PeerId,
public_id: &PublicId,
client_restriction: bool) {
let client_info = ClientInfo::new(*public_id.signing_public_key(), client_restriction);
let _ = self.client_map.insert(peer_id, client_info);
}
pub fn get_client(&self, peer_id: &PeerId) -> Option<&ClientInfo> {
self.client_map.get(peer_id)
}
pub fn remove_client(&mut self, peer_id: &PeerId) -> Option<ClientInfo> {
self.client_map.remove(peer_id)
}
pub fn remove_stale_joining_nodes(&mut self) -> Vec<PeerId> {
let mut stale_keys = self.client_map
.iter()
.filter(|&(_, info)| info.is_stale())
.map(|(&peer_id, _)| peer_id)
.collect_vec();
for peer_id in &stale_keys {
let _ = self.client_map.remove(peer_id);
}
if let Some((timestamp, peer_id, pub_id)) = self.proxy.take() {
if timestamp.elapsed() > Duration::from_secs(JOINING_NODE_TIMEOUT_SECS) {
stale_keys.push(peer_id);
} else {
self.proxy = Some((timestamp, peer_id, pub_id));
}
}
stale_keys
}
pub fn get_proxy_or_client_peer_id(&self, public_id: &PublicId) -> Option<PeerId> {
if let Some((&peer_id, _)) = self.client_map
.iter()
.find(|elt| &elt.1.public_key == public_id.signing_public_key()) {
return Some(peer_id);
}
match self.proxy {
Some((_, ref peer_id, ref proxy_pub_id)) if proxy_pub_id == public_id => Some(*peer_id),
_ => None,
}
}
pub fn joining_nodes_num(&self) -> usize {
self.client_map.len() - self.client_num()
}
pub fn client_num(&self) -> usize {
self.client_map.values().filter(|&info| info.client_restriction).count()
}
pub fn connected_to(&mut self, peer_id: PeerId) -> bool {
self.set_peer_state(peer_id, PeerState::AwaitingNodeIdentify(false))
}
pub fn tunnelling_to(&mut self, peer_id: PeerId) -> bool {
self.set_peer_state(peer_id, PeerState::AwaitingNodeIdentify(true))
}
pub fn get_connecting_peer(&mut self, peer_id: &PeerId) -> Option<&PublicId> {
self.pub_id_map.get(peer_id).and_then(|pub_id| {
match self.get_state(pub_id) {
Some(&PeerState::CrustConnecting) => Some(pub_id),
_ => None,
}
})
}
pub fn get_peer_ids(&self, names: &[XorName]) -> Vec<PeerId> {
self.pub_id_map
.iter()
.filter_map(|(peer_id, pub_id)| if names.contains(pub_id.name()) {
Some(*peer_id)
} else {
None
})
.collect()
}
pub fn get_pub_ids(&self, names: &[XorName]) -> Vec<PublicId> {
let mut result_map: HashMap<XorName, PublicId> = HashMap::new();
for pub_id in self.pub_id_map.values() {
if names.contains(pub_id.name()) {
let _ = result_map.insert(*pub_id.name(), *pub_id);
}
}
if names.contains(self.our_public_id.name()) {
let _ = result_map.insert(*self.our_public_id.name(), self.our_public_id);
}
names.iter()
.filter_map(|name| result_map.get(name))
.cloned()
.collect()
}
pub fn get_routing_peer(&self, peer_id: &PeerId) -> Option<&PublicId> {
self.pub_id_map.get(peer_id).and_then(|pub_id| {
if let Some(&PeerState::Routing(_)) = self.get_state(pub_id) {
Some(pub_id)
} else {
None
}
})
}
pub fn set_searching_for_tunnel(&mut self,
peer_id: PeerId,
pub_id: &PublicId)
-> Vec<(XorName, PeerId)> {
match self.get_state(pub_id) {
Some(&PeerState::Routing(_)) |
Some(&PeerState::AwaitingNodeIdentify(_)) => return vec![],
_ => (),
}
let _ = self.pub_id_map.insert(peer_id, *pub_id);
self.insert_state(*pub_id, PeerState::SearchingForTunnel);
let close_group = self.routing_table.closest_nodes_to(pub_id.name(), GROUP_SIZE, false);
self.pub_id_map
.iter()
.filter_map(|(peer_id, pub_id)| if close_group.contains(pub_id.name()) {
Some((*pub_id.name(), *peer_id))
} else {
None
})
.collect()
}
pub fn connection_info_prepared(&mut self,
token: u32,
our_info: PrivConnectionInfo)
-> Result<ConnectionInfoPreparedResult, Error> {
let pub_id = try!(self.connection_token_map.remove(&token).ok_or(Error::PeerNotFound));
let (src, dst, opt_their_info) = match self.node_map.remove(pub_id.name()) {
Some((_, PeerState::ConnectionInfoPreparing(src, dst, info))) => (src, dst, info),
Some((timestamp, state)) => {
let _ = self.node_map.insert(*pub_id.name(), (timestamp, state));
return Err(Error::UnexpectedState);
}
None => return Err(Error::PeerNotFound),
};
Ok(ConnectionInfoPreparedResult {
pub_id: pub_id,
src: src,
dst: dst,
infos: match opt_their_info {
Some(their_info) => {
self.insert_state(pub_id, PeerState::CrustConnecting);
let _ = self.pub_id_map.insert(their_info.id(), pub_id);
Some((our_info, their_info))
}
None => {
self.insert_state(pub_id, PeerState::ConnectionInfoReady(our_info));
None
}
},
})
}
pub fn connection_info_received(&mut self,
src: Authority,
dst: Authority,
pub_id: PublicId,
their_info: PubConnectionInfo)
-> Result<ConnectionInfoReceivedResult, Error> {
let peer_id = their_info.id();
if self.get_client(&peer_id).is_some() {
return Ok(ConnectionInfoReceivedResult::IsClient);
}
if self.get_proxy_public_id(&peer_id).is_some() {
return Ok(ConnectionInfoReceivedResult::IsProxy);
}
match self.node_map.remove(pub_id.name()) {
Some((_, PeerState::ConnectionInfoReady(our_info))) => {
self.insert_state(pub_id, PeerState::CrustConnecting);
let _ = self.pub_id_map.insert(peer_id, pub_id);
Ok(ConnectionInfoReceivedResult::Ready(our_info, their_info))
}
Some((_, PeerState::ConnectionInfoPreparing(src, dst, None))) => {
let state = PeerState::ConnectionInfoPreparing(src, dst, Some(their_info));
self.insert_state(pub_id, state);
Ok(ConnectionInfoReceivedResult::Waiting)
}
Some((_, PeerState::CrustConnecting)) => {
self.insert_state(pub_id, PeerState::CrustConnecting);
Ok(ConnectionInfoReceivedResult::Waiting)
}
Some((timestamp, PeerState::Routing(tunnel))) => {
let _ = self.node_map
.insert(*pub_id.name(), (timestamp, PeerState::Routing(tunnel)));
Ok(ConnectionInfoReceivedResult::IsConnected)
}
Some((timestamp, state)) => {
let _ = self.node_map.insert(*pub_id.name(), (timestamp, state));
Err(Error::UnexpectedState)
}
None => {
let state = PeerState::ConnectionInfoPreparing(src, dst, Some(their_info));
self.insert_state(pub_id, state);
let _ = self.pub_id_map.insert(peer_id, pub_id);
let token = rand::random();
let _ = self.connection_token_map.insert(token, pub_id);
Ok(ConnectionInfoReceivedResult::Prepare(token))
}
}
}
pub fn get_connection_token(&mut self,
src: Authority,
dst: Authority,
pub_id: PublicId)
-> Option<u32> {
match self.get_state(&pub_id) {
Some(&PeerState::AwaitingNodeIdentify(_)) |
Some(&PeerState::Routing(_)) |
Some(&PeerState::ConnectionInfoPreparing(..)) |
Some(&PeerState::ConnectionInfoReady(..)) |
Some(&PeerState::CrustConnecting) => return None,
_ => (),
}
let token = rand::random();
let _ = self.connection_token_map.insert(token, pub_id);
self.insert_state(pub_id, PeerState::ConnectionInfoPreparing(src, dst, None));
Some(token)
}
pub fn peers_needing_tunnel(&self) -> Vec<PeerId> {
self.pub_id_map
.iter()
.filter_map(|(peer_id, pub_id)| match self.get_state(pub_id) {
Some(&PeerState::SearchingForTunnel) => Some(*peer_id),
_ => None,
})
.collect()
}
pub fn allow_connect(&self, name: &XorName) -> bool {
!self.routing_table.contains(name) && self.routing_table.allow_connection(name)
}
pub fn remove_peer(&mut self, peer_id: &PeerId) -> Option<(XorName, DroppedNodeDetails)> {
if let Some(pub_id) = self.pub_id_map.remove(peer_id) {
let name = *pub_id.name();
let _ = self.node_map.remove(&name);
self.cleanup_pub_id_map();
self.routing_table.remove(&name).map(|result| (name, result))
} else {
None
}
}
#[cfg(feature = "use-mock-crust")]
pub fn clear_caches(&mut self) {
self.remove_expired();
}
fn set_peer_state(&mut self, peer_id: PeerId, state: PeerState) -> bool {
if let Some(&pub_id) = self.pub_id_map.get(&peer_id) {
self.insert_state(pub_id, state);
true
} else {
trace!("{:?} not found. Cannot set state {:?}.", peer_id, state);
false
}
}
fn get_state(&self, pub_id: &PublicId) -> Option<&PeerState> {
self.node_map.get(pub_id.name()).map(|&(_, ref state)| state)
}
#[cfg(feature = "use-mock-crust")]
fn insert_state(&mut self, pub_id: PublicId, state: PeerState) {
let _ = self.node_map.insert(*pub_id.name(), (Instant::now(), state));
}
#[cfg(not(feature = "use-mock-crust"))]
fn insert_state(&mut self, pub_id: PublicId, state: PeerState) {
let _ = self.node_map.insert(*pub_id.name(), (Instant::now(), state));
self.remove_expired();
}
#[cfg_attr(feature="clippy", allow(absurd_extreme_comparisons))]
fn remove_expired(&mut self) {
let remove_ids = self.node_map
.iter()
.filter(|&(_, &(ref timestamp, ref state))| match *state {
PeerState::ConnectionInfoPreparing(..) |
PeerState::ConnectionInfoReady(_) |
PeerState::CrustConnecting |
PeerState::SearchingForTunnel => {
timestamp.elapsed().as_secs() >= CONNECTION_TIMEOUT_SECS
}
PeerState::Routing(_) |
PeerState::AwaitingNodeIdentify(_) => false,
})
.map(|(pub_id, _)| *pub_id)
.collect_vec();
for pub_id in remove_ids {
let _ = self.node_map.remove(&pub_id);
}
let remove_tokens = self.connection_token_map
.iter()
.filter(|&(_, pub_id)| match self.get_state(pub_id) {
Some(&PeerState::ConnectionInfoPreparing(..)) => false,
_ => true,
})
.map(|(token, _)| *token)
.collect_vec();
for token in remove_tokens {
let _ = self.connection_token_map.remove(&token);
}
self.cleanup_pub_id_map();
}
fn cleanup_pub_id_map(&mut self) {
let remove_peer_ids = self.pub_id_map
.iter()
.filter(|&(_, pub_id)| !self.node_map.contains_key(pub_id.name()))
.map(|(peer_id, _)| *peer_id)
.collect_vec();
for peer_id in remove_peer_ids {
let _ = self.pub_id_map.remove(&peer_id);
}
}
}
#[cfg(all(test, feature = "use-mock-crust"))]
mod tests {
use super::*;
use authority::Authority;
use id::FullId;
use mock_crust::crust::{PeerId, PrivConnectionInfo, PubConnectionInfo};
use mock_crust::Endpoint;
use xor_name::{XOR_NAME_LEN, XorName};
fn node_auth(byte: u8) -> Authority {
Authority::ManagedNode(XorName([byte; XOR_NAME_LEN]))
}
#[test]
pub fn connection_info_prepare_receive() {
let orig_pub_id = *FullId::new().public_id();
let mut peer_mgr = PeerManager::new(orig_pub_id);
let our_connection_info = PrivConnectionInfo(PeerId(0), Endpoint(0));
let their_connection_info = PubConnectionInfo(PeerId(1), Endpoint(1));
let token = unwrap!(peer_mgr.get_connection_token(node_auth(0), node_auth(1), orig_pub_id));
match peer_mgr.connection_info_prepared(token, our_connection_info.clone()) {
Ok(ConnectionInfoPreparedResult { pub_id, src, dst, infos: None }) => {
assert_eq!(orig_pub_id, pub_id);
assert_eq!(node_auth(0), src);
assert_eq!(node_auth(1), dst);
}
result => panic!("Unexpected result: {:?}", result),
}
match peer_mgr.connection_info_received(node_auth(0),
node_auth(1),
orig_pub_id,
their_connection_info.clone()) {
Ok(ConnectionInfoReceivedResult::Ready(our_info, their_info)) => {
assert_eq!(our_connection_info, our_info);
assert_eq!(their_connection_info, their_info);
}
result => panic!("Unexpected result: {:?}", result),
}
match peer_mgr.get_state(&orig_pub_id) {
Some(&PeerState::CrustConnecting) => (),
state => panic!("Unexpected state: {:?}", state),
}
}
#[test]
pub fn connection_info_receive_prepare() {
let orig_pub_id = *FullId::new().public_id();
let mut peer_mgr = PeerManager::new(orig_pub_id);
let our_connection_info = PrivConnectionInfo(PeerId(0), Endpoint(0));
let their_connection_info = PubConnectionInfo(PeerId(1), Endpoint(1));
let token = match peer_mgr.connection_info_received(node_auth(0),
node_auth(1),
orig_pub_id,
their_connection_info.clone()) {
Ok(ConnectionInfoReceivedResult::Prepare(token)) => token,
result => panic!("Unexpected result: {:?}", result),
};
match peer_mgr.connection_info_prepared(token, our_connection_info.clone()) {
Ok(ConnectionInfoPreparedResult { pub_id,
src,
dst,
infos: Some((our_info, their_info)) }) => {
assert_eq!(orig_pub_id, pub_id);
assert_eq!(node_auth(0), src);
assert_eq!(node_auth(1), dst);
assert_eq!(our_connection_info, our_info);
assert_eq!(their_connection_info, their_info);
}
result => panic!("Unexpected result: {:?}", result),
}
match peer_mgr.get_state(&orig_pub_id) {
Some(&PeerState::CrustConnecting) => (),
state => panic!("Unexpected state: {:?}", state),
}
}
}