use crust::PeerId;
use std::time::Duration;
use ack_manager::{ACK_TIMEOUT_SECS, Ack, AckManager, UnacknowledgedMessage};
use error::RoutingError;
use id::PublicId;
use messages::{MessageContent, RoutingMessage, SignedMessage};
use peer_manager::GROUP_SIZE;
use signed_message_filter::SignedMessageFilter;
use state_machine::Transition;
use super::{AnyState, SendRoutingMessage};
use timer::Timer;
pub trait Bootstrapped: AnyState + SendRoutingMessage {
fn accumulate(&mut self,
routing_msg: &RoutingMessage,
public_id: &PublicId)
-> Result<Option<RoutingMessage>, RoutingError>;
fn ack_mgr(&self) -> &AckManager;
fn ack_mgr_mut(&mut self) -> &mut AckManager;
fn dispatch_routing_message(&mut self,
routing_msg: RoutingMessage)
-> Result<Transition, RoutingError>;
fn signed_msg_filter(&mut self) -> &mut SignedMessageFilter;
fn timer(&mut self) -> &mut Timer;
fn add_to_pending_acks(&mut self, signed_msg: &SignedMessage, route: u8) -> bool {
if let MessageContent::Ack(..) = signed_msg.routing_message().content {
return true;
}
if *signed_msg.public_id() != *self.full_id().public_id() {
return true;
}
let ack = match Ack::compute(signed_msg.routing_message()) {
Ok(ack) => ack,
Err(error) => {
error!("Failed to create ack: {:?}", error);
return true;
}
};
if self.ack_mgr_mut().did_receive(ack) {
return false;
}
let token = self.timer().schedule(Duration::from_secs(ACK_TIMEOUT_SECS));
let unacked_msg = UnacknowledgedMessage {
routing_msg: signed_msg.routing_message().clone(),
route: route,
timer_token: token,
};
if let Some(ejected) = self.ack_mgr_mut().add_to_pending(ack, unacked_msg) {
trace!("{:?} - Ejected pending ack: {:?} - {:?}",
self,
ack,
ejected);
}
true
}
fn filter_outgoing_signed_msg(&mut self,
msg: &SignedMessage,
peer_id: &PeerId,
route: u8)
-> bool {
if self.signed_msg_filter().filter_outgoing(msg, peer_id, route) {
return true;
}
self.stats().count_routing_message(msg.routing_message());
false
}
fn resend_unacknowledged_timed_out_msgs(&mut self, token: u64) {
if let Some((unacked_msg, ack)) = self.ack_mgr_mut().find_timed_out(token) {
trace!("{:?} - Timed out waiting for ack({}) {:?}",
self,
ack,
unacked_msg);
if unacked_msg.route as usize == GROUP_SIZE {
debug!("{:?} - Message unable to be acknowledged - giving up. {:?}",
self,
unacked_msg);
self.stats().count_unacked();
} else if let Err(error) =
self.send_routing_message_via_route(unacked_msg.routing_msg, unacked_msg.route) {
debug!("{:?} Failed to send message: {:?}", self, error);
}
}
}
}