#[cfg(not(test))]
use std::time::Instant;
use std::{
collections::{HashMap, HashSet},
fmt::{self, Display, Formatter},
hash::Hash,
time::Duration,
};
use datasize::DataSize;
#[cfg(test)]
use fake_instant::FakeClock as Instant;
use tracing::{error, trace, warn};
use casper_types::DisplayIter;
use super::Config;
use crate::{effect::GossipTarget, types::NodeId};
/// The action the caller should take for an item after the gossip table has been updated.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum GossipAction {
/// We only know the item's ID and this is the first time we've seen it: the remainder of the
/// data should be fetched from `holder`.
GetRemainder { holder: NodeId },
/// We only know the item's ID, and the entry was already being tracked before this update.
AwaitingRemainder,
/// We hold the item and should gossip it onwards as described by the wrapped `ShouldGossip`.
ShouldGossip(ShouldGossip),
/// Nothing needs to be done.
Noop,
/// The update just moved the item from the in-progress set to the finished set.
AnnounceFinished,
}
impl Display for GossipAction {
    /// Writes a short, human-readable description of the action.
    ///
    /// The `ShouldGossip` variant defers entirely to the wrapped value's own `Display` impl.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        match self {
            GossipAction::ShouldGossip(inner) => Display::fmt(inner, f),
            GossipAction::GetRemainder { holder } => {
                write!(f, "should get remainder from {}", holder)
            }
            GossipAction::AwaitingRemainder => f.write_str("awaiting remainder"),
            GossipAction::Noop => f.write_str("should do nothing"),
            GossipAction::AnnounceFinished => f.write_str("finished gossiping"),
        }
    }
}
/// The parameters of a gossip round which should be started for an item we hold.
#[derive(Debug, PartialEq, Eq)]
pub(super) struct ShouldGossip {
/// The number of peers we should gossip the item to.
pub(super) count: usize,
/// Peers which should not be chosen; populated from the set of peers we have already
/// attempted to infect with this item.
pub(super) exclude_peers: HashSet<NodeId>,
/// `false` only when this action results from the call which first created the table entry.
pub(super) is_already_held: bool,
/// The gossip target recorded for the item when its complete data was provided.
pub(super) target: GossipTarget,
}
impl Display for ShouldGossip {
    /// Writes the peer count, the excluded peers (only if there are any) and whether we
    /// previously held the item.  The `target` field is not included in the output.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let held_description = if self.is_already_held {
            "previously held"
        } else {
            "didn't previously hold"
        };

        write!(f, "should gossip to {} peer(s) ", self.count)?;
        if !self.exclude_peers.is_empty() {
            let excluded = DisplayIter::new(&self.exclude_peers);
            write!(f, "excluding {} ", excluded)?;
        }
        write!(f, "(we {} the item)", held_description)
    }
}
/// The gossip state tracked for a single item which is still in progress.
#[derive(DataSize, Debug, Default)]
pub(super) struct State {
/// Peers recorded as holding the item (those which told us about it, responded to our
/// gossip, or were marked as holders when their response timeout was checked).
holders: HashSet<NodeId>,
/// Peers which reported that they received the item from us.
infected_by_us: HashSet<NodeId>,
/// The number of gossip requests we consider outstanding for this item.
in_flight_count: usize,
/// The gossip target for the item; `Some` exactly when we hold the complete data.
target: Option<GossipTarget>,
/// Peers we have registered an infection attempt for.
attempted_to_infect: HashSet<NodeId>,
}
impl State {
    /// Returns `true` if we hold the complete item, which is signalled by `target` being set.
    fn held_by_us(&self) -> bool {
        self.target.is_some()
    }

    /// Returns `true` if gossiping of this item should stop: either enough peers were infected
    /// by us, or we have attempted to infect at least `attempted_to_infect_limit` peers.
    fn is_finished(&self, infection_target: usize, attempted_to_infect_limit: usize) -> bool {
        let reached_infection_target = self.infected_by_us.len() >= infection_target;
        let reached_attempt_limit = self.attempted_to_infect.len() >= attempted_to_infect_limit;
        reached_infection_target || reached_attempt_limit
    }

