use cbor::{Decoder, Encoder, CborError};
use rand;
use rustc_serialize::{Decodable, Encodable};
use sodiumoxide;
use sodiumoxide::crypto::sign::verify_detached;
use std::collections::{BTreeMap, HashMap};
use std::sync::mpsc;
use std::boxed::Box;
use std::ops::DerefMut;
use std::sync::mpsc::Receiver;
use time::{Duration, SteadyTime};
use challenge::{ChallengeRequest, ChallengeResponse, validate};
use crust;
use lru_time_cache::LruCache;
use message_filter::MessageFilter;
use NameType;
use name_type::{closer_to_target_or_equal, NAME_TYPE_LEN};
use node_interface;
use node_interface::Interface;
use routing_table::{RoutingTable, NodeInfo};
use sendable::Sendable;
use types;
use types::{MessageId, NameAndTypeId, Signature, Bytes};
use authority::{Authority, our_authority};
use message_header::MessageHeader;
use messages::bootstrap_id_request::BootstrapIdRequest;
use messages::bootstrap_id_response::BootstrapIdResponse;
use messages::get_data::GetData;
use messages::get_data_response::GetDataResponse;
use messages::put_data::PutData;
use messages::put_data_response::PutDataResponse;
use messages::connect_request::ConnectRequest;
use messages::connect_response::ConnectResponse;
use messages::connect_success::ConnectSuccess;
use messages::find_group::FindGroup;
use messages::find_group_response::FindGroupResponse;
use messages::get_group_key::GetGroupKey;
use messages::get_group_key_response::GetGroupKeyResponse;
use messages::post::Post;
use messages::get_client_key::GetKey;
use messages::get_client_key_response::GetKeyResponse;
use messages::put_public_id::PutPublicId;
use messages::put_public_id_response::PutPublicIdResponse;
use messages::{RoutingMessage, MessageTypeTag};
use types::{MessageAction};
use error::{RoutingError, InterfaceError, ResponseError};
use std::convert::From;
type ConnectionManager = crust::ConnectionManager;
type Event = crust::Event;
pub type Endpoint = crust::Endpoint;
type PortAndProtocol = crust::Port;
type RoutingResult = Result<(), RoutingError>;
pub struct RoutingNode<F: Interface> {
interface: Box<F>,
id: types::Id,
own_name: NameType,
event_input: Receiver<Event>,
connection_manager: ConnectionManager,
all_connections: (HashMap<Endpoint, NameType>, BTreeMap<NameType, Vec<Endpoint>>),
routing_table: RoutingTable,
accepting_on: Vec<Endpoint>,
next_message_id: MessageId,
bootstrap_endpoint: Option<Endpoint>,
bootstrap_node_id: Option<NameType>,
filter: MessageFilter<types::FilterType>,
public_id_cache: LruCache<NameType, types::PublicId>,
connection_cache: BTreeMap<NameType, SteadyTime>
}
impl<F> RoutingNode<F> where F: Interface {
pub fn new(my_interface: F) -> RoutingNode<F> {
sodiumoxide::init(); let (event_output, event_input) = mpsc::channel();
let id = types::Id::new();
let own_name = id.get_name();
let mut cm = crust::ConnectionManager::new(event_output);
let ports_and_protocols : Vec<PortAndProtocol> = Vec::new();
let beacon_port = Some(5483u16);
let listeners = match cm.start_listening2(ports_and_protocols, beacon_port) {
Err(reason) => {
println!("Failed to start listening: {:?}", reason);
(vec![], None)
}
Ok(listeners_and_beacon) => listeners_and_beacon
};
println!("{:?} -- listening on : {:?}", own_name, listeners.0);
RoutingNode { interface: Box::new(my_interface),
id : id,
own_name : own_name.clone(),
event_input: event_input,
connection_manager: cm,
all_connections: (HashMap::new(), BTreeMap::new()),
routing_table : RoutingTable::new(&own_name),
accepting_on: listeners.0,
next_message_id: rand::random::<MessageId>(),
bootstrap_endpoint: None,
bootstrap_node_id: None,
filter: MessageFilter::with_expiry_duration(Duration::minutes(20)),
public_id_cache: LruCache::with_expiry_duration(Duration::minutes(10)),
connection_cache: BTreeMap::new(),
}
}
pub fn get(&mut self, type_id: u64, name: NameType) {
let destination = types::DestinationAddress{ dest: name.clone(), relay_to: None };
let header = MessageHeader::new(self.get_next_message_id(),
destination, self.our_source_address(),
Authority::Client);
let request = GetData{ requester: self.our_source_address(),
name_and_type_id: NameAndTypeId{name: name.clone(),
type_id: type_id} };
let message = RoutingMessage::new(MessageTypeTag::GetData, header,
request, &self.id.get_crypto_secret_sign_key());
ignore(encode(&message).map(|msg| self.send_swarm_or_parallel(&name, &msg)));
}
pub fn put(&mut self, destination: NameType, content: Box<Sendable>) {
let destination = types::DestinationAddress{ dest: destination, relay_to: None };
let request = PutData{ name: content.name(), data: content.serialised_contents() };
let header = MessageHeader::new(self.get_next_message_id(),
destination, self.our_source_address(),
Authority::ManagedNode);
let message = RoutingMessage::new(MessageTypeTag::PutData, header,
request, &self.id.get_crypto_secret_sign_key());
let _ = encode(&message).map(|msg| self.send_swarm_or_parallel(&self.id(), &msg));
}
pub fn unauthorised_put(&mut self, destination: NameType, content: Box<Sendable>) {
let destination = types::DestinationAddress{ dest: destination, relay_to: None };
let request = PutData{ name: content.name(), data: content.serialised_contents() };
let header = MessageHeader::new(self.get_next_message_id(), destination,
self.our_source_address(), Authority::Unknown);
let message = RoutingMessage::new(MessageTypeTag::UnauthorisedPut, header,
request, &self.id.get_crypto_secret_sign_key());
let _ = encode(&message).map(|msg| self.send_swarm_or_parallel(&self.id(), &msg));
}
pub fn refresh(&mut self, content: Box<Sendable>) {
self.put(content.name(), content);
}
pub fn post(&self, destination: NameType, content: Vec<u8>) { unimplemented!() }
pub fn bootstrap(&mut self, bootstrap_list: Option<Vec<Endpoint>>,
beacon_port: Option<u16>) -> Result<(), RoutingError> {
let bootstrapped_to = try!(self.connection_manager.bootstrap(bootstrap_list, beacon_port)
.map_err(|_|RoutingError::FailedToBootstrap));
println!("bootstrap {:?}", bootstrapped_to);
self.bootstrap_endpoint = Some(bootstrapped_to);
self.send_bootstrap_id_request()
}
pub fn run(&mut self) {
let event = self.event_input.try_recv();
if event.is_err() {
return;
}
match event.unwrap() {
crust::Event::NewMessage(endpoint, bytes) => {
match self.endpoint_to_name(&endpoint).map(|n|n.clone()) {
Some(name) => {
ignore(self.message_received(&name, bytes));
},
None => {
if self.handle_challenge_request(&endpoint, &bytes) {
return;
}
if self.handle_challenge_response(&endpoint, &bytes) {
return;
}
ignore(self.handle_bootstrap_message(endpoint, bytes));
}
}
},
crust::Event::NewConnection(endpoint) => {
self.handle_new_connect_event(endpoint);
},
crust::Event::LostConnection(endpoint) => {
self.handle_lost_connection_event(endpoint);
}
}
}
fn handle_challenge_request(&mut self, peer_endpoint: &Endpoint,
serialised_message: &Bytes) -> bool {
let message = match decode::<ChallengeRequest>(serialised_message) {
Err(err) => return false,
Ok(message) => message,
};
let signature = sodiumoxide::crypto::sign::sign(serialised_message,
&self.id.get_crypto_secret_sign_key());
let response = ChallengeResponse{ name: self.own_name.clone(), signature: signature,
request: message };
let _ = encode(&response).map(
|serialised_message| self.connection_manager.send(peer_endpoint.clone(),
serialised_message));
true
}
fn handle_challenge_response(&mut self, peer_endpoint: &Endpoint,
serialised_message: &Bytes) -> bool {
let message = match decode::<ChallengeResponse>(serialised_message) {
Err(err) => return false,
Ok(message) => message,
};
if let Some(peer_public_id) = self.routing_table.public_id(&message.name) {
if !validate(&peer_public_id.public_sign_key.get_crypto_public_sign_key(), &message) {
return true;
}
self.all_connections.0.insert(peer_endpoint.clone(), peer_public_id.name());
let found = if let Some(peer_endpoints) =
self.all_connections.1.get_mut(&peer_public_id.name()) {
assert!(!peer_endpoints.is_empty());
peer_endpoints.push(peer_endpoint.clone());
true
} else {
false
};
if !found {
self.all_connections.1.insert(peer_public_id.name(), vec![peer_endpoint.clone()]);
}
true
} else {
false
}
}
fn generate_bootstrap_header(&self, message_id: MessageId) -> MessageHeader {
MessageHeader::new( message_id,
types::DestinationAddress{ dest: NameType::new([0u8; NAME_TYPE_LEN]), relay_to: None },
types::SourceAddress{ from_node: self.id(), from_group: None, reply_to: None, relayed_for: None },
Authority::ManagedNode)
}
fn send_bootstrap_id_request(&mut self) -> RoutingResult {
let message_id = self.get_next_message_id();
let message = RoutingMessage::new(MessageTypeTag::BootstrapIdRequest,
self.generate_bootstrap_header(message_id),
BootstrapIdRequest { sender_id: self.id() },
&self.id.get_crypto_secret_sign_key());
self.send_to_bootstrap_node(&try!(encode(&message)));
Ok(())
}
fn send_bootstrap_id_response(&mut self, peer_endpoint: Endpoint) {
let message_id = self.get_next_message_id();
let message = RoutingMessage::new(MessageTypeTag::BootstrapIdResponse,
self.generate_bootstrap_header(message_id),
BootstrapIdResponse { sender_id: self.id() },
&self.id.get_crypto_secret_sign_key());
ignore(encode(&message).map(|msg| self.send(Some(peer_endpoint).iter(), &msg)));
}
fn handle_bootstrap_id_response(&mut self, peer_endpoint: Endpoint, bytes: Bytes, is_client: bool) {
if self.all_connections.0.contains_key(&peer_endpoint) {
return;
}
let bootstrap_id_response_msg = decode::<BootstrapIdResponse>(&bytes);
if bootstrap_id_response_msg.is_err() { return;
}
let bootstrap_id_response_msg = bootstrap_id_response_msg.unwrap();
assert!(self.bootstrap_node_id.is_none());
assert_eq!(self.bootstrap_endpoint, Some(peer_endpoint.clone()));
self.bootstrap_node_id = Some(bootstrap_id_response_msg.sender_id.clone());
self.all_connections.0.insert(peer_endpoint.clone(),
bootstrap_id_response_msg.sender_id.clone());
self.all_connections.1.insert(bootstrap_id_response_msg.sender_id, vec![peer_endpoint]);
let find_group_msg = self.construct_find_group_msg();
ignore(encode(&find_group_msg).map(|msg|self.send_to_bootstrap_node(&msg)));
}
fn put_own_public_id(&mut self) {
let our_public_id: types::PublicId = types::PublicId::new(&self.id);
let message_id = self.get_next_message_id();
let destination = types::DestinationAddress{ dest: our_public_id.name(), relay_to: None };
let source = types::SourceAddress{ from_node: self.id(), from_group: None,
reply_to: self.bootstrap_node_id.clone(), relayed_for: None };
let authority = Authority::ManagedNode;
let request = PutPublicId{ public_id: our_public_id };
let header = MessageHeader::new(message_id, destination, source, authority);
let message = RoutingMessage::new(MessageTypeTag::PutPublicId, header,
request, &self.id.get_crypto_secret_sign_key());
ignore(encode(&message).map(|msg|self.send_to_bootstrap_node(&msg)));
}
fn handle_new_connect_event(&mut self, peer_endpoint: Endpoint) {
println!(" handle_new_connect_event peer_ep : {:?}", peer_endpoint);
match self.routing_table.mark_as_connected(&peer_endpoint) {
Some(peer_id) => {
println!("RT (size : {:?}) Marked connected peer_id : {:?} , peer_ep : {:?}",
self.routing_table.size(), peer_id, peer_endpoint);
self.all_connections.0.insert(peer_endpoint.clone(), peer_id.clone());
let found = if let Some(peer_endpoints) = self.all_connections.1.get_mut(&peer_id) {
assert!(!peer_endpoints.is_empty());
peer_endpoints.push(peer_endpoint.clone());
true
} else {
false
};
if !found {
self.all_connections.1.insert(peer_id, vec![peer_endpoint]);
}
},
None => {
let challenge_request = ChallengeRequest{ name: self.own_name.clone() };
let _ = encode(&challenge_request).map(
|serialised_message| self.connection_manager.send(peer_endpoint,
serialised_message));
},
}
}
fn handle_lost_connection_event(&mut self, peer_endpoint: Endpoint) {
println!(" handle_lost_connection_event peer_ep : {:?}", peer_endpoint);
let removed_entry = self.all_connections.0.remove(&peer_endpoint);
if removed_entry.is_some() {
let peer_id = removed_entry.unwrap();
self.routing_table.drop_node(&peer_id);
println!("RT (size : {:?})", self.routing_table.size());
match self.all_connections.1.get(&peer_id) {
Some(peer_endpoints) => {
for endpoint in peer_endpoints {
let _ = self.all_connections.0.remove(&endpoint);
self.connection_manager.drop_node(endpoint.clone());
}
}
None => (),
}
self.all_connections.1.remove(&peer_id);
}
}
fn on_churn(&mut self, close_group: Vec<NameType>) {
let actions = self.interface.handle_churn(close_group);
self.invoke_routing_actions(actions);
}
fn invoke_routing_actions(&mut self, routing_actions: Vec<node_interface::MethodCall>) {
for routing_action in routing_actions {
match routing_action {
node_interface::MethodCall::Put { destination: x, content: y, } => self.put(x, y),
node_interface::MethodCall::Get { type_id: x, name: y, } => self.get(x, y),
node_interface::MethodCall::Refresh { content: x, } => self.refresh(x),
node_interface::MethodCall::Post => unimplemented!(),
node_interface::MethodCall::None => (),
node_interface::MethodCall::SendOn { destination: _ } => unimplemented!(),
}
}
}
fn message_received(&mut self, peer_id: &NameType, serialised_msg: Bytes) -> RoutingResult {
let message = try!(decode::<RoutingMessage>(&serialised_msg));
let header = message.message_header;
let body = message.serialised_body;
if self.filter.check(&header.get_filter()) {
return Err(RoutingError::FilterCheckFailed);
}
self.filter.add(header.get_filter());
self.refresh_routing_table(&header.source.from_node);
if message.message_type == MessageTypeTag::GetDataResponse {
let get_data_response = try!(decode::<GetDataResponse>(&body));
let _ = get_data_response.data.map(|data| {
if data.len() != 0 {
let _ = self.mut_interface().handle_cache_put(
header.from_authority(), header.from(), data);
}
});
}
if message.message_type == MessageTypeTag::GetData {
let get_data = try!(decode::<GetData>(&body));
let retrieved_data = self.mut_interface().handle_cache_get(
get_data.name_and_type_id.type_id.clone() as u64,
get_data.name_and_type_id.name.clone(),
header.from_authority(),
header.from());
match retrieved_data {
Ok(action) => match action {
MessageAction::Reply(data) => {
let reply = self.construct_get_data_response_msg(&header, &get_data, data);
return encode(&reply).map(|reply| {
self.send_swarm_or_parallel(&header.send_to().dest, &reply);
}).map_err(From::from);
},
_ => (),
},
Err(_) => (),
};
}
self.send_swarm_or_parallel(&header.destination.dest, &serialised_msg);
if header.destination.dest == self.own_name {
self.send_by_name(header.destination.relay_to.iter(), serialised_msg);
}
if !self.address_in_close_group_range(&header.destination.dest) {
println!("{:?} not for us ", self.own_name);
return Ok(());
}
if message.message_type == MessageTypeTag::ConnectRequest || message.message_type == MessageTypeTag::ConnectResponse {
if header.destination.dest != self.own_name &&
(header.destination.relay_to.is_none() ||
header.destination.relay_to != Some(self.own_name.clone())) { return Ok(());
}
}
match message.message_type {
MessageTypeTag::UnauthorisedPut => self.handle_put_data(header, body),
MessageTypeTag::GetKey => self.handle_get_key(header, body),
MessageTypeTag::GetGroupKey => self.handle_get_group_key(header, body),
_ => {
match message.message_type {
MessageTypeTag::ConnectRequest => self.handle_connect_request(header, body, message.signature),
MessageTypeTag::ConnectResponse => self.handle_connect_response(body),
MessageTypeTag::FindGroup => self.handle_find_group(header, body),
MessageTypeTag::FindGroupResponse => self.handle_find_group_response(header, body),
MessageTypeTag::GetData => self.handle_get_data(header, body),
MessageTypeTag::GetDataResponse => self.handle_get_data_response(header, body),
MessageTypeTag::Post => self.handle_post(header, body),
MessageTypeTag::PostResponse => self.handle_post_response(header, body),
MessageTypeTag::PutData => self.handle_put_data(header, body),
MessageTypeTag::PutDataResponse => self.handle_put_data_response(header, body),
MessageTypeTag::PutPublicId => self.handle_put_public_id(header, body),
_ => {
println!("unhandled message from {:?}", peer_id);
Err(RoutingError::UnknownMessageType)
}
}
}
}
}
fn refresh_routing_table(&mut self, from_node : &NameType) {
if self.routing_table.check_node(from_node) {
let mut next_connect_request : Option<NameType> = None;
let time_now = SteadyTime::now();
self.connection_cache.entry(from_node.clone())
.or_insert(time_now);
for (new_node, time) in self.connection_cache.iter() {
if time_now - *time > Duration::seconds(5) {
next_connect_request = Some(new_node.clone());
break;
}
}
match next_connect_request {
Some(connect_to_node) => {
self.connection_cache.remove(&connect_to_node);
if self.routing_table.check_node(&connect_to_node) {
ignore(self.send_connect_request_msg(&connect_to_node));
}
},
None => ()
}
}
}
fn handle_bootstrap_message(&mut self, peer_endpoint: Endpoint, serialised_msg: Bytes) -> RoutingResult {
let message = try!(decode::<RoutingMessage>(&serialised_msg));
if message.message_type == MessageTypeTag::BootstrapIdRequest {
let request = try!(decode::<BootstrapIdRequest>(&message.serialised_body));
if self.bootstrap_node_id.is_none() {
self.bootstrap_node_id = Some(request.sender_id.clone());
self.bootstrap_endpoint = Some(peer_endpoint.clone());
}
self.all_connections.0.insert(peer_endpoint.clone(), request.sender_id.clone());
self.all_connections.1.insert(request.sender_id, vec![peer_endpoint.clone()]);
self.send_bootstrap_id_response(peer_endpoint);
} else if message.message_type == MessageTypeTag::BootstrapIdResponse {
self.handle_bootstrap_id_response(peer_endpoint, message.serialised_body,
message.message_header.authority == Authority::Client);
}
Ok(())
}
fn handle_get_group_key(&mut self, original_header : MessageHeader, body : Bytes) -> RoutingResult {
let get_group_key = try!(decode::<GetGroupKey>(&body));
let group_keys = self.routing_table.our_close_group()
.into_iter()
.map(|node| (node.fob.name(), node.fob.public_sign_key))
.chain(Some((self.id.get_name(), self.id.get_public_sign_key())).into_iter())
.collect::<Vec<_>>();
let routing_msg = self.construct_get_group_key_response_msg(&original_header,
&get_group_key,
group_keys);
let encoded_msg = try!(encode(&routing_msg));
let original_group = original_header.from_group();
original_group.map(|group| self.send_swarm_or_parallel(&group, &encoded_msg));
Ok(())
}
fn handle_connect_request(&mut self, original_header: MessageHeader, body: Bytes, signature: Signature) -> RoutingResult {
println!("{:?} received ConnectRequest ", self.own_name);
let connect_request = try!(decode::<ConnectRequest>(&body));
let mut peer_endpoints = connect_request.local_endpoints.clone();
peer_endpoints.extend(connect_request.external_endpoints.clone().into_iter());
let peer_node_info =
NodeInfo::new(connect_request.requester_fob.clone(), peer_endpoints, None);
let (added, _) = self.routing_table.add_node(peer_node_info.clone());
if !added {
return Err(RoutingError::AlreadyConnected); }
println!("RT (size : {:?}) added {:?} ", self.routing_table.size(), peer_node_info.fob.name());
self.connection_manager.connect(connect_request.local_endpoints.clone());
self.connection_manager.connect(connect_request.external_endpoints.clone());
let routing_msg = self.construct_connect_response_msg(&original_header, &body, &signature, &connect_request);
let serialised_message = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&connect_request.requester_id, &serialised_message);
self.send_to_bootstrap_node(&serialised_message);
self.send_by_name(original_header.source.reply_to.iter(), serialised_message);
Ok(())
}
fn handle_connect_response(&mut self, body: Bytes) -> RoutingResult {
println!("{:?} received ConnectResponse", self.own_name);
let connect_response = try!(decode::<ConnectResponse>(&body));
let connect_request = try!(decode::<ConnectRequest>(&connect_response.serialised_connect_request));
if connect_request.requester_id != self.id.get_name() ||
!verify_detached(&connect_response.connect_request_signature.get_crypto_signature(),
&connect_response.serialised_connect_request[..],
&self.id.get_crypto_public_sign_key()) {
return Err(RoutingError::Response(ResponseError::InvalidRequest));
}
let mut peer_endpoints = connect_response.receiver_local_endpoints.clone();
peer_endpoints.extend(connect_response.receiver_external_endpoints.clone().into_iter());
let peer_node_info =
NodeInfo::new(connect_response.receiver_fob.clone(), peer_endpoints, None);
let (added, _) = self.routing_table.add_node(peer_node_info.clone());
if !added {
return Ok(());
}
println!("RT (size : {:?}) added {:?}", self.routing_table.size(), peer_node_info.fob.name());
self.connection_manager.connect(connect_response.receiver_local_endpoints.clone());
self.connection_manager.connect(connect_response.receiver_external_endpoints.clone());
Ok(())
}
fn handle_find_group(&mut self, original_header: MessageHeader, body: Bytes) -> RoutingResult {
println!("{:?} received FindGroup {:?}", self.own_name, original_header.message_id);
let find_group = try!(decode::<FindGroup>(&body));
let group = self.routing_table.our_close_group().into_iter()
.map(|x|x.fob)
.chain(Some(types::PublicId::new(&self.id)).into_iter())
.collect::<Vec<_>>();
let routing_msg = self.construct_find_group_response_msg(&original_header, &find_group, group);
let serialised_msg = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&original_header.send_to().dest, &serialised_msg);
self.send_by_name(original_header.source.relayed_for.iter(), serialised_msg);
Ok(())
}
fn handle_find_group_response(&mut self, original_header: MessageHeader, body: Bytes) -> RoutingResult {
println!("{:?} received FindGroupResponse", self.own_name);
let find_group_response = try!(decode::<FindGroupResponse>(&body));
for peer in find_group_response.group {
if self.routing_table.check_node(&peer.name()) {
ignore(self.send_connect_request_msg(&peer.name()));
}
}
Ok(())
}
fn send_connect_request_msg(&mut self, peer_id: &NameType) -> RoutingResult {
let routing_msg = self.construct_connect_request_msg(&peer_id);
let serialised_message = try!(encode(&routing_msg));
self.send_swarm_or_parallel(peer_id, &serialised_message);
self.send_to_bootstrap_node(&serialised_message);
Ok(())
}
fn handle_get_data(&mut self, header: MessageHeader, body: Bytes) -> RoutingResult {
let get_data = try!(decode::<GetData>(&body));
let type_id = get_data.name_and_type_id.type_id.clone();
let name = get_data.name_and_type_id.name.clone();
let our_authority = our_authority(&name, &header, &self.routing_table);
let from_authority = header.from_authority();
let from = header.from();
match self.mut_interface().handle_get(type_id, name, our_authority.clone(), from_authority, from) {
Ok(action) => match action {
MessageAction::Reply(data) => {
let routing_msg = RoutingMessage::new(MessageTypeTag::GetDataResponse, header.create_reply(&self.own_name, &our_authority),
GetDataResponse{ name_and_type_id :get_data.name_and_type_id, data: Ok(data) },
&self.id.get_crypto_secret_sign_key());
let encoded_msg = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&header.send_to().dest, &encoded_msg);
},
MessageAction::SendOn(dest_nodes) => {
for dest_node in dest_nodes {
let send_on_header = header.create_send_on(&self.own_name, &our_authority, &dest_node);
let routing_msg = RoutingMessage::new(MessageTypeTag::GetData, send_on_header,
get_data.clone(), &self.id.get_crypto_secret_sign_key());
let encoded_msg = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&dest_node, &encoded_msg);
}
}
},
Err(InterfaceError::Abort) => {;},
Err(InterfaceError::Response(error)) => {
let routing_msg = RoutingMessage::new(MessageTypeTag::GetDataResponse, header.create_reply(&self.own_name, &our_authority),
GetDataResponse{ name_and_type_id :get_data.name_and_type_id, data: Err(error) },
&self.id.get_crypto_secret_sign_key());
let encoded_msg = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&header.send_to().dest, &encoded_msg);
}
}
Ok(())
}
fn handle_get_key(&mut self, header: MessageHeader, body: Bytes) -> RoutingResult {
let get_key = try!(decode::<GetKey>(&body));
let type_id = 106u64;
let our_authority = our_authority(&get_key.target_id, &header, &self.routing_table);
let from_authority = header.from_authority();
let from = header.from();
let name = get_key.target_id.clone();
let action = try!(self.mut_interface().handle_get_key(type_id, name, our_authority.clone(), from_authority, from));
match action {
MessageAction::Reply(data) => {
let public_key = try!(decode::<types::PublicSignKey>(&data));
let routing_msg = RoutingMessage::new(MessageTypeTag::GetKeyResponse, header.create_reply(&self.own_name, &our_authority),
GetKeyResponse{ address : get_key.target_id.clone(), public_sign_key : public_key },
&self.id.get_crypto_secret_sign_key());
let encoded_msg = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&header.send_to().dest, &encoded_msg);
},
MessageAction::SendOn(dest_nodes) => {
for dest_node in dest_nodes {
let send_on_header = header.create_send_on(&self.own_name, &our_authority, &dest_node);
let routing_msg = RoutingMessage::new(MessageTypeTag::GetKey, send_on_header,
get_key.clone(), &self.id.get_crypto_secret_sign_key());
let encoded_msg = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&dest_node, &encoded_msg);
}
}
}
Ok(())
}
fn handle_get_data_response(&mut self, header: MessageHeader, body: Bytes) -> RoutingResult {
let get_data_response = try!(decode::<GetDataResponse>(&body));
let from = header.from();
self.mut_interface().handle_get_response(from, get_data_response.data);
Ok(())
}
fn handle_post(&mut self, header : MessageHeader, body : Bytes) -> RoutingResult {
let post = try!(decode::<Post>(&body));
let our_authority = our_authority(&post.name, &header, &self.routing_table);
match try!(self.mut_interface().handle_post(our_authority.clone(),
header.authority.clone(),
header.from(),
post.name.clone(),
post.data.clone())) {
MessageAction::Reply(data) => {
Ok(()) },
MessageAction::SendOn(destinations) => {
for destination in destinations {
let send_on_header = header.create_send_on(&self.own_name,
&our_authority, &destination);
let routing_msg = RoutingMessage::new(MessageTypeTag::Post,
send_on_header, post.clone(), &self.id.get_crypto_secret_sign_key());
self.send_swarm_or_parallel(&destination, &try!(encode(&routing_msg)));
}
Ok(())
},
}
}
fn handle_post_response(&self, header : MessageHeader, body : Bytes) -> RoutingResult {
Ok(())
}
fn handle_put_public_id(&mut self, header: MessageHeader, body: Bytes) -> RoutingResult {
let put_public_id = try!(decode::<PutPublicId>(&body));
let our_authority = our_authority(&put_public_id.public_id.name(), &header, &self.routing_table);
match (header.from_authority(), our_authority.clone(), put_public_id.public_id.is_relocated()) {
(Authority::ManagedNode, Authority::NaeManager, false) => {
let mut put_public_id_relocated = put_public_id.clone();
let mut close_group_node_ids : Vec<NameType> = Vec::new();
for node_info in self.routing_table.our_close_group() {
close_group_node_ids.push(node_info.id());
}
let relocated_name = try!(types::calculate_relocated_name(
close_group_node_ids,
&put_public_id.public_id.name()));
put_public_id_relocated.public_id.assign_relocated_name(relocated_name.clone());
let send_on_header = header.create_send_on(&self.own_name, &our_authority, &relocated_name);
let routing_msg = RoutingMessage::new(MessageTypeTag::PutPublicId,
send_on_header, put_public_id_relocated,
&self.id.get_crypto_secret_sign_key());
self.send_swarm_or_parallel(&relocated_name, &try!(encode(&routing_msg)));
Ok(())
},
(Authority::NaeManager, Authority::NaeManager, true) => {
if !self.public_id_cache.check(&put_public_id.public_id.name()) {
self.public_id_cache.add(put_public_id.public_id.name(), put_public_id.public_id.clone());
let routing_msg = RoutingMessage::new(MessageTypeTag::PutPublicIdResponse,
header.create_reply(&self.own_name, &our_authority),
PutPublicIdResponse{ public_id :put_public_id.public_id.clone() },
&self.id.get_crypto_secret_sign_key());
let encoded_msg = try!(encode(&routing_msg));
self.send_swarm_or_parallel(&put_public_id.public_id.name(), &encoded_msg);
}
Ok(())
},
_ => {
Err(RoutingError::BadAuthority)
}
}
}
fn handle_put_public_id_reponse(&mut self, header: MessageHeader, body: Bytes) -> RoutingResult {
let put_public_id_response = try!(decode::<PutPublicIdResponse>(&body));
Ok(())
}
fn handle_put_data(&mut self, header: MessageHeader, body: Bytes) -> RoutingResult {
let put_data = try!(decode::<PutData>(&body));
let our_authority = our_authority(&put_data.name, &header, &self.routing_table);
let from_authority = header.from_authority();
let from = header.from();
let to = header.send_to();
match try!(self.mut_interface().handle_put(our_authority.clone(), from_authority, from,
to, put_data.data.clone())) {
MessageAction::Reply(reply_data) => {
let reply_header = header.create_reply(&self.own_name, &our_authority);
let reply_to = match our_authority {
Authority::ClientManager => match header.reply_to() {
Some(client) => client,
None => header.from()
},
_ => header.from()
};
let put_data_response = PutDataResponse {
name : put_data.name.clone(),
data : Ok(reply_data),
};
let routing_msg = RoutingMessage::new(MessageTypeTag::PutDataResponse,
reply_header, put_data_response, &self.id.get_crypto_secret_sign_key());
self.send_swarm_or_parallel(&reply_to, &try!(encode(&routing_msg)));
Ok(())
},
MessageAction::SendOn(destinations) => {
for destination in destinations {
let send_on_header = header.create_send_on(&self.own_name,
&our_authority, &destination);
let routing_msg = RoutingMessage::new(MessageTypeTag::PutData,
send_on_header, put_data.clone(), &self.id.get_crypto_secret_sign_key());
self.send_swarm_or_parallel(&destination, &try!(encode(&routing_msg)));
}
Ok(())
},
}
}
fn handle_put_data_response(&mut self, header: MessageHeader, body: Bytes) -> RoutingResult {
let put_data_response = try!(decode::<PutDataResponse>(&body));
let from_authority = header.from_authority();
let from = header.from();
self.mut_interface().handle_put_response(from_authority, from, put_data_response.data);
Ok(())
}
fn our_source_address(&self) -> types::SourceAddress {
if self.bootstrap_endpoint.is_some() {
let id = self.all_connections.0.get(&self.bootstrap_endpoint.clone().unwrap());
if id.is_some() {
return types::SourceAddress{ from_node: id.unwrap().clone(),
from_group: None,
reply_to: None,
relayed_for: Some(self.own_name.clone()) }
}
}
return types::SourceAddress{ from_node: self.own_name.clone(), from_group: None, reply_to: None, relayed_for: None }
}
fn group_address_for_group(&self, group_address : &types::GroupAddress) -> types::SourceAddress {
types::SourceAddress {
from_node : self.own_name.clone(),
from_group : Some(group_address.clone()),
reply_to : None,
relayed_for: None
}
}
fn our_group_address(&self, group_id: NameType) -> types::SourceAddress {
types::SourceAddress{ from_node: self.own_name.clone(), from_group: Some(group_id.clone()),
reply_to: None, relayed_for: None }
}
fn construct_get_group_key_response_msg(&mut self, original_header : &MessageHeader,
get_group_key : &GetGroupKey,
group_keys : Vec<(NameType, types::PublicSignKey)>)
-> RoutingMessage {
let header = MessageHeader::new(
original_header.message_id.clone(),
original_header.send_to(),
self.our_group_address(get_group_key.target_id.clone()),
Authority::NaeManager);
RoutingMessage::new(MessageTypeTag::GetGroupKeyResponse, header,
GetGroupKeyResponse{ public_sign_keys : group_keys },
&self.id.get_crypto_secret_sign_key()
)
}
fn construct_find_group_msg(&mut self) -> RoutingMessage {
let header = MessageHeader::new(
self.get_next_message_id(),
types::DestinationAddress {
dest: self.own_name.clone(),
relay_to: None
},
self.our_source_address(),
Authority::ManagedNode);
RoutingMessage::new(MessageTypeTag::FindGroup, header,
FindGroup{ requester_id: self.own_name.clone(),
target_id: self.own_name.clone()},
&self.id.get_crypto_secret_sign_key())
}
fn construct_find_group_response_msg(&mut self, original_header : &MessageHeader,
find_group: &FindGroup,
group: Vec<types::PublicId>) -> RoutingMessage {
let header = MessageHeader::new(self.get_next_message_id(),
original_header.send_to(),
self.our_group_address(find_group.target_id.clone()),
Authority::NaeManager);
RoutingMessage::new(MessageTypeTag::FindGroupResponse, header,
FindGroupResponse{ group: group }, &self.id.get_crypto_secret_sign_key())
}
fn construct_success_msg(&mut self) -> ConnectSuccess {
let connect_success = ConnectSuccess {
peer_id: self.own_name.clone(),
peer_fob: types::PublicId::new(&self.id),
};
return connect_success
}
fn construct_connect_request_msg(&mut self, peer_id: &NameType) -> RoutingMessage {
let header = MessageHeader::new(self.get_next_message_id(),
types::DestinationAddress {dest: peer_id.clone(), relay_to: None },
self.our_source_address(), Authority::ManagedNode);
let connect_request = ConnectRequest {
local_endpoints: self.accepting_on.clone(),
external_endpoints: vec![],
requester_id: self.own_name.clone(),
receiver_id: peer_id.clone(),
requester_fob: types::PublicId::new(&self.id),
};
RoutingMessage::new(MessageTypeTag::ConnectRequest, header, connect_request,
&self.id.get_crypto_secret_sign_key())
}
fn construct_connect_response_msg(&mut self, original_header : &MessageHeader, body: &Bytes, signature: &Signature,
connect_request: &ConnectRequest) -> RoutingMessage {
println!("{:?} construct_connect_response_msg ", self.own_name);
debug_assert!(connect_request.receiver_id == self.own_name, format!("{:?} == {:?} failed", self.own_name, connect_request.receiver_id));
let header = MessageHeader::new(original_header.message_id,
original_header.send_to(), self.our_source_address(),
Authority::ManagedNode);
let connect_response = ConnectResponse {
requester_local_endpoints: connect_request.local_endpoints.clone(),
requester_external_endpoints: connect_request.external_endpoints.clone(),
receiver_local_endpoints: self.accepting_on.clone(),
receiver_external_endpoints: vec![],
requester_id: connect_request.requester_id.clone(),
receiver_id: self.own_name.clone(),
receiver_fob: types::PublicId::new(&self.id),
serialised_connect_request: body.clone(),
connect_request_signature: signature.clone() };
RoutingMessage::new(MessageTypeTag::ConnectResponse, header,
connect_response, &self.id.get_crypto_secret_sign_key())
}
fn construct_get_data_response_msg(&mut self, original_header: &MessageHeader,
get_data: &GetData, data: Vec<u8>) -> RoutingMessage {
let header = MessageHeader::new(self.get_next_message_id(),
original_header.send_to(), self.our_source_address(),
Authority::ManagedNode);
let get_data_response = GetDataResponse {
name_and_type_id: get_data.name_and_type_id.clone(), data: Ok(data)
};
RoutingMessage::new(MessageTypeTag::GetDataResponse, header,
get_data_response, &self.id.get_crypto_secret_sign_key())
}
fn get_next_message_id(&mut self) -> MessageId {
let temp = self.next_message_id;
self.next_message_id += 1;
return temp;
}
fn send<'a, I>(&self, targets: I, message: &Bytes) where I: Iterator<Item=&'a Endpoint> {
for target in targets {
ignore(self.connection_manager.send(target.clone(), message.clone()));
}
}
fn send_by_name<'a, I>(&self, peers: I, serialised_msg: Bytes) where I: Iterator<Item=&'a NameType> {
for peer in peers {
self.send(self.name_to_endpoint(peer).into_iter(), &serialised_msg);
}
}
fn name_to_endpoint(&self, name: &NameType) -> Option<&Endpoint> {
match self.all_connections.1.get(name) {
Some(endpoints) => {
assert!(!endpoints.is_empty());
Some(&endpoints[0])
},
None => None
}
}
fn endpoint_to_name(&self, endpoint: &Endpoint) -> Option<&NameType> {
self.all_connections.0.get(&endpoint)
}
fn send_to_bootstrap_node(&mut self, msg: &Bytes) {
self.send(self.bootstrap_endpoint.iter(), &msg);
}
fn send_swarm_or_parallel(&self, target: &NameType, msg: &Bytes) {
for peer in self.routing_table.target_nodes(target) {
match self.all_connections.1.get(&peer.id()) {
Some(peer_endpoints) => {
assert!(!peer_endpoints.is_empty());
if self.connection_manager.send(peer_endpoints[0].clone(),
msg.clone()).is_err() {
println!("{:?} failed to send to {:?}", self.own_name, peer.id());
}
}
None => ()
}
}
}
fn address_in_close_group_range(&self, address: &NameType) -> bool {
if self.routing_table.size() < RoutingTable::get_group_size() {
return true;
}
match self.routing_table.our_close_group().pop() {
Some(furthest_close_node) => {
closer_to_target_or_equal(&address, &furthest_close_node.id(), &self.own_name)
},
None => false }
}
pub fn id(&self) -> NameType { self.own_name.clone() }
fn mut_interface(&mut self) -> &mut F { self.interface.deref_mut() }
}
fn encode<T>(value: &T) -> Result<Bytes, CborError> where T: Encodable {
let mut enc = Encoder::from_memory();
try!(enc.encode(&[value]));
Ok(enc.into_bytes())
}
fn decode<T>(bytes: &Bytes) -> Result<T, CborError> where T: Decodable {
let mut dec = Decoder::from_bytes(&bytes[..]);
match dec.decode().next() {
Some(result) => result,
None => Err(CborError::UnexpectedEOF)
}
}
fn ignore<R,E>(_: Result<R,E>) {}
#[cfg(test)]
mod test {
use routing_node::{RoutingNode};
use node_interface::*;
use name_type::NameType;
use super::encode;
use types::MessageAction;
use error::{ResponseError, InterfaceError};
use sendable::Sendable;
use messages::put_data::PutData;
use messages::put_data_response::PutDataResponse;
use messages::get_data::GetData;
use messages::get_data_response::GetDataResponse;
use messages::get_client_key::GetKey;
use messages::post::Post;
use messages::put_public_id::PutPublicId;
use messages::{RoutingMessage, MessageTypeTag};
use message_header::MessageHeader;
use std::sync::{Arc, Mutex};
use routing_table;
use test_utils::Random;
use rand::random;
use name_type::{closer_to_target};
use types;
use types::{Id, PublicId};
use authority::Authority;
use rustc_serialize::{Encodable, Decodable};
use cbor::{Encoder};
use std::thread;
use test_utils::{random_endpoint, random_endpoints};
struct NullInterface;
#[derive(Clone)]
struct Stats {
call_count: u32,
data: Vec<u8>
}
struct TestInterface {
stats: Arc<Mutex<Stats>>
}
struct TestData {
data: Vec<u8>
}
impl TestData {
fn new(in_data: Vec<u8>) -> TestData {
TestData { data: in_data }
}
}
impl Sendable for TestData {
fn name(&self) -> NameType { Random::generate_random() }
fn type_tag(&self)->u64 { unimplemented!() }
fn serialised_contents(&self)->Vec<u8> { self.data.clone() }
fn refresh(&self)->bool {
false
}
fn merge(&self, responses: Vec<Box<Sendable>>) -> Option<Box<Sendable>> { None }
}
impl Interface for TestInterface {
fn handle_get_key(&mut self, type_id: u64, name : NameType, our_authority: Authority,
from_authority: Authority, from_address: NameType) -> Result<MessageAction, InterfaceError> {
let stats = self.stats.clone();
let mut stats_value = stats.lock().unwrap();
stats_value.call_count += 1;
let data = stats_value.data.clone();
Ok(MessageAction::Reply(data))
}
fn handle_get(&mut self, type_id: u64, name : NameType, our_authority: Authority,
from_authority: Authority, from_address: NameType) -> Result<MessageAction, InterfaceError> {
let stats = self.stats.clone();
let mut stats_value = stats.lock().unwrap();
stats_value.call_count += 1;
Ok(MessageAction::Reply("handle_get called".to_string().into_bytes()))
}
fn handle_put(&mut self, our_authority: Authority, from_authority: Authority,
from_address: NameType, dest_address: types::DestinationAddress,
data: Vec<u8>) -> Result<MessageAction, InterfaceError> {
let stats = self.stats.clone();
let mut stats_value = stats.lock().unwrap();
stats_value.call_count += 1;
stats_value.data = match from_authority {
Authority::Unknown => "UnauthorisedPut".to_string().into_bytes(),
_ => "AuthorisedPut".to_string().into_bytes(),
};
Ok(MessageAction::Reply(data))
}
fn handle_post(&mut self, our_authority: Authority, from_authority: Authority,
from_address: NameType, name: NameType, data: Vec<u8>) -> Result<MessageAction, InterfaceError> {
let stats = self.stats.clone();
let mut stats_value = stats.lock().unwrap();
stats_value.call_count += 1;
stats_value.data = data.clone();
Ok(MessageAction::Reply(data))
}
fn handle_get_response(&mut self, from_address: NameType, response: Result<Vec<u8>,
ResponseError>) -> MethodCall {
let stats = self.stats.clone();
let mut stats_value = stats.lock().unwrap();
stats_value.call_count += 1;
stats_value.data = "handle_get_response called".to_string().into_bytes();
MethodCall::None
}
fn handle_put_response(&mut self, from_authority: Authority, from_address: NameType,
response: Result<Vec<u8>, ResponseError>) -> MethodCall {
let stats = self.stats.clone();
let mut stats_value = stats.lock().unwrap();
stats_value.call_count += 1;
stats_value.data = match response {
Ok(data) => data,
Err(_) => vec![]
};
MethodCall::None
}
fn handle_post_response(&mut self, from_authority: Authority, from_address: NameType,
response: Result<Vec<u8>, ResponseError>) {
unimplemented!();
}
fn handle_churn(&mut self, close_group: Vec<NameType>)
-> Vec<MethodCall> {
unimplemented!();
}
fn handle_cache_get(&mut self, type_id: u64, name : NameType, from_authority: Authority,
from_address: NameType) -> Result<MessageAction, InterfaceError> {
Err(InterfaceError::Abort)
}
fn handle_cache_put(&mut self, from_authority: Authority, from_address: NameType,
data: Vec<u8>) -> Result<MessageAction, InterfaceError> {
Err(InterfaceError::Abort)
}
}
#[test]
fn check_next_id() {
let mut routing_node = RoutingNode::new(TestInterface { stats: Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]})) });
assert_eq!(routing_node.get_next_message_id() + 1, routing_node.get_next_message_id());
}
fn call_operation<T>(operation: T, message_type: MessageTypeTag, stats: Arc<Mutex<Stats>>) -> Stats where T: Encodable, T: Decodable {
let stats_copy = stats.clone();
let mut n1 = RoutingNode::new(TestInterface { stats: stats_copy });
let header = MessageHeader {
message_id: n1.get_next_message_id(),
destination: types::DestinationAddress { dest: n1.own_name.clone(), relay_to: None },
source: types::SourceAddress { from_node: Random::generate_random(), from_group: None, reply_to: None, relayed_for: None },
authority: match message_type {
MessageTypeTag::UnauthorisedPut => Authority::Unknown,
_ => Authority::NaeManager
}
};
let message = RoutingMessage::new( message_type, header.clone(),
operation, &n1.id.get_crypto_secret_sign_key());
let serialised_msssage = encode(&message).unwrap();
let _ = n1.message_received(&header.source.from_node, serialised_msssage);
let stats = stats.clone();
let stats_value = stats.lock().unwrap();
stats_value.clone()
}
#[test]
fn call_put() {
let data = "this is a known string".to_string().into_bytes();
let chunk = Box::new(TestData::new(data));
let mut n1 = RoutingNode::new(TestInterface { stats: Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]})) });
let name: NameType = Random::generate_random();
n1.put(name, chunk);
}
#[test]
fn call_unauthorised_put() {
let data = "this is a known string".to_string().into_bytes();
let chunk = Box::new(TestData::new(data));
let mut n1 = RoutingNode::new(TestInterface { stats: Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]})) });
let name: NameType = Random::generate_random();
n1.unauthorised_put(name, chunk);
}
#[test]
fn call_handle_put() {
let stats = Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]}));
let put_data: PutData = Random::generate_random();
assert_eq!(call_operation(put_data, MessageTypeTag::PutData, stats).call_count, 1u32);
}
#[test]
fn call_handle_authorised_put() {
let stats = Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]}));
let unauthorised_put: PutData = Random::generate_random();
let result_stats = call_operation(unauthorised_put, MessageTypeTag::UnauthorisedPut, stats);
assert_eq!(result_stats.call_count, 1u32);
assert_eq!(result_stats.data, "UnauthorisedPut".to_string().into_bytes());
}
#[test]
fn call_handle_put_response() {
let stats = Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]}));
let put_data_response: PutDataResponse = Random::generate_random();
assert_eq!(call_operation(put_data_response, MessageTypeTag::PutDataResponse, stats).call_count, 1u32);
}
#[test]
fn call_get() {
let mut n1 = RoutingNode::new(TestInterface { stats: Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]})) });
let name: NameType = Random::generate_random();
n1.get(100u64, name);
}
#[test]
fn call_handle_get_data() {
let stats = Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]}));
let get_data: GetData = Random::generate_random();
assert_eq!(call_operation(get_data, MessageTypeTag::GetData, stats).call_count, 1u32);
}
#[test]
fn call_handle_get_data_response() {
let stats = Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]}));
let get_data: GetDataResponse = Random::generate_random();
assert_eq!(call_operation(get_data, MessageTypeTag::GetDataResponse, stats).call_count, 1u32);
}
#[test]
fn call_handle_get_key() {
let stats = Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]}));
let get_key: GetKey = Random::generate_random();
let public_key: types::PublicSignKey = Random::generate_random();
let mut enc = Encoder::from_memory();
let _ = enc.encode(&[public_key]);
stats.lock().unwrap().data = enc.into_bytes();
assert_eq!(call_operation(get_key, MessageTypeTag::GetKey, stats).call_count, 1u32);
}
#[test]
fn call_handle_post() {
let stats = Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]}));
let post: Post = Random::generate_random();
assert_eq!(call_operation(post, MessageTypeTag::Post, stats).call_count, 1u32);
}
#[test]
fn network() {
let network_size = 2usize;
let node = Arc::new(Mutex::new(RoutingNode::new(TestInterface { stats: Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]})) })));
let use_node = node.clone();
let mut runners = Vec::new();
runners.push(thread::spawn(move || loop {
let mut use_node = use_node.lock().unwrap();
use_node.run();
if use_node.routing_table.size() == network_size - 1 {
break;
}
}));
let listening_endpoints = node.lock().unwrap().accepting_on.clone();
println!("network: {:?}, {:?}", &listening_endpoints, node.lock().unwrap().id());
for _ in 0..(network_size - 1) {
let node = Arc::new(Mutex::new(RoutingNode::new(TestInterface { stats: Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]})) })));
let use_node = node.clone();
runners.push(thread::spawn(move || loop {
let mut use_node = use_node.lock().unwrap();
use_node.run();
if use_node.routing_table.size() == network_size - 1 {
break;
}
}));
let mut use_node2 = node.lock().unwrap();
match use_node2.bootstrap(Some(listening_endpoints.clone()), None) {
Ok(_) => { assert!(true) },
Err(_) => { assert!(false); }
}
thread::sleep_ms(1000);
}
for runner in runners {
assert!(runner.join().is_ok());
}
}
fn populate_routing_node() -> RoutingNode<TestInterface> {
let mut routing_node = RoutingNode::new(TestInterface {
stats: Arc::new(Mutex::new(Stats {call_count: 0, data: vec![]})) });
let mut count : usize = 0;
loop {
routing_node.routing_table.add_node(routing_table::NodeInfo::new(
PublicId::new(&Id::new()), random_endpoints(),
Some(random_endpoint())));
count += 1;
if routing_node.routing_table.size() >=
routing_table::RoutingTable::get_optimal_size() { break; }
if count >= 2 * routing_table::RoutingTable::get_optimal_size() {
panic!("Routing table does not fill up."); }
}
routing_node
}
#[test]
fn relocate_original_public_id() {
let mut routing_node = populate_routing_node();
let furthest_closest_node = routing_node.routing_table.our_close_group().last().unwrap().id();
let our_name = routing_node.own_name.clone();
let total_inside : u32 = 50;
let limit_attempts : u32 = 200;
let mut stored_public_ids : Vec<PublicId> = Vec::with_capacity(total_inside as usize);
let mut count_inside : u32 = 0;
let mut count_total : u32 = 0;
loop {
let put_public_id = PutPublicId{ public_id : PublicId::new(&Id::new()) };
let put_public_id_header : MessageHeader = MessageHeader {
message_id : random::<u32>(),
destination : types::DestinationAddress {
dest : put_public_id.public_id.name(), relay_to : None },
source : types::SourceAddress {
from_node : Random::generate_random(), from_group : None, reply_to : None, relayed_for : None },
authority : Authority::ManagedNode
};
let serialised_msg = encode(&put_public_id).unwrap();
let result = routing_node.handle_put_public_id(put_public_id_header,
serialised_msg);
if closer_to_target(&put_public_id.public_id.name(),
&furthest_closest_node,
&our_name) {
assert!(result.is_ok());
stored_public_ids.push(put_public_id.public_id);
count_inside += 1;
} else {
assert!(result.is_err());
}
count_total += 1;
if count_inside >= total_inside {
break; }
if count_total >= limit_attempts {
if count_inside > 0 {
println!("Could only verify {} successful public_ids inside
our group before limit reached.", count_inside);
break;
} else { panic!("No PublicIds were found inside our close group!"); }
}
}
for public_id in stored_public_ids {
assert!(!routing_node.public_id_cache.check(&public_id.name()));
}
assert_eq!(routing_node.public_id_cache.len(), 0usize);
}
#[test]
fn cache_relocated_public_id() {
let mut routing_node = populate_routing_node();
let furthest_closest_node = routing_node.routing_table.our_close_group().last().unwrap().id();
let our_name = routing_node.own_name.clone();
let total_inside : u32 = 50;
let limit_attempts : u32 = 200;
let mut stored_public_ids : Vec<PublicId> = Vec::with_capacity(total_inside as usize);
let mut count_inside : u32 = 0;
let mut count_total : u32 = 0;
loop {
let original_public_id = PublicId::generate_random();
let mut close_nodes_to_original_name : Vec<NameType> = Vec::new();
for i in 0..types::GROUP_SIZE {
close_nodes_to_original_name.push(Random::generate_random());
}
let relocated_name = types::calculate_relocated_name(close_nodes_to_original_name.clone(),
&original_public_id.name()).unwrap();
let mut relocated_public_id = original_public_id.clone();
assert!(relocated_public_id.assign_relocated_name(relocated_name.clone()));
let put_public_id = PutPublicId{ public_id : relocated_public_id };
let put_public_id_header : MessageHeader = MessageHeader {
message_id : random::<u32>(),
destination : types::DestinationAddress {
dest : put_public_id.public_id.name(), relay_to : None },
source : types::SourceAddress {
from_node : close_nodes_to_original_name[0].clone(), from_group : Some(original_public_id.name()), reply_to : None, relayed_for : None },
authority : Authority::NaeManager
};
let serialised_msg = encode(&put_public_id).unwrap();
let result = routing_node.handle_put_public_id(put_public_id_header, serialised_msg);
if closer_to_target(&put_public_id.public_id.name(),
&furthest_closest_node,
&our_name) {
assert!(result.is_ok());
stored_public_ids.push(put_public_id.public_id);
count_inside += 1;
} else {
assert!(result.is_err());
}
count_total += 1;
if count_inside >= total_inside {
break; }
if count_total >= limit_attempts {
if count_inside > 0 {
println!("Could only verify {} successful public_ids inside
our group before limit reached.", count_inside);
break;
} else { panic!("No PublicIds were found inside our close group!"); }
}
}
for public_id in stored_public_ids {
assert!(routing_node.public_id_cache.check(&public_id.name()));
}
assert_eq!(routing_node.public_id_cache.len(), total_inside as usize);
}
}