use crust::PeerId;
use authority::Authority;
use error::RoutingError;
use id::PublicId;
use messages::{HopMessage, RoutingMessage, SignedMessage};
use peer_manager::GROUP_SIZE;
use state_machine::Transition;
use super::{Bootstrapped, HandleHopMessage, SendRoutingMessage};
pub trait ProxyClient: Bootstrapped {
fn proxy_peer_id(&self) -> &PeerId;
fn proxy_public_id(&self) -> &PublicId;
}
impl<T> HandleHopMessage for T where T: ProxyClient + SendRoutingMessage {
fn handle_hop_message(&mut self,
hop_msg: HopMessage,
peer_id: PeerId)
-> Result<Transition, RoutingError> {
if *self.proxy_peer_id() == peer_id {
try!(hop_msg.verify(self.proxy_public_id().signing_public_key()));
} else {
return Err(RoutingError::UnknownConnection(peer_id));
}
let signed_msg = hop_msg.content();
try!(signed_msg.check_integrity());
if self.signed_msg_filter().filter_incoming(signed_msg) > GROUP_SIZE {
return Err(RoutingError::FilterCheckFailed);
}
let routing_msg = signed_msg.routing_message();
if !is_recipient(self.full_id().public_id(), &routing_msg.dst) {
return Ok(Transition::Stay);
}
if let Some(msg) = try!(self.accumulate(routing_msg, signed_msg.public_id())) {
if msg.src.is_group() {
self.send_ack(&msg, 0);
}
self.dispatch_routing_message(msg)
} else {
Ok(Transition::Stay)
}
}
}
impl<T> SendRoutingMessage for T where T: ProxyClient {
fn send_routing_message_via_route(&mut self,
routing_msg: RoutingMessage,
route: u8)
-> Result<(), RoutingError> {
self.stats().count_route(route);
if let Authority::Client { .. } = routing_msg.dst {
if is_recipient(self.full_id().public_id(), &routing_msg.dst) {
return Ok(()); }
}
let proxy_peer_id = if let Authority::Client { ref proxy_node_name, .. } =
routing_msg.src {
if *self.proxy_public_id().name() == *proxy_node_name {
*self.proxy_peer_id()
} else {
error!("{:?} - Unable to find connection to proxy node in proxy map",
self);
return Err(RoutingError::ProxyConnectionNotFound);
}
} else {
error!("{:?} - Source should be client if our state is a Client",
self);
return Err(RoutingError::InvalidSource);
};
let signed_msg = try!(SignedMessage::new(routing_msg, &self.full_id()));
if !self.add_to_pending_acks(&signed_msg, route) {
return Ok(());
}
if !self.filter_outgoing_signed_msg(&signed_msg, &proxy_peer_id, route) {
let bytes =
try!(super::to_hop_bytes(signed_msg.clone(), route, Vec::new(), &self.full_id()));
if let Err(error) = self.send_or_drop(&proxy_peer_id, bytes, signed_msg.priority()) {
info!("{:?} - Error sending message to {:?}: {:?}.",
self,
proxy_peer_id,
error);
}
}
Ok(())
}
}
fn is_recipient(public_id: &PublicId, dst: &Authority) -> bool {
if let Authority::Client { ref client_key, .. } = *dst {
client_key == public_id.signing_public_key()
} else {
false
}
}