    /// Derives the next `GossipAction` from the current state.
    ///
    /// When a `ShouldGossip` action is returned, `in_flight_count` is increased by the returned
    /// `count`, i.e. the caller is assumed to actually send that many gossip requests.
    ///
    /// `is_new` indicates that the entry was not previously being tracked.
    ///
    /// # Panics
    ///
    /// Panics if we don't hold the data, `is_new` is `true` and `holders` is empty.
    fn action(
        &mut self,
        infection_target: usize,
        attempted_to_infect_limit: usize,
        is_new: bool,
    ) -> GossipAction {
        if self.is_finished(infection_target, attempted_to_infect_limit) {
            return GossipAction::Noop;
        }

        match self.target {
            Some(target) => {
                // We hold the item: top up the number of outstanding gossip requests so that
                // in-flight plus already-infected reaches the infection target.
                let accounted_for = self.in_flight_count + self.infected_by_us.len();
                let count = infection_target.saturating_sub(accounted_for);
                if count == 0 {
                    return GossipAction::Noop;
                }
                self.in_flight_count += count;
                GossipAction::ShouldGossip(ShouldGossip {
                    count,
                    target,
                    exclude_peers: self.attempted_to_infect.clone(),
                    is_already_held: !is_new,
                })
            }
            None if is_new => {
                // We only know the ID: pick an arbitrary known holder to fetch the rest from.
                let holder = self
                    .holders
                    .iter()
                    .next()
                    .copied()
                    .expect("holders cannot be empty if we don't hold the data");
                GossipAction::GetRemainder { holder }
            }
            None => GossipAction::AwaitingRemainder,
        }
    }
}
/// A list of `(timeout, ID)` pairs in insertion order.
///
/// `purge` locates expired entries with a binary search, so entries are expected to be pushed
/// in non-decreasing order of timeout.
#[derive(DataSize, Debug)]
pub(super) struct Timeouts<T> {
values: Vec<(Instant, T)>,
}
impl<T> Timeouts<T> {
fn new() -> Self {
Timeouts { values: Vec::new() }
}
fn push(&mut self, timeout: Instant, data_id: T) {
self.values.push((timeout, data_id));
}
fn purge(&mut self, now: &Instant) -> impl Iterator<Item = T> + '_ {
let split_index = match self
.values
.binary_search_by(|(timeout, _data_id)| timeout.cmp(now))
{
Ok(index) => index,
Err(index) => index,
};
self.values
.drain(..split_index)
.map(|(_timeout, data_id)| data_id)
}
}
/// Tracks the gossip progress of items, keyed by an ID of type `T`.
#[derive(DataSize, Debug)]
pub(super) struct GossipTable<T> {
/// Items which are currently being gossiped, with their per-item state.
current: HashMap<T, State>,
/// IDs of items for which gossiping has finished; entries are removed again by
/// `purge_finished` once their timeout has passed.
finished: HashSet<T>,
/// Expiry times for the entries of `finished`.
timeouts: Timeouts<T>,
/// The number of peers infected by us at which an item is considered finished.
infection_target: usize,
/// The number of peers we attempted to infect at which an item is considered finished.
attempted_to_infect_limit: usize,
/// How long an ID is retained in `finished` after being added to it.
finished_entry_duration: Duration,
}
impl<T> GossipTable<T> {
/// Returns the number of items currently being gossiped.
pub(super) fn items_current(&self) -> usize {
self.current.len()
}
/// Returns the number of items held in the finished set.
pub(super) fn items_finished(&self) -> usize {
self.finished.len()
}
}
impl<T: Clone + Eq + Hash + Display> GossipTable<T> {
pub(super) fn new(config: Config) -> Self {
let attempted_to_infect_limit = (100 * usize::from(config.infection_target()))
/ (100 - usize::from(config.saturation_limit_percent()));
GossipTable {
current: HashMap::new(),
finished: HashSet::new(),
timeouts: Timeouts::new(),
infection_target: usize::from(config.infection_target()),
attempted_to_infect_limit,
finished_entry_duration: config.finished_entry_duration().into(),
}
}
/// Records that `holder` told us about the item `data_id`, of which we only received the ID.
///
/// Expired finished entries are purged first.  If the item is finished, nothing is recorded and
/// `Noop` is returned.  Otherwise `holder` is added to the item's holders, creating a new entry
/// if the item isn't being tracked yet, and the resulting action is returned.
pub(super) fn new_data_id(&mut self, data_id: &T, holder: NodeId) -> GossipAction {
    self.purge_finished();

    if self.finished.contains(data_id) {
        trace!(item=%data_id, "no further action: item already finished");
        return GossipAction::Noop;
    }

    let record_holder = |state: &mut State| {
        let _ = state.holders.insert(holder);
    };

    match self.update_current(data_id, record_holder) {
        Some(action) => {
            trace!(item=%data_id, %action, "item is currently being gossiped");
            action
        }
        None => {
            // Not tracked yet: create a fresh entry for the item.
            let mut state = State::default();
            record_holder(&mut state);
            let action = state.action(self.infection_target, self.attempted_to_infect_limit, true);
            let _ = self.current.insert(data_id.clone(), state);
            trace!(item=%data_id, %action, "gossiping new item should begin");
            action
        }
    }
}
/// Records that we now hold the complete item `data_id`, optionally provided by `maybe_holder`,
/// and that it should be gossiped to `target`.
///
/// Expired finished entries are purged first.  If the item is finished, nothing is recorded and
/// `Noop` is returned.  Otherwise the holder (if any) and the target are stored, creating a new
/// entry if the item isn't being tracked yet, and the resulting action is returned.
pub(super) fn new_complete_data(
    &mut self,
    data_id: &T,
    maybe_holder: Option<NodeId>,
    target: GossipTarget,
) -> GossipAction {
    self.purge_finished();

    if self.finished.contains(data_id) {
        trace!(item=%data_id, "no further action: item already finished");
        return GossipAction::Noop;
    }

    let mark_as_held = |state: &mut State| {
        state.holders.extend(maybe_holder);
        state.target = Some(target);
    };

    match self.update_current(data_id, mark_as_held) {
        Some(action) => {
            trace!(item=%data_id, %action, "item is currently being gossiped");
            action
        }
        None => {
            // Not tracked yet: create a fresh entry for the item.
            let mut state = State::default();
            mark_as_held(&mut state);
            let action = state.action(self.infection_target, self.attempted_to_infect_limit, true);
            let _ = self.current.insert(data_id.clone(), state);
            trace!(item=%data_id, %action, "gossiping new item should begin");
            action
        }
    }
}
/// Adds `peers` to the set of peers we have attempted to infect with the item `item_id`.
///
/// Does nothing if the item is not currently being gossiped.
pub(super) fn register_infection_attempt<'a>(
    &'a mut self,
    item_id: &T,
    peers: impl Iterator<Item = &'a NodeId>,
) {
    let state = match self.current.get_mut(item_id) {
        Some(state) => state,
        None => return,
    };
    state.attempted_to_infect.extend(peers);
}
/// Records that `peer` responded to our gossip of `data_id` and was infected by us.
pub(super) fn we_infected(&mut self, data_id: &T, peer: NodeId) -> GossipAction {
    self.infected(data_id, peer, true)
}
/// Records that `peer` responded to our gossip of `data_id` but was not infected by us.
pub(super) fn already_infected(&mut self, data_id: &T, peer: NodeId) -> GossipAction {
    self.infected(data_id, peer, false)
}
/// Records a gossip response from `peer` for the item `data_id`.
///
/// If we hold the item, `peer` is added to the holders (and to the peers infected by us when
/// `by_us` is set) and the in-flight count is reduced by one.  If we don't hold the item, a
/// warning is logged and the state is left untouched.
///
/// Returns `Noop` if the item is not currently being gossiped.
fn infected(&mut self, data_id: &T, peer: NodeId, by_us: bool) -> GossipAction {
    let record_response = |state: &mut State| {
        if state.held_by_us() {
            let _ = state.holders.insert(peer);
            if by_us {
                let _ = state.infected_by_us.insert(peer);
            }
            state.in_flight_count = state.in_flight_count.saturating_sub(1);
        } else {
            warn!(
                item=%data_id,
                %peer, "shouldn't have received a gossip response for data we don't hold"
            );
        }
    };

    match self.update_current(data_id, record_response) {
        Some(action) => action,
        None => GossipAction::Noop,
    }
}
/// Reduces the in-flight count of the item `data_id` by `reduce_by`, saturating at zero.
///
/// If the count reaches zero the item is moved to the finished set.  Returns `true` only in that
/// case; returns `false` if the item is not currently being gossiped or the count is still
/// non-zero.
pub(super) fn reduce_in_flight_count(&mut self, data_id: &T, reduce_by: usize) -> bool {
    let remaining = match self.current.get_mut(data_id) {
        Some(state) => {
            state.in_flight_count = state.in_flight_count.saturating_sub(reduce_by);
            trace!(
                item=%data_id,
                in_flight_count=%state.in_flight_count,
                "reduced in-flight count for item"
            );
            state.in_flight_count
        }
        None => return false,
    };

    if remaining != 0 {
        return false;
    }

    trace!(item=%data_id, "finished gossiping since no more peers to gossip to");
    self.force_finish(data_id)
}
/// Handles the expiry of the response timeout for a gossip of `data_id` sent to `peer`.
///
/// If `peer` is not yet recorded as a holder, it is recorded as one and the in-flight count is
/// reduced by one; if it is already a holder nothing changes.
///
/// Returns `Noop` if the item is not currently being gossiped.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics if we don't hold the item.  In other builds
/// an error is logged instead and the state is left untouched.
pub(super) fn check_timeout(&mut self, data_id: &T, peer: NodeId) -> GossipAction {
    let record_timeout = |state: &mut State| {
        debug_assert!(
            state.held_by_us(),
            "shouldn't check timeout for a gossip response for data we don't hold"
        );
        if !state.held_by_us() {
            error!(
                item=%data_id,
                %peer, "shouldn't check timeout for a gossip response for data we don't hold"
            );
            return;
        }

        // `insert` returns `true` only if the peer wasn't already recorded as a holder.
        if state.holders.insert(peer) {
            state.in_flight_count = state.in_flight_count.saturating_sub(1);
        }
    };

    match self.update_current(data_id, record_timeout) {
        Some(action) => action,
        None => GossipAction::Noop,
    }
}
/// Handles `peer` failing to provide the item `data_id`.
///
/// If we don't hold the item, `peer` is removed from its holders; should that leave no holders,
/// the entry is dropped altogether (without being added to the finished set) and `Noop` is
/// returned.  If we already hold the item the holders are left unchanged.
///
/// Returns `Noop` if the item is not currently being gossiped.
pub(super) fn remove_holder_if_unresponsive(
    &mut self,
    data_id: &T,
    peer: NodeId,
) -> GossipAction {
    let mut state = match self.current.remove(data_id) {
        Some(state) => state,
        None => return GossipAction::Noop,
    };

    let held_by_us = state.held_by_us();
    if !held_by_us {
        let _ = state.holders.remove(&peer);
        trace!(item=%data_id, %peer, "removed peer as a holder of the item");
        if state.holders.is_empty() {
            // The state was already taken out of `current`; simply don't put it back.
            trace!(item=%data_id, "no further action: item now removed as no holders");
            return GossipAction::Noop;
        }
    }

    // While we don't hold the item, treat the entry as new so that another holder is tried.
    let action = state.action(
        self.infection_target,
        self.attempted_to_infect_limit,
        !held_by_us,
    );
    let _ = self.current.insert(data_id.clone(), state);
    trace!(item=%data_id, %action, "assuming peer response did not timeout");
    action
}
/// Moves the item `data_id` from the in-progress set to the finished set.
///
/// Returns `true` if the item was being gossiped, or `false` (changing nothing) if it wasn't.
pub(super) fn force_finish(&mut self, data_id: &T) -> bool {
    let was_current = self.current.remove(data_id).is_some();
    if was_current {
        self.insert_to_finished(data_id);
    }
    was_current
}
/// Moves the item `data_id` to the finished set, but only if it is currently being gossiped and
/// we don't hold its complete data.
///
/// Returns `true` if the item was moved.
pub(super) fn finish_if_not_held_by_us(&mut self, data_id: &T) -> bool {
    let tracked_but_not_held = self
        .current
        .get(data_id)
        .map_or(false, |state| !state.held_by_us());

    if tracked_but_not_held {
        self.force_finish(data_id)
    } else {
        false
    }
}
/// Returns `true` if the item `data_id` is either currently being gossiped or is in the
/// finished set.
pub(super) fn has_entry(&self, data_id: &T) -> bool {
self.current.contains_key(data_id) || self.finished.contains(data_id)
}
/// Applies `update` to the state of the in-progress item `data_id` and derives the next action.
///
/// Returns `None` if the item is not currently being gossiped.  If the updated state is
/// finished, the item is moved to the finished set and `AnnounceFinished` is returned.
/// Otherwise the state is stored again and its action, computed as for an existing entry, is
/// returned.
fn update_current<F: Fn(&mut State)>(
    &mut self,
    data_id: &T,
    update: F,
) -> Option<GossipAction> {
    // Take ownership of the state; it is only put back if gossiping is to continue.
    let mut state = self.current.remove(data_id)?;
    update(&mut state);

    let action = if state.is_finished(self.infection_target, self.attempted_to_infect_limit) {
        self.insert_to_finished(data_id);
        GossipAction::AnnounceFinished
    } else {
        let next_action =
            state.action(self.infection_target, self.attempted_to_infect_limit, false);
        let _ = self.current.insert(data_id.clone(), state);
        next_action
    };

    Some(action)
}
/// Adds `data_id` to the finished set and schedules its removal `finished_entry_duration` from
/// now.
fn insert_to_finished(&mut self, data_id: &T) {
    let expiry = Instant::now() + self.finished_entry_duration;
    self.timeouts.push(expiry, data_id.clone());
    let _ = self.finished.insert(data_id.clone());
}
/// Removes from the finished set every ID which `Timeouts::purge` reports as expired at the
/// current time.
fn purge_finished(&mut self) {
    let now = Instant::now();
    // Borrow the two fields separately so the purge iterator and the set can be used together.
    let finished = &mut self.finished;
    self.timeouts.purge(&now).for_each(|expired_finished| {
        let _ = finished.remove(&expired_finished);
    });
}
/// Returns `true` if there are neither in-progress nor finished entries.  Test-only helper.
#[cfg(test)]
pub(super) fn is_empty(&self) -> bool {
self.current.is_empty() && self.finished.is_empty()
}
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeSet, iter, str::FromStr};
use rand::Rng;
use casper_types::{testing::TestRng, DisplayIter, TimeDiff};
use super::{super::config::DEFAULT_FINISHED_ENTRY_DURATION, *};
use crate::logging;
const EXPECTED_DEFAULT_INFECTION_TARGET: usize = 3;
const EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT: usize = 15;
/// Returns `EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT + 3` random node IDs, i.e. a few more
/// than are needed to reach the default limit on infection attempts.
fn random_node_ids(rng: &mut TestRng) -> Vec<NodeId> {
iter::repeat_with(|| NodeId::random(rng))
.take(EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT + 3)
.collect()
}
/// Asserts that the holders recorded for `data_id` in the table's `current` map are exactly
/// `expected`.  An item with no `current` entry is treated as having no holders.
fn check_holders(expected: &[NodeId], gossip_table: &GossipTable<u64>, data_id: &u64) {
// Compare as ordered sets so that neither ordering nor duplicates matter.
let expected: BTreeSet<_> = expected.iter().collect();
let actual: BTreeSet<_> = gossip_table
.current
.get(data_id)
.map_or_else(BTreeSet::new, |state| state.holders.iter().collect());
assert!(
expected == actual,
"\nexpected: {}\nactual: {}\n",
DisplayIter::new(expected.iter()),
DisplayIter::new(actual.iter())
);
}
// Exercises `GossipTable::new_data_id` for new, in-progress, finished and purged items.
#[test]
fn new_data_id() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// The limits derived from the default config should match the constants used by the tests.
assert_eq!(
EXPECTED_DEFAULT_INFECTION_TARGET,
gossip_table.infection_target
);
assert_eq!(
EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT,
gossip_table.attempted_to_infect_limit
);
// A previously unseen ID should make us fetch the remainder from the announcing peer, and
// that peer should be recorded as a holder.
let action = gossip_table.new_data_id(&data_id, node_ids[0]);
let expected = GossipAction::GetRemainder {
holder: node_ids[0],
};
assert_eq!(expected, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
// The same ID from the same peer: we're now awaiting the remainder, holders unchanged.
let action = gossip_table.new_data_id(&data_id, node_ids[0]);
assert_eq!(GossipAction::AwaitingRemainder, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
// The same ID from a different peer: still awaiting, but the new peer becomes a holder.
let action = gossip_table.new_data_id(&data_id, node_ids[1]);
assert_eq!(GossipAction::AwaitingRemainder, action);
check_holders(&node_ids[..2], &gossip_table, &data_id);
// Finish gossiping: provide the complete data, then record `infection_target` infections by
// us.  A further announcement of the ID should then be a no-op, and as the item has left
// `current` no holders are found for it.
let _ = gossip_table.new_complete_data(&data_id, Some(node_ids[0]), GossipTarget::All);
let limit = 3 + EXPECTED_DEFAULT_INFECTION_TARGET;
for node_id in &node_ids[3..limit] {
let _ = gossip_table.we_infected(&data_id, *node_id);
}
let action = gossip_table.new_data_id(&data_id, node_ids[limit]);
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
// Advance the fake clock past the finished-entry duration so the finished entry is purged;
// the ID should then be treated as new again.
let millis = TimeDiff::from_str(DEFAULT_FINISHED_ENTRY_DURATION)
.unwrap()
.millis();
Instant::advance_time(millis + 1);
let action = gossip_table.new_data_id(&data_id, node_ids[0]);
let expected = GossipAction::GetRemainder {
holder: node_ids[0],
};
assert_eq!(expected, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
}
// Gossip responses for an item of which we only know the ID must not change its state: the
// returned action stays `AwaitingRemainder`.
#[test]
fn should_noop_if_we_dont_hold_data_and_get_gossip_response() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_id = NodeId::random(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Only the ID is registered, so we don't hold the data.
let _ = gossip_table.new_data_id(&data_id, node_id);
let action = gossip_table.we_infected(&data_id, node_id);
assert_eq!(GossipAction::AwaitingRemainder, action);
let action = gossip_table.already_infected(&data_id, node_id);
assert_eq!(GossipAction::AwaitingRemainder, action);
}
// Exercises `GossipTable::new_complete_data` for new, in-progress, finished and purged items.
#[test]
fn new_complete_data() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// New complete data with no holder: we should gossip to `infection_target` peers, excluding
// nobody, and no holders are recorded.
let action = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: EXPECTED_DEFAULT_INFECTION_TARGET,
target: GossipTarget::All,
exclude_peers: HashSet::new(),
is_already_held: false,
});
assert_eq!(expected, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
// Receiving the same data from node 0 while all our gossips are still in flight: nothing
// further to do, but node 0 is recorded as a holder.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[0]));
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[0]), GossipTarget::All);
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
// A response from node 1 saying it was already infected frees one in-flight slot, so one more
// peer should be gossiped to, excluding the peers we've registered attempts for.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[1]));
let action = gossip_table.already_infected(&data_id, node_ids[1]);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
target: GossipTarget::All,
exclude_peers: node_ids[..2].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
check_holders(&node_ids[..2], &gossip_table, &data_id);
// Receiving the data from node 2 again changes nothing apart from recording the holder.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[2]));
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[2]), GossipTarget::All);
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..3], &gossip_table, &data_id);
// Finish gossiping by recording `infection_target` infections by us.  Further complete data
// for the item is then a no-op, and the item no longer has a `current` entry.
let limit = 3 + EXPECTED_DEFAULT_INFECTION_TARGET;
for node_id in &node_ids[3..limit] {
gossip_table.register_infection_attempt(&data_id, iter::once(node_id));
let _ = gossip_table.we_infected(&data_id, *node_id);
}
let action = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
// Advance the fake clock past the finished-entry duration so the finished entry is purged;
// the item should then be treated as new again, with no excluded peers.
let millis = TimeDiff::from_str(DEFAULT_FINISHED_ENTRY_DURATION)
.unwrap()
.millis();
Instant::advance_time(millis + 1);
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[0]), GossipTarget::All);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: EXPECTED_DEFAULT_INFECTION_TARGET,
target: GossipTarget::All,
exclude_peers: HashSet::new(), is_already_held: false,
});
assert_eq!(expected, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
}
// Gossiping should finish once `infection_target` distinct peers have been infected by us.
#[test]
fn should_terminate_via_infection_limit() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
// Infect one peer fewer than the target: this must not finish the item.
let limit = EXPECTED_DEFAULT_INFECTION_TARGET - 1;
for node_id in node_ids.iter().take(limit) {
gossip_table.register_infection_attempt(&data_id, iter::once(node_id));
let action = gossip_table.we_infected(&data_id, *node_id);
assert_eq!(GossipAction::Noop, action);
assert!(!gossip_table.finished.contains(&data_id));
}
// A repeated infection report from an already-counted peer must not finish the item either;
// it only frees an in-flight slot, so one more gossip is requested.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[limit - 1]));
let action = gossip_table.we_infected(&data_id, node_ids[limit - 1]);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
target: GossipTarget::All,
exclude_peers: node_ids[..limit].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
assert!(!gossip_table.finished.contains(&data_id));
// One more distinct infection reaches the target and finishes the item.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[limit]));
let action = gossip_table.we_infected(&data_id, node_ids[limit]);
assert_eq!(GossipAction::AnnounceFinished, action);
assert!(gossip_table.finished.contains(&data_id));
}
// Merely learning of many holders via incoming gossip must not finish an item, whether we only
// know its ID (`data_id1`) or hold the complete data (`data_id2`).
#[test]
fn should_not_terminate_via_incoming_gossip() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id1: u64 = rng.gen();
let data_id2: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Take the number of holders to one fewer than the limit on infection attempts.
let limit = EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT - 1;
for node_id in node_ids.iter().take(limit) {
let _ = gossip_table.new_data_id(&data_id1, *node_id);
assert!(!gossip_table.finished.contains(&data_id1));
let _ = gossip_table.new_complete_data(&data_id2, Some(*node_id), GossipTarget::All);
assert!(!gossip_table.finished.contains(&data_id2));
}
// Adding one further holder must still not finish either item.
let action = gossip_table.new_data_id(
&data_id1,
node_ids[EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT],
);
assert!(!gossip_table.finished.contains(&data_id1));
assert_eq!(GossipAction::AwaitingRemainder, action);
let action = gossip_table.new_complete_data(
&data_id2,
Some(node_ids[EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT]),
GossipTarget::All,
);
assert!(!gossip_table.finished.contains(&data_id2));
assert_eq!(GossipAction::Noop, action);
}
// Checking a timeout after the number of registered infection attempts has reached the limit
// should finish the item.
#[test]
fn should_terminate_via_checking_timeout() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Register one fewer infection attempts than the limit; the item must not be finished yet.
let limit = EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT - 1;
for node_id in node_ids.iter().take(limit) {
let _ = gossip_table.new_complete_data(&data_id, Some(*node_id), GossipTarget::All);
gossip_table.register_infection_attempt(&data_id, iter::once(node_id));
assert!(!gossip_table.finished.contains(&data_id));
}
// Registering one more attempt reaches the limit; the subsequent timeout check notices this
// and announces that gossiping has finished.
gossip_table.register_infection_attempt(
&data_id,
iter::once(&node_ids[EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT]),
);
let action = gossip_table.check_timeout(
&data_id,
node_ids[EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT],
);
assert!(gossip_table.finished.contains(&data_id));
assert_eq!(GossipAction::AnnounceFinished, action);
}
// Reducing the in-flight count to zero should finish the item.
#[test]
fn should_terminate_via_reducing_in_flight_count() {
let _ = logging::init();
let mut rng = crate::new_rng();
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Adding the complete data puts `infection_target` gossips in flight.
let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
// Reducing by one fewer than that leaves one in flight, so the item isn't finished.
let limit = EXPECTED_DEFAULT_INFECTION_TARGET - 1;
assert!(!gossip_table.reduce_in_flight_count(&data_id, limit));
assert!(!gossip_table.finished.contains(&data_id));
// Reducing by the final one finishes the item.
assert!(gossip_table.reduce_in_flight_count(&data_id, 1));
assert!(gossip_table.finished.contains(&data_id));
// Reducing again reports `false` as the item is no longer in progress; it stays finished.
assert!(!gossip_table.reduce_in_flight_count(&data_id, 1));
assert!(gossip_table.finished.contains(&data_id));
}
// Gossiping should finish once the number of distinct peers we attempted to infect reaches the
// limit, even if too few of them were infected by us.
#[test]
fn should_terminate_via_saturation() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
// Attempt to infect one peer fewer than the limit, each of which was already infected.  Every
// response should cause a replacement gossip excluding all peers attempted so far.
let limit = EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT - 1;
for (index, node_id) in node_ids.iter().enumerate().take(limit) {
gossip_table.register_infection_attempt(&data_id, iter::once(node_id));
let action = gossip_table.already_infected(&data_id, *node_id);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
target: GossipTarget::All,
exclude_peers: node_ids[..(index + 1)].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
}
// A repeated attempt on an already-attempted peer doesn't count towards the limit.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[0]));
let action = gossip_table.already_infected(&data_id, node_ids[0]);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
target: GossipTarget::All,
exclude_peers: node_ids[..limit].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
// An attempt on one more distinct peer reaches the limit and finishes the item.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[limit]));
let action = gossip_table.we_infected(&data_id, node_ids[limit]);
assert_eq!(GossipAction::AnnounceFinished, action);
}
// Gossiping must continue while both the number of peers infected by us and the number of
// attempted peers are one below their respective limits.
#[test]
fn should_not_terminate_below_infection_limit_and_saturation() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
// Infect one peer fewer than the infection target.
let infection_limit = EXPECTED_DEFAULT_INFECTION_TARGET - 1;
for node_id in &node_ids[0..infection_limit] {
gossip_table.register_infection_attempt(&data_id, iter::once(node_id));
let _ = gossip_table.we_infected(&data_id, *node_id);
}
// Attempt further peers, all already infected, up to two fewer than the attempt limit.
let attempted_to_infect = EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT - 2;
for node_id in &node_ids[infection_limit..attempted_to_infect] {
gossip_table.register_infection_attempt(&data_id, iter::once(node_id));
let _ = gossip_table.already_infected(&data_id, *node_id);
}
// One more attempt takes us to one below the attempt limit: we should still be asked to
// gossip to a further peer rather than finishing.
gossip_table
.register_infection_attempt(&data_id, iter::once(&node_ids[attempted_to_infect]));
let action = gossip_table.already_infected(&data_id, node_ids[attempted_to_infect]);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
target: GossipTarget::All,
exclude_peers: node_ids[..(attempted_to_infect + 1)]
.iter()
.cloned()
.collect(),
is_already_held: true,
});
assert_eq!(expected, action);
}
// `check_timeout` should do nothing for a peer which is already a holder (i.e. it responded),
// and should free an in-flight slot for a peer which isn't.
#[test]
fn check_timeout_should_detect_holder() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Node 0 responds, which records it as a holder.
let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
let _ = gossip_table.we_infected(&data_id, node_ids[0]);
// Checking the timeout for node 0 therefore requires no action.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[0]));
let action = gossip_table.check_timeout(&data_id, node_ids[0]);
assert_eq!(GossipAction::Noop, action);
// Node 1 never responded, so checking its timeout should cause one replacement gossip,
// excluding both peers we've attempted so far.
gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[1]));
let action = gossip_table.check_timeout(&data_id, node_ids[1]);
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
target: GossipTarget::All,
exclude_peers: node_ids[..=1].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
}
// With debug assertions enabled, `check_timeout` should panic for an item we don't hold.  In
// builds without debug assertions the test only checks that the call doesn't panic.
#[test]
#[cfg_attr(
debug_assertions,
should_panic(
expected = "shouldn't check timeout for a gossip response for data we don't hold"
)
)]
fn check_timeout_should_panic_for_data_we_dont_hold() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Only the ID is registered, so we don't hold the data.
let _ = gossip_table.new_data_id(&data_id, node_ids[0]);
let _ = gossip_table.check_timeout(&data_id, node_ids[0]);
}
// While we don't hold an item, unresponsive holders should be removed, and the entry dropped
// entirely once no holders remain.
#[test]
fn should_remove_holder_if_unresponsive() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Two peers announce the ID.
let _ = gossip_table.new_data_id(&data_id, node_ids[0]);
let _ = gossip_table.new_data_id(&data_id, node_ids[1]);
// Node 0 is unresponsive: it's removed and we should try the remaining holder instead.
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[0]);
let expected = GossipAction::GetRemainder {
holder: node_ids[1],
};
assert_eq!(expected, action);
check_holders(&node_ids[1..2], &gossip_table, &data_id);
// Node 1 is unresponsive too: with no holders left the entry is removed altogether, without
// being added to the finished set.
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[1]);
assert_eq!(GossipAction::Noop, action);
assert!(!gossip_table.current.contains_key(&data_id));
assert!(!gossip_table.finished.contains(&data_id));
}
// Once we hold the complete item, `remove_holder_if_unresponsive` must leave the holder and
// the entry in place.
#[test]
fn should_not_remove_holder_if_responsive() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Node 0 announces the ID and then provides the complete data.
let _ = gossip_table.new_data_id(&data_id, node_ids[0]);
let _ = gossip_table.new_complete_data(&data_id, Some(node_ids[0]), GossipTarget::All);
// No action is needed, node 0 remains a holder and the item remains in progress.
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[0]);
assert_eq!(GossipAction::Noop, action); check_holders(&node_ids[..1], &gossip_table, &data_id);
assert!(gossip_table.current.contains_key(&data_id));
}
// `force_finish` should finish an in-progress item exactly once.
#[test]
fn should_force_finish() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_data_id(&data_id, node_ids[0]);
// The first call moves the item to the finished set.
assert!(gossip_table.force_finish(&data_id));
assert!(gossip_table.finished.contains(&data_id));
// A second call reports `false` as the item is no longer in progress.
assert!(!gossip_table.force_finish(&data_id));
}
// Finished entries should be purged once the finished-entry duration has elapsed, whether they
// finished via infections or via `force_finish`.
#[test]
fn should_purge() {
let _ = logging::init();
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
// Finish the item by infecting `infection_target` peers.
let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
for node_id in &node_ids[0..EXPECTED_DEFAULT_INFECTION_TARGET] {
let _ = gossip_table.we_infected(&data_id, *node_id);
}
assert!(gossip_table.finished.contains(&data_id));
// After advancing the fake clock past the duration, purging should remove the entry.
let millis = TimeDiff::from_str(DEFAULT_FINISHED_ENTRY_DURATION)
.unwrap()
.millis();
Instant::advance_time(millis + 1);
gossip_table.purge_finished();
assert!(!gossip_table.finished.contains(&data_id));
// Add the item again and this time finish it via `force_finish`.
let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All);
assert!(gossip_table.force_finish(&data_id));
assert!(gossip_table.finished.contains(&data_id));
// Again, the entry should be purged once the duration has elapsed.
Instant::advance_time(millis + 1);
gossip_table.purge_finished();
assert!(!gossip_table.finished.contains(&data_id));
}
// With entries pushed in increasing order of timeout, `purge` should return only the expired
// ones.
#[test]
fn timeouts_purge_in_order() {
let mut timeouts = Timeouts::new();
let now = Instant::now();
let later_100 = now + Duration::from_millis(100);
let later_200 = now + Duration::from_millis(200);
timeouts.push(now, 0);
timeouts.push(later_100, 1);
timeouts.push(later_200, 2);
// 10ms on, only the first entry has expired.
let now_after_time_travel = now + Duration::from_millis(10);
let purged = timeouts.purge(&now_after_time_travel).collect::<Vec<i32>>();
assert_eq!(purged, vec![0]);
}
// Documents what `purge` does when entries are NOT pushed in order of timeout.  The expected
// results follow from the probing order of the standard library's binary search rather than
// from which entries have actually expired.
#[test]
fn timeouts_depends_on_binary_search_by_implementation() {
let mut timeouts = Timeouts::new();
let now = Instant::now();
let later_100 = now + Duration::from_millis(100);
let later_200 = now + Duration::from_millis(200);
let later_300 = now + Duration::from_millis(300);
let later_400 = now + Duration::from_millis(400);
let later_500 = now + Duration::from_millis(500);
let later_600 = now + Duration::from_millis(600);
// The entry with the earliest timeout is pushed last, so the list is out of order.
timeouts.push(later_100, 1);
timeouts.push(later_200, 2);
timeouts.push(later_300, 3);
timeouts.push(now, 0);
// The out-of-place expired entry is not found, so nothing is purged.
let now_after_time_travel = now + Duration::from_millis(10);
let purged = timeouts.purge(&now_after_time_travel).collect::<Vec<i32>>();
let empty: Vec<i32> = vec![];
assert_eq!(purged, empty);
timeouts.push(later_400, 4);
timeouts.push(later_500, 5);
timeouts.push(later_600, 6);
// With more entries the search now probes the out-of-place entry, and everything up to and
// including it is purged, even though entries 1 to 3 have not yet expired.
let now_after_time_travel = now + Duration::from_millis(20);
let purged = timeouts.purge(&now_after_time_travel).collect::<Vec<i32>>();
let expected = [1, 2, 3, 0];
assert_eq!(purged, expected);
// The remaining entries are in order, so they're all purged once they've expired.
let now_after_time_travel = now + Duration::from_millis(610);
let purged = timeouts.purge(&now_after_time_travel).collect::<Vec<i32>>();
let expected = [4, 5, 6];
assert_eq!(purged, expected);
assert_eq!(0, timeouts.values.len());
}
}