mod bootstrapped;
mod connect;
mod proxy_client;
use crust::{PeerId, Service};
use maidsafe_utilities::serialisation;
use sodiumoxide::crypto::sign;
use std::fmt::Debug;
use authority::Authority;
use error::RoutingError;
use event::Event;
use id::{FullId, PublicId};
use messages::{DirectMessage, HopMessage, Message, MessageContent, RoutingMessage, SignedMessage,
UserMessage};
use state_machine::Transition;
use stats::Stats;
use xor_name::XorName;
pub use self::bootstrapped::Bootstrapped;
pub use self::connect::Connect;
pub use self::proxy_client::ProxyClient;
/// Expiry duration for the user message cache, in seconds: `60 * 20` = 1200 s (20 minutes).
// NOTE(review): the cache that consumes this value is not constructed in this file — confirm
// the exact usage against the state implementations.
pub const USER_MSG_CACHE_EXPIRY_DURATION_SECS: u64 = 60 * 20;
pub fn to_hop_bytes(signed_msg: SignedMessage,
route: u8,
sent_to: Vec<XorName>,
full_id: &FullId)
-> Result<Vec<u8>, RoutingError> {
let hop_msg = try!(HopMessage::new(signed_msg, route, sent_to, full_id.signing_private_key()));
let message = Message::Hop(hop_msg);
Ok(try!(serialisation::serialise(&message)))
}
/// Deserialises a `PublicId` from `serialised_public_id` and checks that `signature` is a valid
/// detached signature of those same bytes under the ID's own public signing key.
///
/// # Errors
///
/// Returns a `RoutingError` if deserialisation fails, or `RoutingError::FailedSignature` if the
/// signature does not verify.
pub fn verify_signed_public_id(serialised_public_id: &[u8],
                               signature: &sign::Signature)
                               -> Result<PublicId, RoutingError> {
    let public_id: PublicId = try!(serialisation::deserialise(serialised_public_id));
    // Guard clause: reject as soon as the signature check fails.
    if !sign::verify_detached(signature,
                              serialised_public_id,
                              public_id.signing_public_key()) {
        return Err(RoutingError::FailedSignature);
    }
    Ok(public_id)
}
/// Functionality common to all states: access to the crust service, the node's identity and
/// statistics, plus default helpers for sending direct messages to peers.
pub trait AnyState: Debug {
    /// Returns the crust service used to send bytes to, and disconnect from, peers.
    fn crust_service(&self) -> &Service;

    /// Returns the full ID of this state.
    fn full_id(&self) -> &FullId;

    /// Returns the statistics collector, mutably, so that counters can be updated.
    fn stats(&mut self) -> &mut Stats;

    /// Emits `event`; how and where it is delivered is defined by the implementor.
    fn send_event(&self, event: Event);

    /// Called when the connection to a peer is considered lost.
    ///
    /// The default implementation ignores the peer and returns `Transition::Stay`.
    fn handle_lost_peer(&mut self, _peer_id: PeerId) -> Transition {
        Transition::Stay
    }

    /// Returns the name taken from the public part of `full_id()`.
    fn name(&self) -> &XorName {
        self.full_id().public_id().name()
    }

    /// Sends `direct_message` to the peer `dst_id`.
    ///
    /// The message is counted in the stats, wrapped via `wrap_direct_message` (which may also
    /// redirect it to a different peer), serialised and handed to `send_or_drop` with the
    /// message's own priority.
    ///
    /// # Errors
    ///
    /// Returns a `RoutingError` if serialisation fails (this is also logged at error level) or
    /// if `send_or_drop` fails.
    fn send_direct_message(&mut self,
                           dst_id: &PeerId,
                           direct_message: DirectMessage)
                           -> Result<(), RoutingError> {
        self.stats().count_direct_message(&direct_message);
        // Read the priority before `direct_message` is moved into the wrapper.
        let priority = direct_message.priority();

        let (message, peer_id) = self.wrap_direct_message(dst_id, direct_message);

        let raw_bytes = match serialisation::serialise(&message) {
            Err(error) => {
                error!("{:?} Failed to serialise message {:?}: {:?}",
                       self,
                       message,
                       error);
                return Err(error.into());
            }
            Ok(bytes) => bytes,
        };

        self.send_or_drop(&peer_id, raw_bytes, priority)
    }

    /// Sends `bytes` to `peer_id` with the given `priority`, dropping the peer on failure.
    ///
    /// The byte count is recorded in the stats before sending. If the crust service reports an
    /// error, the peer is disconnected, `handle_lost_peer` is invoked (its `Transition` is
    /// discarded) and the error is returned converted into a `RoutingError`.
    fn send_or_drop(&mut self,
                    peer_id: &PeerId,
                    bytes: Vec<u8>,
                    priority: u8)
                    -> Result<(), RoutingError> {
        self.stats().count_bytes(bytes.len());

        // `bytes` is not needed after this call, so it is moved rather than cloned.
        if let Err(err) = self.crust_service().send(*peer_id, bytes, priority) {
            info!("{:?} Connection to {:?} failed. Calling crust::Service::disconnect.",
                  self,
                  peer_id);
            self.crust_service().disconnect(*peer_id);
            // Best effort: the transition requested by the handler is intentionally ignored.
            let _ = self.handle_lost_peer(*peer_id);
            return Err(err.into());
        }

        Ok(())
    }

