use super::Base;
use ack_manager::{Ack, AckManager, UnacknowledgedMessage, ACK_TIMEOUT_SECS};
use error::RoutingError;
#[cfg(feature = "use-mock-crust")]
use fake_clock::FakeClock as Instant;
use id::PublicId;
use maidsafe_utilities::serialisation;
use messages::{HopMessage, Message, MessageContent, RoutingMessage, SignedMessage};
use routing_message_filter::RoutingMessageFilter;
use routing_table::Authority;
use std::collections::BTreeSet;
use std::time::Duration;
#[cfg(not(feature = "use-mock-crust"))]
use std::time::Instant;
use timer::Timer;
use xor_name::XorName;
pub trait Bootstrapped: Base {
fn ack_mgr(&self) -> &AckManager;
fn ack_mgr_mut(&mut self) -> &mut AckManager;
fn send_routing_message_via_route(
&mut self,
routing_msg: RoutingMessage,
route: u8,
expires_at: Option<Instant>,
) -> Result<(), RoutingError>;
fn routing_msg_filter(&mut self) -> &mut RoutingMessageFilter;
fn timer(&mut self) -> &mut Timer;
fn add_to_pending_acks(
&mut self,
routing_msg: &RoutingMessage,
route: u8,
expires_at: Option<Instant>,
) -> bool {
if let MessageContent::Ack(..) = routing_msg.content {
return true;
}
let ack = match Ack::compute(routing_msg) {
Ok(ack) => ack,
Err(error) => {
error!("{:?} Failed to create ack: {:?}", self, error);
return true;
}
};
if self.ack_mgr_mut().did_receive(ack) {
return false;
}
let token = self.timer().schedule(Duration::from_secs(ACK_TIMEOUT_SECS));
let unacked_msg = UnacknowledgedMessage {
routing_msg: routing_msg.clone(),
route,
timer_token: token,
expires_at,
};
if let Some(ejected) = self.ack_mgr_mut().add_to_pending(ack, unacked_msg) {
debug!(
"{:?} - Ejected pending ack: {:?} - {:?}",
self, ack, ejected
);
}
true
}
fn filter_outgoing_routing_msg(
&mut self,
msg: &RoutingMessage,
pub_id: &PublicId,
route: u8,
) -> bool {
if self
.routing_msg_filter()
.filter_outgoing(msg, pub_id, route)
{
return true;
}
self.stats().count_routing_message(msg);
false
}
fn resend_unacknowledged_timed_out_msgs(&mut self, token: u64) {
if let Some((unacked_msg, _ack)) = self.ack_mgr_mut().find_timed_out(token) {
if unacked_msg.route as usize == self.min_section_size() {
debug!(
"{:?} Message unable to be acknowledged - giving up. {:?}",
self, unacked_msg
);
self.stats().count_unacked();
} else if let Err(error) = self.send_routing_message_via_route(
unacked_msg.routing_msg,
unacked_msg.route,
unacked_msg.expires_at,
) {
debug!("{:?} Failed to send message: {:?}", self, error);
}
}
}
fn send_routing_message_with_expiry(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
content: MessageContent,
expires_at: Option<Instant>,
) -> Result<(), RoutingError> {
let routing_msg = RoutingMessage { src, dst, content };
self.send_routing_message_via_route(routing_msg, 0, expires_at)
}
fn send_routing_message(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
content: MessageContent,
) -> Result<(), RoutingError> {
self.send_routing_message_with_expiry(src, dst, content, None)
}
fn send_ack(&mut self, routing_msg: &RoutingMessage, route: u8) {
self.send_ack_from(routing_msg, route, routing_msg.dst);
}
fn send_ack_from(&mut self, routing_msg: &RoutingMessage, route: u8, src: Authority<XorName>) {
if let MessageContent::Ack(..) = routing_msg.content {
return;
}
let response = match RoutingMessage::ack_from(routing_msg, src) {
Ok(response) => response,
Err(error) => {
error!("{:?} Failed to create ack: {:?}", self, error);
return;
}
};
if let Err(error) = self.send_routing_message_via_route(response, route, None) {
error!("{:?} Failed to send ack: {:?}", self, error);
}
}
fn to_hop_bytes(
&self,
signed_msg: SignedMessage,
route: u8,
sent_to: BTreeSet<XorName>,
) -> Result<Vec<u8>, RoutingError> {
let hop_msg = HopMessage::new(
signed_msg,
route,
sent_to,
self.full_id().signing_private_key(),
)?;
let message = Message::Hop(hop_msg);
Ok(serialisation::serialise(&message)?)
}
}