#[cfg(not(test))]
use std::time::Instant;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
fmt::Display,
hash::Hash,
time::Duration,
};
use datasize::DataSize;
#[cfg(test)]
use fake_instant::FakeClock as Instant;
use tracing::warn;
use super::Config;
#[cfg(test)]
use super::Error;
use crate::types::NodeId;
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum GossipAction {
GetRemainder { holder: NodeId },
AwaitingRemainder,
ShouldGossip(ShouldGossip),
Noop,
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct ShouldGossip {
pub(crate) count: usize,
pub(crate) exclude_peers: HashSet<NodeId>,
pub(crate) is_already_held: bool,
}
#[derive(DataSize, Debug, Default)]
pub(crate) struct State {
holders: HashSet<NodeId>,
held_by_us: bool,
infected_by_us: HashSet<NodeId>,
in_flight_count: usize,
}
impl State {
fn is_finished(&self, infection_target: usize, holders_limit: usize) -> bool {
self.infected_by_us.len() >= infection_target || self.holders.len() >= holders_limit
}
fn action(
&mut self,
infection_target: usize,
holders_limit: usize,
is_new: bool,
) -> GossipAction {
if self.is_finished(infection_target, holders_limit) {
return GossipAction::Noop;
}
if self.held_by_us {
let count =
infection_target.saturating_sub(self.in_flight_count + self.infected_by_us.len());
if count > 0 {
self.in_flight_count += count;
return GossipAction::ShouldGossip(ShouldGossip {
count,
exclude_peers: self.holders.clone(),
is_already_held: !is_new,
});
} else {
return GossipAction::Noop;
}
}
if is_new {
let holder = self
.holders
.iter()
.next()
.expect("holders cannot be empty if we don't hold the data")
.clone();
GossipAction::GetRemainder { holder }
} else {
GossipAction::AwaitingRemainder
}
}
}
#[derive(Debug)]
struct Timeouts<T> {
values: Vec<(Instant, T)>,
}
impl<T> Timeouts<T> {
fn new() -> Self {
Timeouts { values: Vec::new() }
}
fn push(&mut self, timeout: Instant, data_id: T) {
self.values.push((timeout, data_id));
}
fn purge(&mut self, now: &Instant) -> impl Iterator<Item = T> + '_ {
let split_index = match self
.values
.binary_search_by(|(timeout, _data_id)| timeout.cmp(now))
{
Ok(index) => index,
Err(index) => index,
};
self.values
.drain(..split_index)
.map(|(_timeout, data_id)| data_id)
}
}
#[derive(DataSize, Debug)]
pub(crate) struct GossipTable<T> {
current: HashMap<T, State>,
finished: HashSet<T>,
#[data_size(skip)]
finished_timeouts: Timeouts<T>,
paused: HashMap<T, State>,
#[data_size(skip)]
paused_timeouts: Timeouts<T>,
infection_target: usize,
holders_limit: usize,
finished_entry_duration: Duration,
}
impl<T> GossipTable<T> {
pub fn items_current(&self) -> usize {
self.current.len()
}
pub fn items_finished(&self) -> usize {
self.finished.len()
}
pub fn items_paused(&self) -> usize {
self.paused.len()
}
}
impl<T: Copy + Eq + Hash + Display> GossipTable<T> {
pub(crate) fn new(config: Config) -> Self {
let holders_limit = (100 * usize::from(config.infection_target()))
/ (100 - usize::from(config.saturation_limit_percent()));
GossipTable {
current: HashMap::new(),
finished: HashSet::new(),
finished_timeouts: Timeouts::new(),
paused: HashMap::new(),
paused_timeouts: Timeouts::new(),
infection_target: usize::from(config.infection_target()),
holders_limit,
finished_entry_duration: Duration::from_secs(config.finished_entry_duration_secs()),
}
}
pub(crate) fn new_partial_data(&mut self, data_id: &T, holder: NodeId) -> GossipAction {
self.purge_finished();
if self.finished.contains(data_id) {
return GossipAction::Noop;
}
if let Some(state) = self.paused.get_mut(data_id) {
let _ = state.holders.insert(holder);
return GossipAction::Noop;
}
match self.current.entry(*data_id) {
Entry::Occupied(mut entry) => {
let is_new = false;
let state = entry.get_mut();
let _ = state.holders.insert(holder);
state.action(self.infection_target, self.holders_limit, is_new)
}
Entry::Vacant(entry) => {
let is_new = true;
let state = entry.insert(State::default());
let _ = state.holders.insert(holder);
state.action(self.infection_target, self.holders_limit, is_new)
}
}
}
pub(crate) fn new_complete_data(
&mut self,
data_id: &T,
maybe_holder: Option<NodeId>,
) -> Option<ShouldGossip> {
self.purge_finished();
if self.finished.contains(data_id) {
return None;
}
let update = |state: &mut State| {
state.holders.extend(maybe_holder);
state.held_by_us = true;
};
if let Some(state) = self.paused.get_mut(data_id) {
update(state);
return None;
}
let action = match self.current.entry(*data_id) {
Entry::Occupied(mut entry) => {
let state = entry.get_mut();
update(state);
let is_new = false;
state.action(self.infection_target, self.holders_limit, is_new)
}
Entry::Vacant(entry) => {
let state = entry.insert(State::default());
update(state);
let is_new = true;
state.action(self.infection_target, self.holders_limit, is_new)
}
};
match action {
GossipAction::ShouldGossip(should_gossip) => Some(should_gossip),
GossipAction::Noop => None,
GossipAction::GetRemainder { .. } | GossipAction::AwaitingRemainder => {
unreachable!("can't be waiting for remainder since we hold the complete data")
}
}
}
pub(crate) fn we_infected(&mut self, data_id: &T, peer: NodeId) -> GossipAction {
let infected_by_us = true;
self.infected(data_id, peer, infected_by_us)
}
pub(crate) fn already_infected(&mut self, data_id: &T, peer: NodeId) -> GossipAction {
let infected_by_us = false;
self.infected(data_id, peer, infected_by_us)
}
fn infected(&mut self, data_id: &T, peer: NodeId, by_us: bool) -> GossipAction {
let infection_target = self.infection_target;
let holders_limit = self.holders_limit;
let update = |state: &mut State| {
if !state.held_by_us {
warn!(
%data_id,
%peer, "shouldn't have received a gossip response for partial data"
);
return None;
}
let _ = state.holders.insert(peer.clone());
if by_us {
let _ = state.infected_by_us.insert(peer.clone());
}
state.in_flight_count = state.in_flight_count.saturating_sub(1);
Some(state.is_finished(infection_target, holders_limit))
};
let is_finished = if let Some(state) = self.current.get_mut(data_id) {
let is_finished = match update(state) {
Some(is_finished) => is_finished,
None => return GossipAction::Noop,
};
if !is_finished {
let is_new = false;
return state.action(self.infection_target, self.holders_limit, is_new);
}
true
} else {
false
};
if is_finished {
let _ = self.current.remove(data_id);
let timeout = Instant::now() + self.finished_entry_duration;
let _ = self.finished.insert(*data_id);
let _ = self.finished_timeouts.push(timeout, *data_id);
return GossipAction::Noop;
}
let is_finished = if let Some(state) = self.paused.get_mut(data_id) {
match update(state) {
Some(is_finished) => is_finished,
None => return GossipAction::Noop,
}
} else {
false
};
if is_finished {
let _ = self.paused.remove(data_id);
let timeout = Instant::now() + self.finished_entry_duration;
let _ = self.finished.insert(*data_id);
let _ = self.finished_timeouts.push(timeout, *data_id);
}
GossipAction::Noop
}
pub(crate) fn reduce_in_flight_count(&mut self, data_id: &T, reduce_by: usize) {
if let Some(state) = self.current.get_mut(data_id) {
state.in_flight_count = state.in_flight_count.saturating_sub(reduce_by);
}
}
pub(crate) fn check_timeout(&mut self, data_id: &T, peer: NodeId) -> GossipAction {
if let Some(state) = self.current.get_mut(data_id) {
debug_assert!(
state.held_by_us,
"shouldn't check timeout for a gossip response for partial data"
);
if !state.holders.contains(&peer) {
let _ = state.holders.insert(peer);
state.in_flight_count = state.in_flight_count.saturating_sub(1);
let is_new = false;
return state.action(self.infection_target, self.holders_limit, is_new);
}
}
GossipAction::Noop
}
pub(crate) fn remove_holder_if_unresponsive(
&mut self,
data_id: &T,
peer: NodeId,
) -> GossipAction {
if let Some(mut state) = self.current.remove(data_id) {
if !state.held_by_us {
let _ = state.holders.remove(&peer);
if state.holders.is_empty() {
return GossipAction::Noop;
}
}
let is_new = !state.held_by_us;
let action = state.action(self.infection_target, self.holders_limit, is_new);
let _ = self.current.insert(*data_id, state);
return action;
}
if let Some(state) = self.paused.get_mut(data_id) {
if !state.held_by_us {
let _ = state.holders.remove(&peer);
}
}
GossipAction::Noop
}
pub(crate) fn pause(&mut self, data_id: &T) {
if let Some(mut state) = self.current.remove(data_id) {
state.in_flight_count = 0;
let timeout = Instant::now() + self.finished_entry_duration;
let _ = self.paused.insert(*data_id, state);
let _ = self.paused_timeouts.push(timeout, *data_id);
}
}
#[cfg(test)]
pub(crate) fn resume(&mut self, data_id: &T) -> Result<GossipAction, Error> {
let mut state = self.paused.remove(data_id).ok_or(Error::NotPaused)?;
let is_new = !state.held_by_us;
let action = state.action(self.infection_target, self.holders_limit, is_new);
let _ = self.current.insert(*data_id, state);
Ok(action)
}
fn purge_finished(&mut self) {
let now = Instant::now();
for expired_finished in self.finished_timeouts.purge(&now) {
let _ = self.finished.remove(&expired_finished);
}
for expired_paused in self.paused_timeouts.purge(&now) {
let _ = self.paused.remove(&expired_paused);
}
}
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeSet, iter};
use rand::Rng;
use test::Bencher;
use super::{super::config::DEFAULT_FINISHED_ENTRY_DURATION_SECS, *};
use crate::{crypto::hash::Digest, testing::TestRng, types::DeployHash, utils::DisplayIter};
const EXPECTED_DEFAULT_INFECTION_TARGET: usize = 3;
const EXPECTED_DEFAULT_HOLDERS_LIMIT: usize = 15;
fn random_node_ids(rng: &mut TestRng) -> Vec<NodeId> {
iter::repeat_with(|| NodeId::random(rng))
.take(EXPECTED_DEFAULT_HOLDERS_LIMIT + 3)
.collect()
}
fn check_holders(expected: &[NodeId], gossip_table: &GossipTable<u64>, data_id: &u64) {
let expected: BTreeSet<_> = expected.iter().collect();
let actual: BTreeSet<_> = gossip_table
.current
.get(data_id)
.or_else(|| gossip_table.paused.get(data_id))
.map_or_else(BTreeSet::new, |state| state.holders.iter().collect());
assert!(
expected == actual,
"\nexpected: {}\nactual: {}\n",
DisplayIter::new(expected.iter()),
DisplayIter::new(actual.iter())
);
}
#[test]
fn new_partial_data() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
assert_eq!(
EXPECTED_DEFAULT_INFECTION_TARGET,
gossip_table.infection_target
);
assert_eq!(EXPECTED_DEFAULT_HOLDERS_LIMIT, gossip_table.holders_limit);
let action = gossip_table.new_partial_data(&data_id, node_ids[0].clone());
let expected = GossipAction::GetRemainder {
holder: node_ids[0].clone(),
};
assert_eq!(expected, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
let action = gossip_table.new_partial_data(&data_id, node_ids[0].clone());
assert_eq!(GossipAction::AwaitingRemainder, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
let action = gossip_table.new_partial_data(&data_id, node_ids[1].clone());
assert_eq!(GossipAction::AwaitingRemainder, action);
check_holders(&node_ids[..2], &gossip_table, &data_id);
gossip_table.pause(&data_id);
let action = gossip_table.new_partial_data(&data_id, node_ids[2].clone());
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..3], &gossip_table, &data_id);
gossip_table.resume(&data_id).unwrap();
let action = gossip_table.new_partial_data(&data_id, node_ids[3].clone());
assert_eq!(GossipAction::AwaitingRemainder, action);
check_holders(&node_ids[..4], &gossip_table, &data_id);
let _ = gossip_table.new_complete_data(&data_id, Some(node_ids[0].clone()));
let limit = 4 + EXPECTED_DEFAULT_INFECTION_TARGET;
for node_id in &node_ids[4..limit] {
let _ = gossip_table.we_infected(&data_id, node_id.clone());
}
let action = gossip_table.new_partial_data(&data_id, node_ids[limit].clone());
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
Instant::advance_time(DEFAULT_FINISHED_ENTRY_DURATION_SECS * 1_000 + 1);
let action = gossip_table.new_partial_data(&data_id, node_ids[0].clone());
let expected = GossipAction::GetRemainder {
holder: node_ids[0].clone(),
};
assert_eq!(expected, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
}
#[test]
fn should_noop_if_we_have_partial_data_and_get_gossip_response() {
let mut rng = crate::new_rng();
let node_id = NodeId::random(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_partial_data(&data_id, node_id.clone());
let action = gossip_table.we_infected(&data_id, node_id.clone());
assert_eq!(GossipAction::Noop, action);
let action = gossip_table.already_infected(&data_id, node_id);
assert_eq!(GossipAction::Noop, action);
}
#[test]
fn new_complete_data() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let action = gossip_table.new_complete_data(&data_id, None);
let expected = Some(ShouldGossip {
count: EXPECTED_DEFAULT_INFECTION_TARGET,
exclude_peers: HashSet::new(),
is_already_held: false,
});
assert_eq!(expected, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[0].clone()));
assert!(action.is_none());
check_holders(&node_ids[..1], &gossip_table, &data_id);
let action = gossip_table.already_infected(&data_id, node_ids[1].clone());
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
exclude_peers: node_ids[..2].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
check_holders(&node_ids[..2], &gossip_table, &data_id);
gossip_table.pause(&data_id);
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[2].clone()));
assert!(action.is_none());
check_holders(&node_ids[..3], &gossip_table, &data_id);
let action = gossip_table.resume(&data_id).unwrap();
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: EXPECTED_DEFAULT_INFECTION_TARGET,
exclude_peers: node_ids[..3].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[3].clone()));
assert!(action.is_none());
check_holders(&node_ids[..4], &gossip_table, &data_id);
let limit = 4 + EXPECTED_DEFAULT_INFECTION_TARGET;
for node_id in &node_ids[4..limit] {
let _ = gossip_table.we_infected(&data_id, node_id.clone());
}
let action = gossip_table.new_complete_data(&data_id, None);
assert!(action.is_none());
check_holders(&node_ids[..0], &gossip_table, &data_id);
Instant::advance_time(DEFAULT_FINISHED_ENTRY_DURATION_SECS * 1_000 + 1);
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[0].clone()));
let expected = Some(ShouldGossip {
count: EXPECTED_DEFAULT_INFECTION_TARGET,
exclude_peers: node_ids[..1].iter().cloned().collect(),
is_already_held: false,
});
assert_eq!(expected, action);
check_holders(&node_ids[..1], &gossip_table, &data_id);
}
#[test]
fn should_terminate_via_infection_limit() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None);
let limit = EXPECTED_DEFAULT_INFECTION_TARGET - 1;
for node_id in node_ids.iter().take(limit) {
let action = gossip_table.we_infected(&data_id, node_id.clone());
assert_eq!(GossipAction::Noop, action);
assert!(!gossip_table.finished.contains(&data_id));
}
let action = gossip_table.we_infected(&data_id, node_ids[limit - 1].clone());
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
exclude_peers: node_ids[..limit].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
assert!(!gossip_table.finished.contains(&data_id));
let action = gossip_table.we_infected(&data_id, node_ids[limit].clone());
assert_eq!(GossipAction::Noop, action);
assert!(gossip_table.finished.contains(&data_id));
}
#[test]
fn should_terminate_via_saturation() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None);
let limit = EXPECTED_DEFAULT_HOLDERS_LIMIT - 1;
for (index, node_id) in node_ids.iter().enumerate().take(limit) {
let action = gossip_table.already_infected(&data_id, node_id.clone());
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
exclude_peers: node_ids[..(index + 1)].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
}
let action = gossip_table.already_infected(&data_id, node_ids[0].clone());
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
exclude_peers: node_ids[..limit].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
let action = gossip_table.we_infected(&data_id, node_ids[limit].clone());
assert_eq!(GossipAction::Noop, action);
}
#[test]
fn should_not_terminate_below_infection_limit_and_saturation() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None);
let infection_limit = EXPECTED_DEFAULT_INFECTION_TARGET - 1;
for node_id in &node_ids[0..infection_limit] {
let _ = gossip_table.we_infected(&data_id, node_id.clone());
}
let holders_limit = EXPECTED_DEFAULT_HOLDERS_LIMIT - 2;
for node_id in &node_ids[infection_limit..holders_limit] {
let _ = gossip_table.already_infected(&data_id, node_id.clone());
}
let action = gossip_table.already_infected(&data_id, node_ids[holders_limit].clone());
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
exclude_peers: node_ids[..(holders_limit + 1)].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
}
#[test]
fn check_timeout_should_detect_holder() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None);
let _ = gossip_table.we_infected(&data_id, node_ids[0].clone());
let action = gossip_table.check_timeout(&data_id, node_ids[0].clone());
assert_eq!(GossipAction::Noop, action);
let action = gossip_table.check_timeout(&data_id, node_ids[1].clone());
let expected = GossipAction::ShouldGossip(ShouldGossip {
count: 1,
exclude_peers: node_ids[..=1].iter().cloned().collect(),
is_already_held: true,
});
assert_eq!(expected, action);
}
#[test]
#[cfg_attr(
debug_assertions,
should_panic(expected = "shouldn't check timeout for a gossip response for partial data")
)]
fn check_timeout_should_panic_for_partial_copy() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_partial_data(&data_id, node_ids[0].clone());
let _ = gossip_table.check_timeout(&data_id, node_ids[0].clone());
}
#[test]
fn should_remove_holder_if_unresponsive() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_partial_data(&data_id, node_ids[0].clone());
let _ = gossip_table.new_partial_data(&data_id, node_ids[1].clone());
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[0].clone());
let expected = GossipAction::GetRemainder {
holder: node_ids[1].clone(),
};
assert_eq!(expected, action);
check_holders(&node_ids[1..2], &gossip_table, &data_id);
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[1].clone());
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
assert!(!gossip_table.current.contains_key(&data_id));
assert!(!gossip_table.paused.contains_key(&data_id));
let action = gossip_table.new_partial_data(&data_id, node_ids[2].clone());
let expected = GossipAction::GetRemainder {
holder: node_ids[2].clone(),
};
assert_eq!(expected, action);
check_holders(&node_ids[2..3], &gossip_table, &data_id);
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[2].clone());
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
assert!(!gossip_table.current.contains_key(&data_id));
assert!(!gossip_table.paused.contains_key(&data_id));
let action = gossip_table.new_complete_data(&data_id, Some(node_ids[3].clone()));
let expected = Some(ShouldGossip {
count: EXPECTED_DEFAULT_INFECTION_TARGET,
exclude_peers: iter::once(node_ids[3].clone()).collect(),
is_already_held: false,
});
assert_eq!(expected, action);
check_holders(&node_ids[3..4], &gossip_table, &data_id);
}
#[test]
fn should_not_remove_holder_if_responsive() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_partial_data(&data_id, node_ids[0].clone());
let _ = gossip_table.new_complete_data(&data_id, Some(node_ids[0].clone()));
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[0].clone());
assert_eq!(GossipAction::Noop, action); check_holders(&node_ids[..1], &gossip_table, &data_id);
assert!(gossip_table.current.contains_key(&data_id));
assert!(!gossip_table.paused.contains_key(&data_id));
}
#[test]
fn should_not_auto_resume_manually_paused() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_partial_data(&data_id, node_ids[0].clone());
gossip_table.pause(&data_id);
let action = gossip_table.remove_holder_if_unresponsive(&data_id, node_ids[0].clone());
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[..0], &gossip_table, &data_id);
let action = gossip_table.new_partial_data(&data_id, node_ids[1].clone());
assert_eq!(GossipAction::Noop, action);
check_holders(&node_ids[1..2], &gossip_table, &data_id);
assert!(!gossip_table.current.contains_key(&data_id));
assert!(gossip_table.paused.contains_key(&data_id));
}
#[test]
fn should_purge() {
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let data_id: u64 = rng.gen();
let mut gossip_table = GossipTable::new(Config::default());
let _ = gossip_table.new_complete_data(&data_id, None);
for node_id in &node_ids[0..EXPECTED_DEFAULT_INFECTION_TARGET] {
let _ = gossip_table.we_infected(&data_id, node_id.clone());
}
assert!(gossip_table.finished.contains(&data_id));
Instant::advance_time(DEFAULT_FINISHED_ENTRY_DURATION_SECS * 1_000 + 1);
gossip_table.purge_finished();
assert!(!gossip_table.finished.contains(&data_id));
let _ = gossip_table.new_complete_data(&data_id, None);
gossip_table.pause(&data_id);
assert!(gossip_table.paused.contains_key(&data_id));
Instant::advance_time(DEFAULT_FINISHED_ENTRY_DURATION_SECS * 1_000 + 1);
gossip_table.purge_finished();
assert!(!gossip_table.paused.contains_key(&data_id));
}
#[bench]
fn benchmark_purging(bencher: &mut Bencher) {
const ENTRY_COUNT: usize = 10_000;
let mut rng = crate::new_rng();
let node_ids = random_node_ids(&mut rng);
let deploy_ids = iter::repeat_with(|| DeployHash::new(Digest::random(&mut rng)))
.take(ENTRY_COUNT)
.collect::<Vec<_>>();
let mut gossip_table = GossipTable::new(Config::default());
for deploy_id in &deploy_ids {
let _ = gossip_table.new_complete_data(deploy_id, None);
for node_id in &node_ids[0..EXPECTED_DEFAULT_INFECTION_TARGET] {
let _ = gossip_table.we_infected(deploy_id, node_id.clone());
}
assert!(gossip_table.finished.contains(&deploy_id));
}
bencher.iter(|| gossip_table.purge_finished());
}
}