    /// Wraps `direct_message` into the `Message` that is actually put on the wire and returns
    /// it together with the peer it must be sent to.
    ///
    /// The default implementation wraps it in `Message::Direct` and targets `dst_id` itself.
    fn wrap_direct_message(&self,
                           dst_id: &PeerId,
                           direct_message: DirectMessage)
                           -> (Message, PeerId) {
        (Message::Direct(direct_message), *dst_id)
    }
}
/// Implemented by states that are able to process hop messages.
pub trait HandleHopMessage {
/// Handles `hop_msg` in the context of the peer `peer_id`.
///
/// Returns the state `Transition` to perform on success, or a `RoutingError` on failure.
// NOTE(review): `peer_id` is presumably the peer the hop message was received from —
// confirm against the implementors.
fn handle_hop_message(&mut self,
hop_msg: HopMessage,
peer_id: PeerId)
-> Result<Transition, RoutingError>;
}
/// Implemented by states that reassemble user messages from parts and surface them as events.
pub trait HandleUserMessage: AnyState {
    /// Stores one part of a user message in the cache.
    ///
    /// Returns `Some` with a `UserMessage` when the cache yields one for this part, `None`
    /// otherwise.
    fn add_to_user_msg_cache(&mut self,
                             hash: u64,
                             part_count: u32,
                             part_index: u32,
                             payload: Vec<u8>)
                             -> Option<UserMessage>;

    /// Adds a message part to the cache and, if that produces a `UserMessage`, passes it on to
    /// `handle_user_message` along with `src` and `dst`.
    fn handle_user_message_part(&mut self,
                                hash: u64,
                                part_count: u32,
                                part_index: u32,
                                payload: Vec<u8>,
                                src: Authority,
                                dst: Authority) {
        let assembled = match self.add_to_user_msg_cache(hash, part_count, part_index, payload) {
            Some(user_msg) => user_msg,
            // Nothing to deliver for this part.
            None => return,
        };
        self.handle_user_message(assembled, src, dst);
    }

    /// Counts the request or response in the stats and emits the matching `Event`.
    fn handle_user_message(&mut self, msg: UserMessage, src: Authority, dst: Authority) {
        match msg {
            UserMessage::Request(req) => {
                self.stats().count_request(&req);
                self.send_event(Event::Request {
                    request: req,
                    src: src,
                    dst: dst,
                });
            }
            UserMessage::Response(resp) => {
                self.stats().count_response(&resp);
                self.send_event(Event::Response {
                    response: resp,
                    src: src,
                    dst: dst,
                });
            }
        }
    }
}
/// Implemented by states that can send routing messages and acknowledge received ones.
pub trait SendRoutingMessage: Debug {
    /// Sends `routing_msg` using the given `route`.
    fn send_routing_message_via_route(&mut self,
                                      routing_msg: RoutingMessage,
                                      route: u8)
                                      -> Result<(), RoutingError>;

    /// Sends `routing_msg` via route `0`.
    fn send_routing_message(&mut self, routing_msg: RoutingMessage) -> Result<(), RoutingError> {
        self.send_routing_message_via_route(routing_msg, 0)
    }

    /// Acknowledges `routing_msg`, using a clone of the message's own destination as the
    /// source of the ack.
    fn send_ack(&mut self, routing_msg: &RoutingMessage, route: u8) {
        let ack_src = routing_msg.dst.clone();
        self.send_ack_from(routing_msg, route, ack_src);
    }

    /// Acknowledges `routing_msg` with an ack originating from `src`.
    ///
    /// Does nothing if `routing_msg` is itself an ack. Failures to create or to send the ack
    /// are only logged, never returned.
    fn send_ack_from(&mut self, routing_msg: &RoutingMessage, route: u8, src: Authority) {
        match routing_msg.content {
            // Acks are not acknowledged in turn.
            MessageContent::Ack(..) => return,
            _ => (),
        }

        match RoutingMessage::ack_from(routing_msg, src) {
            Ok(ack) => {
                if let Err(error) = self.send_routing_message_via_route(ack, route) {
                    debug!("{:?} - Failed to send ack: {:?}", self, error);
                }
            }
            Err(error) => {
                error!("{:?} - Failed to create ack: {:?}", self, error);
            }
        }
    }
}
/// Test-only helpers, compiled only with the `use-mock-crust` feature.
#[cfg(feature = "use-mock-crust")]
pub trait Testable: Bootstrapped {
    /// Hook for clearing state; the default implementation does nothing.
    fn clear_state(&mut self) {}

    /// Stops the timer and triggers the timed-out resend handling for every timer token
    /// currently held by the ack manager.
    ///
    /// Returns `true` if there was at least one such token.
    fn resend_unacknowledged(&mut self) -> bool {
        self.timer().stop();

        let tokens = self.ack_mgr().timer_tokens();
        let had_tokens = !tokens.is_empty();
        for &token in &tokens {
            self.resend_unacknowledged_timed_out_msgs(token);
        }
        had_tokens
    }

    /// Returns whether the ack manager reports pending entries.
    fn has_unacknowledged(&self) -> bool {
        self.ack_mgr().has_pending()
    }
}