use std::collections::{HashMap, HashSet};
use std::fmt;
use std::time::Duration;
use action::{Action, ActionQueue};
use message::{GossipMessage, GraftMessage, IhaveMessage, Message, ProtocolMessage, PruneMessage};
use missing::MissingMessages;
use time::{Clock, NodeTime};
use System;
#[derive(Debug, Clone)]
pub struct NodeOptions {
pub ihave_timeout: Duration,
pub optimization_threshold: u16,
}
impl Default for NodeOptions {
fn default() -> Self {
NodeOptions {
ihave_timeout: Duration::from_millis(500),
optimization_threshold: 2,
}
}
}
pub struct Node<T: System> {
id: T::NodeId,
options: NodeOptions,
eager_push_peers: HashSet<T::NodeId>,
lazy_push_peers: HashSet<T::NodeId>,
messages: HashMap<T::MessageId, T::MessagePayload>,
missings: MissingMessages<T>,
actions: ActionQueue<T>,
clock: Clock,
}
impl<T: System> fmt::Debug for Node<T>
where
T::NodeId: fmt::Debug,
T::MessageId: fmt::Debug,
T::MessagePayload: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Node {{ id: {:?}, options: {:?}, eager_push_peers: {:?}, lazy_push_peers: {:?}, \
messages: {:?}, missings: {:?}, actions: {:?}, clock: {:?} }}",
self.id,
self.options,
self.eager_push_peers,
self.lazy_push_peers,
self.messages,
self.missings,
self.actions,
self.clock
)
}
}
impl<T: System> Node<T> {
pub fn new(node_id: T::NodeId) -> Self {
Self::with_options(node_id, NodeOptions::default())
}
pub fn with_options(node_id: T::NodeId, options: NodeOptions) -> Self {
Node {
id: node_id,
options,
eager_push_peers: HashSet::new(),
lazy_push_peers: HashSet::new(),
messages: HashMap::new(),
missings: MissingMessages::new(),
actions: ActionQueue::new(),
clock: Clock::new(),
}
}
pub fn id(&self) -> &T::NodeId {
&self.id
}
pub fn options(&self) -> &NodeOptions {
&self.options
}
pub fn options_mut(&mut self) -> &mut NodeOptions {
&mut self.options
}
pub fn eager_push_peers(&self) -> &HashSet<T::NodeId> {
&self.eager_push_peers
}
pub fn lazy_push_peers(&self) -> &HashSet<T::NodeId> {
&self.lazy_push_peers
}
pub fn broadcast_message(&mut self, message: Message<T>) {
self.actions.deliver(message.clone());
let gossip = GossipMessage::new(&self.id, message, 0);
self.eager_push(&gossip);
self.lazy_push(&gossip);
self.messages
.insert(gossip.message.id, gossip.message.payload);
}
pub fn messages(&self) -> &HashMap<T::MessageId, T::MessagePayload> {
&self.messages
}
pub fn waiting_messages(&self) -> usize {
self.missings.waiting_messages()
}
pub fn forget_message(&mut self, message_id: &T::MessageId) -> bool {
self.messages.remove(message_id).is_some()
}
pub fn poll_action(&mut self) -> Option<Action<T>> {
self.handle_expiration();
self.actions.pop()
}
pub fn handle_protocol_message(&mut self, message: ProtocolMessage<T>) -> bool {
if !self.is_known_node(message.sender()) {
return false;
}
match message {
ProtocolMessage::Gossip(m) => self.handle_gossip(m),
ProtocolMessage::Ihave(m) => self.handle_ihave(m),
ProtocolMessage::Graft(m) => self.handle_graft(m),
ProtocolMessage::Prune(m) => self.handle_prune(m),
}
true
}
pub fn handle_neighbor_up(&mut self, neighbor_node_id: &T::NodeId) {
if self.is_known_node(neighbor_node_id) || self.id == *neighbor_node_id {
return;
}
for message_id in self.messages.keys() {
let ihave = IhaveMessage::new(&self.id, message_id.clone(), 0, false);
self.actions.send(neighbor_node_id.clone(), ihave);
}
self.eager_push_peers.insert(neighbor_node_id.clone());
}
pub fn handle_neighbor_down(&mut self, neighbor_node_id: &T::NodeId) {
if !self.is_known_node(neighbor_node_id) {
return;
}
self.eager_push_peers.remove(neighbor_node_id);
self.lazy_push_peers.remove(neighbor_node_id);
if self.eager_push_peers.is_empty() {
while let Some(ihave) = self.missings.pop_expired(&Clock::max()) {
if self.send_graft(ihave) {
break;
}
}
}
}
pub fn clock(&self) -> &Clock {
&self.clock
}
pub fn clock_mut(&mut self) -> &mut Clock {
&mut self.clock
}
pub fn next_expiry_time(&self) -> Option<NodeTime> {
self.missings.next_expiry_time()
}
fn handle_expiration(&mut self) {
while let Some(ihave) = self.missings.pop_expired(&self.clock) {
self.send_graft(ihave);
}
}
fn send_graft(&mut self, ihave: IhaveMessage<T>) -> bool {
if !self.is_known_node(&ihave.sender) {
false
} else {
self.eager_push_peers.insert(ihave.sender.clone());
self.lazy_push_peers.remove(&ihave.sender);
self.actions.send(
ihave.sender,
GraftMessage::new(&self.id, Some(ihave.message_id), ihave.round),
);
true
}
}
#[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
fn handle_gossip(&mut self, gossip: GossipMessage<T>) {
if self.messages.contains_key(&gossip.message.id) {
self.eager_push_peers.remove(&gossip.sender);
self.lazy_push_peers.insert(gossip.sender.clone());
self.actions
.send(gossip.sender, PruneMessage::new(&self.id));
} else {
self.actions.deliver(gossip.message.clone());
self.eager_push(&gossip);
self.lazy_push(&gossip);
self.eager_push_peers.insert(gossip.sender.clone());
self.lazy_push_peers.remove(&gossip.sender);
self.optimize(&gossip);
self.missings.remove(&gossip.message.id);
self.messages
.insert(gossip.message.id, gossip.message.payload);
}
}
fn handle_ihave(&mut self, mut ihave: IhaveMessage<T>) {
if self.messages.contains_key(&ihave.message_id) {
return;
}
if self.eager_push_peers.is_empty() {
ihave.realtime = true;
}
self.missings
.push(ihave, &self.clock, self.options.ihave_timeout);
}
fn handle_graft(&mut self, mut graft: GraftMessage<T>) {
self.eager_push_peers.insert(graft.sender.clone());
self.lazy_push_peers.remove(&graft.sender);
if let Some(message_id) = graft.message_id.take() {
if let Some(payload) = self.messages.get(&message_id).cloned() {
let gossip =
GossipMessage::new(&self.id, Message::new(message_id, payload), graft.round);
self.actions.send(graft.sender, gossip);
}
}
}
fn handle_prune(&mut self, prune: PruneMessage<T>) {
self.eager_push_peers.remove(&prune.sender);
self.lazy_push_peers.insert(prune.sender);
}
fn eager_push(&mut self, gossip: &GossipMessage<T>) {
let round = gossip.round.saturating_add(1);
for peer in self.eager_push_peers
.iter()
.filter(|n| **n != gossip.sender)
{
let forward = GossipMessage::new(&self.id, gossip.message.clone(), round);
self.actions.send(peer.clone(), forward);
}
}
fn lazy_push(&mut self, gossip: &GossipMessage<T>) {
let round = gossip.round.saturating_add(1);
let ihave = IhaveMessage::new(&self.id, gossip.message.id.clone(), round, true);
for peer in self.lazy_push_peers.iter().filter(|n| **n != gossip.sender) {
self.actions.send(peer.clone(), ihave.clone());
}
}
fn optimize(&mut self, gossip: &GossipMessage<T>) {
if let Some((ihave_round, ihave_owner)) = self.missings.get_ihave(&gossip.message.id) {
let optimize =
gossip.round.checked_sub(ihave_round) >= Some(self.options.optimization_threshold);
if optimize {
let graft = GraftMessage::new(&self.id, None, ihave_round);
let prune = PruneMessage::new(&self.id);
self.actions.send(ihave_owner.clone(), graft);
self.actions.send(gossip.sender.clone(), prune);
}
}
}
fn is_known_node(&self, node_id: &T::NodeId) -> bool {
self.eager_push_peers.contains(node_id) || self.lazy_push_peers.contains(node_id)
}
}