#![cfg(any(test, feature = "use-mock-crust"))]
#![allow(dead_code, missing_docs)]
use super::{Error, RoutingTable};
use super::authority::Authority;
use super::prefix::Prefix;
use maidsafe_utilities::SeededRng;
use rand::Rng;
use routing_table::{OwnMergeDetails, OwnMergeState};
use routing_table::xorable::Xorable;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Binary, Debug};
use std::hash::Hash;
use std::iter::IntoIterator;
/// A simulated network used by the tests in this file: a collection of routing tables, one per
/// node, that are kept in sync by calling the `RoutingTable` API directly.
#[derive(Default)]
struct Network {
// Minimum section size handed to every `RoutingTable` created for this network.
min_section_size: usize,
// Random number generator used to pick node names and to choose nodes at random.
rng: SeededRng,
// Routing table of every node currently in the network, keyed by the node's name.
nodes: BTreeMap<u64, RoutingTable<u64>>,
}
impl Network {
/// Creates an empty network whose routing tables will use `min_section_size`.
///
/// When `optional_seed` is `Some`, the random number generator is built from that seed via
/// `SeededRng::from_seed`; otherwise `SeededRng::new` is used.
fn new(min_section_size: usize, optional_seed: Option<[u32; 4]>) -> Network {
    let rng = match optional_seed {
        Some(seed) => SeededRng::from_seed(seed),
        None => SeededRng::new(),
    };
    Network {
        min_section_size: min_section_size,
        rng: rng,
        nodes: BTreeMap::new(),
    }
}
/// Returns the minimum section size this network was created with.
pub fn min_section_size(&self) -> usize {
    self.min_section_size
}
fn add_node(&mut self) {
let name = self.random_free_name(); if self.nodes.is_empty() {
let result = self.nodes.insert(name, RoutingTable::new(name, self.min_section_size));
assert!(result.is_none());
return;
}
let mut new_table = RoutingTable::new(name, self.min_section_size);
{
let close_node = self.close_node(name);
let close_peer = &self.nodes[&close_node];
unwrap!(new_table.add_prefixes(close_peer.prefixes().into_iter().collect()));
}
let mut split_prefixes = BTreeSet::new();
for node in self.nodes.values_mut() {
match node.add(name, false) {
Ok(true) => {
let _ = split_prefixes.insert(*node.our_prefix());
}
Ok(false) => {}
Err(e) => trace!("failed to add node with error {:?}", e),
}
match new_table.add(*node.our_name(), false) {
Ok(true) => {
let prefix = *new_table.our_prefix();
let _ = split_prefixes.insert(prefix);
let _ = new_table.split(prefix);
}
Ok(false) => {}
Err(e) => trace!("failed to add node into new with error {:?}", e),
}
}
assert!(self.nodes.insert(name, new_table).is_none());
for split_prefix in &split_prefixes {
for node in self.nodes.values_mut() {
let _ = node.split(*split_prefix);
}
}
}
/// Records `new_info` under `prefix` in `merge_info`.
///
/// If an entry for `prefix` already exists it is left untouched, but it must be equal to
/// `new_info`; otherwise the `assert_eq!` fails.
fn store_merge_info<T: PartialEq + Debug>(merge_info: &mut BTreeMap<Prefix<u64>, T>,
                                          prefix: Prefix<u64>,
                                          new_info: T) {
    let already_stored = match merge_info.get(&prefix) {
        Some(existing) => {
            assert_eq!(new_info, *existing);
            true
        }
        None => false,
    };
    if !already_stored {
        let _ = merge_info.insert(prefix, new_info);
    }
}
/// Removes one randomly chosen node from the network and lets the remaining nodes react.
///
/// The dropped name is removed from every remaining routing table. Whenever a table then
/// reports (via `should_merge`) that a merge is due, the merge is carried out in rounds: the
/// own-section details are delivered to the nodes covered by the merge prefix, the resulting
/// other-section details are delivered to their target prefixes, and any follow-up merge
/// requests are processed in the next round until none are left.
///
/// Panics (via `unwrap!`) if no node can be chosen - presumably `choose` yields `None` for an
/// empty network - and whenever one of the consistency checks below fails.
#[cfg_attr(feature="cargo-clippy", allow(for_kv_map))]
fn drop_node(&mut self) {
// Pick a random existing node and take it out of the network.
let keys = self.keys();
let name = *unwrap!(self.rng.choose(&keys));
let _ = self.nodes.remove(&name);
// Merge details reported by `should_merge`, keyed by the reporting node's own prefix.
let mut merge_own_info: BTreeMap<Prefix<u64>, OwnMergeDetails<u64>> = BTreeMap::new();
for node in self.nodes.values_mut() {
if node.iter().any(|&name_in_table| name_in_table == name) {
// The table contains the dropped name: removal must succeed and report consistent details.
let removed_node_is_in_our_section = node.is_in_our_section(&name);
let removal_details = unwrap!(node.remove(&name));
assert_eq!(name, removal_details.name);
assert_eq!(removed_node_is_in_our_section,
removal_details.was_in_our_section);
if let Some(info) = node.should_merge(false, false) {
Network::store_merge_info(&mut merge_own_info, *node.our_prefix(), info);
}
} else {
// The table does not contain the dropped name: removal must fail with `NoSuchPeer`.
match node.remove(&name) {
Err(Error::NoSuchPeer) => {}
Err(error) => panic!("Expected NoSuchPeer, but got {:?}", error),
Ok(details) => panic!("Expected NoSuchPeer, but got {:?}", details),
}
}
}
// Per node: names listed in received merge details that the node's table did not contain
// (`has` returned false). They are added once the node's own merge completes.
let mut expected_peers = BTreeMap::new();
// Keep going until a round produces no further merge requests.
while !merge_own_info.is_empty() {
let mut merge_other_info = BTreeMap::new();
// Take this round's requests; requests raised during the round go into the fresh map and
// are handled in the next iteration.
let own_info = merge_own_info;
merge_own_info = BTreeMap::new();
for (_, merge_own_details) in own_info {
// Deliver the details to every node whose name matches the merge prefix.
let nodes = self.nodes_covered_by_prefixes(&[merge_own_details.merge_prefix]);
for node in &nodes {
let target_node = unwrap!(self.nodes.get_mut(&node));
let node_expected = expected_peers.entry(*node).or_insert_with(BTreeSet::new);
for section in &merge_own_details.sections {
node_expected.extend(section.1.iter().filter(|name| {
!target_node.has(name)
}));
}
match target_node.merge_own_section(merge_own_details.clone()) {
OwnMergeState::Ongoing |
OwnMergeState::AlreadyMerged => (),
OwnMergeState::Completed { targets, merge_details } => {
// Remember what has to be passed on to the target prefixes, keyed by this node's prefix.
Network::store_merge_info(&mut merge_other_info,
*target_node.our_prefix(),
(targets, merge_details));
// Add every peer this node was missing; a failure here aborts the test.
for name in node_expected.clone() {
if let Err(e) = target_node.add(name, false) {
panic!("Error adding node: {:?}", e);
}
node_expected.remove(&name);
}
// The loop above removes every entry it iterates over, so the set is empty here unless
// a panic occurred.
if node_expected.is_empty() {
if let Some(info) = target_node.should_merge(false, false) {
Network::store_merge_info(&mut merge_own_info,
*target_node.our_prefix(),
info);
}
}
}
}
}
}
// Deliver the other-section merge details to all nodes covered by the target prefixes.
for (_, (target_prefixes, merge_other_details)) in merge_other_info {
let targets = self.nodes_covered_by_prefixes(&target_prefixes);
for target in targets {
let target_node = unwrap!(self.nodes.get_mut(&target));
let contacts = target_node.merge_other_section(merge_other_details.clone());
// Add the returned contacts; errors from `add` are deliberately ignored here.
for contact in contacts {
let _ = target_node.add(contact, false);
}
if let Some(info) = target_node.should_merge(false, false) {
Network::store_merge_info(&mut merge_own_info,
*target_node.our_prefix(),
info);
}
}
}
}
}
/// Returns the names of all nodes in the network that match at least one of `prefixes`.
///
/// The names are returned in the iteration order of the node map, i.e. ascending.
fn nodes_covered_by_prefixes<'a, T>(&self, prefixes: T) -> Vec<u64>
    where T: IntoIterator<Item = &'a Prefix<u64>> + Copy
{
    let mut covered = Vec::new();
    for name in self.nodes.keys() {
        // `prefixes` is `Copy`, so it can be iterated afresh for every name.
        if prefixes.into_iter().any(|prefix| prefix.matches(name)) {
            covered.push(*name);
        }
    }
    covered
}
/// Draws random names until one is found that no node in the network uses yet.
fn random_free_name(&mut self) -> u64 {
    let mut candidate: u64 = self.rng.gen();
    while self.nodes.contains_key(&candidate) {
        candidate = self.rng.gen();
    }
    candidate
}
/// Simulates sending a message from `src` to `dst` along `route` and checks its delivery.
///
/// Starting at `src`, each node that handles the message forwards it to the targets its
/// routing table returns, until no new node is reached. Afterwards:
///
/// * for a single-node destination, that node must have handled the message;
/// * otherwise every name returned by `close_names` (asked of a node close to the
///   destination) must have handled it.
fn send_message(&self, src: u64, dst: Authority<u64>, route: usize) {
    let mut handled = BTreeSet::new();
    // Nodes that have received the message but not yet processed it.
    let mut pending = vec![src];

    while let Some(current) = pending.pop() {
        let _ = handled.insert(current);
        let next_hops = unwrap!(self.nodes[&current].targets(&dst, src, route));
        for target in next_hops {
            let already_known = handled.contains(&target) || pending.contains(&target);
            if !already_known {
                pending.push(target);
            }
        }
    }

    if !dst.is_single() {
        let close_node = self.close_node(dst.name());
        for node in unwrap!(self.nodes[&close_node].close_names(&dst.name())) {
            assert!(handled.contains(&node));
        }
        return;
    }

    assert!(handled.contains(&dst.name()),
            "Message to {:?} only handled by {:?}",
            dst,
            handled);
}
/// Returns the name of the first node (in map order) whose routing table reports being in the
/// `Section` authority for `address`.
///
/// Panics via `unwrap!` if no such node exists.
fn close_node(&self, address: u64) -> u64 {
    let target = Authority::Section(address);
    let mut found = None;
    for (peer, table) in &self.nodes {
        if table.in_authority(&target) {
            found = Some(*peer);
            break;
        }
    }
    unwrap!(found)
}
/// Returns the names of all nodes currently in the network, in ascending order.
fn keys(&self) -> Vec<u64> {
    let mut names = Vec::with_capacity(self.nodes.len());
    names.extend(self.nodes.keys().cloned());
    names
}
}
/// Builds a 100-node network and checks that messages between randomly chosen pairs of nodes
/// reach their destination on every route.
#[test]
fn node_to_node_message() {
    let node_count = 100;
    let message_count = 20;
    let mut network = Network::new(8, None);
    for _ in 0..node_count {
        network.add_node();
    }

    let names = network.keys();
    for _ in 0..message_count {
        // Source is drawn before destination.
        let src = *unwrap!(network.rng.choose(&names));
        let dst = *unwrap!(network.rng.choose(&names));
        let route_count = network.min_section_size();
        for route in 0..route_count {
            network.send_message(src, Authority::ManagedNode(dst), route);
        }
    }
}
/// Builds a 100-node network and checks that messages from randomly chosen nodes to random
/// section addresses are handled by all nodes close to the address, on every route.
#[test]
fn node_to_section_message() {
    let node_count = 100;
    let message_count = 20;
    let mut network = Network::new(8, None);
    for _ in 0..node_count {
        network.add_node();
    }

    let names = network.keys();
    for _ in 0..message_count {
        // Source is drawn before the destination address.
        let src = *unwrap!(network.rng.choose(&names));
        let dst: u64 = network.rng.gen();
        let route_count = network.min_section_size();
        for route in 0..route_count {
            network.send_message(src, Authority::Section(dst), route);
        }
    }
}
/// Runs `verify_network_invariant` over the routing tables of all nodes in `network`.
fn verify_invariant(network: &Network) {
    let tables = network.nodes.values();
    verify_network_invariant(tables);
}
/// Checks that the routing tables of the given nodes describe one consistent network.
///
/// Every table must pass its own `verify_invariant`. In addition:
///
/// * all nodes that know a given prefix must agree on the contents of that section;
/// * no two distinct known prefixes may be compatible with each other;
/// * every name in a section must match the section's prefix;
/// * `Prefix::default()` must be covered by the set of all known prefixes.
///
/// # Panics
///
/// Panics if any of the conditions above is violated.
pub fn verify_network_invariant<'a, T, U>(nodes: U)
    where T: Binary + Clone + Copy + Debug + Default + Hash + Xorable + 'a,
          U: IntoIterator<Item = &'a RoutingTable<T>>
{
    // For every known prefix: the name of the first node that reported it and the section
    // contents according to that node.
    let mut sections: BTreeMap<Prefix<T>, (T, BTreeSet<T>)> = BTreeMap::new();
    for node in nodes {
        node.verify_invariant();
        for prefix in node.prefixes() {
            let section_content = if prefix == node.our_prefix {
                node.our_section.clone()
            } else {
                node.sections[&prefix].clone()
            };
            // A prefix seen before must have the same contents as recorded by the first node.
            if let Some(&mut (ref mut src, ref mut section)) = sections.get_mut(&prefix) {
                assert_eq!(*section,
                           section_content,
                           "Section with prefix {:?} doesn't agree between nodes {:?} and {:?}\n\
                            {:?}: {:?}, {:?}: {:?}",
                           prefix,
                           node.our_name,
                           src,
                           node.our_name,
                           section_content,
                           src,
                           section);
                continue;
            }
            let _ = sections.insert(prefix, (node.our_name, section_content));
        }
    }

    // Distinct prefixes must not be compatible with each other.
    for prefix1 in sections.keys() {
        for prefix2 in sections.keys() {
            if prefix1 == prefix2 {
                continue;
            }
            if prefix1.is_compatible(prefix2) {
                panic!("Section prefixes should be disjoint, but these are not:\n\
                        Section {:?}, according to node {:?}: {:?}\n\
                        Section {:?}, according to node {:?}: {:?}",
                       prefix1,
                       sections[prefix1].0,
                       sections[prefix1].1,
                       prefix2,
                       sections[prefix2].0,
                       sections[prefix2].1);
            }
        }
    }

    // Every section member must match its section's prefix. The map is iterated by reference
    // because it is used again below.
    for (prefix, data) in &sections {
        for name in &data.1 {
            if !prefix.matches(name) {
                panic!("Section members should match the prefix, but {:?} \
                        does not match {:?}",
                       name,
                       prefix);
            }
        }
    }

    // Together, the known prefixes must cover the default prefix.
    assert!(Prefix::default().is_covered_by(sections.keys()));
}
/// Grows a network to 100 nodes, checking the network invariant after every single addition.
#[test]
fn sections_have_identical_routing_tables() {
    let node_count = 100;
    let mut network = Network::new(8, None);
    for _ in 0..node_count {
        network.add_node();
        verify_invariant(&network);
    }
}
/// Grows a network to 100 nodes and then drops 95 of them, checking the network invariant after
/// every change.
///
/// After the growth phase every table must know at least two sections; after the shrink phase
/// no table may report any section (`num_of_sections() == 0`).
#[test]
fn merging_sections() {
    let nodes_to_add = 100;
    let nodes_to_drop = 95;
    let mut network = Network::new(8, None);

    // Growth phase: sections are expected to split along the way.
    for _ in 0..nodes_to_add {
        network.add_node();
        verify_invariant(&network);
    }
    assert!(network.nodes.iter().all(|(_, table)| if table.num_of_sections() < 2 {
        trace!("{:?}", table);
        false
    } else {
        true
    }));

    // Shrink phase: sections are expected to merge back together.
    for _ in 0..nodes_to_drop {
        network.drop_node();
        verify_invariant(&network);
    }
    assert!(network.nodes.iter().all(|(_, table)| if table.num_of_sections() > 0 {
        trace!("{:?}", table);
        false
    } else {
        true
    }));
}