#![forbid(bad_style, exceeding_bitshifts, mutable_transmutes, no_mangle_const_items,
unknown_crate_types, warnings)]
#![deny(deprecated, drop_with_repr_extern, improper_ctypes, missing_docs,
non_shorthand_field_patterns, overflowing_literals, plugin_as_library,
private_no_mangle_fns, private_no_mangle_statics, stable_features, unconditional_recursion,
unknown_lints, unsafe_code, unused, unused_allocation, unused_attributes,
unused_comparisons, unused_features, unused_parens, while_true)]
#![warn(trivial_casts, trivial_numeric_casts, unused_extern_crates, unused_import_braces,
unused_qualifications, unused_results, variant_size_differences)]
#![allow(box_pointers, fat_ptr_transmutes, missing_copy_implementations,
missing_debug_implementations)]
#![cfg(not(feature = "use-mock-crust"))]
#![cfg_attr(feature="clippy", feature(plugin))]
#![cfg_attr(feature="clippy", plugin(clippy))]
#![cfg_attr(feature="clippy", deny(clippy, clippy_pedantic))]
#![cfg_attr(feature="clippy", allow(use_debug))]
extern crate itertools;
extern crate kademlia_routing_table;
#[cfg(target_os = "macos")]
extern crate libc;
#[macro_use]
extern crate maidsafe_utilities;
extern crate rand;
extern crate routing;
extern crate sodiumoxide;
extern crate xor_name;
mod utils;
use std::cmp::Ordering::{Greater, Less};
use std::collections::HashSet;
#[cfg(target_os = "macos")]
use std::io;
use std::{iter, thread};
use std::sync::mpsc::{self, Receiver, Sender};
use std::time::Duration;
use itertools::Itertools;
use kademlia_routing_table::GROUP_SIZE;
use maidsafe_utilities::serialisation;
use maidsafe_utilities::thread::RaiiThreadJoiner;
use routing::{Authority, Client, Data, Event, FullId, MessageId, Node, PlainData, RequestContent,
RequestMessage, ResponseContent, ResponseMessage};
use sodiumoxide::crypto;
use sodiumoxide::crypto::hash::sha512;
use utils::recv_with_timeout;
use xor_name::XorName;
use routing::DataIdentifier;
const QUORUM_SIZE: usize = 5;
#[derive(Debug)]
struct TestEvent(usize, Event);
struct TestNode {
node: Node,
_thread_joiner: RaiiThreadJoiner,
}
impl TestNode {
fn new(index: usize, main_sender: Sender<TestEvent>) -> Self {
let thread_name = format!("TestNode {} event sender", index);
let (sender, joiner) = spawn_select_thread(index, main_sender, thread_name);
TestNode {
node: unwrap_result!(Node::new(sender, false)),
_thread_joiner: joiner,
}
}
fn name(&self) -> XorName {
unwrap_result!(self.node.name())
}
}
struct TestClient {
index: usize,
full_id: FullId,
client: Client,
_thread_joiner: RaiiThreadJoiner,
}
impl TestClient {
fn new(index: usize, main_sender: Sender<TestEvent>) -> Self {
let thread_name = format!("TestClient {} event sender", index);
let (sender, joiner) = spawn_select_thread(index, main_sender, thread_name);
let sign_keys = crypto::sign::gen_keypair();
let encrypt_keys = crypto::box_::gen_keypair();
let full_id = FullId::with_keys(encrypt_keys, sign_keys);
TestClient {
index: index,
full_id: full_id.clone(),
client: unwrap_result!(Client::new(sender, Some(full_id), false)),
_thread_joiner: joiner,
}
}
pub fn name(&self) -> &XorName {
self.full_id.public_id().name()
}
}
#[cfg(target_os = "macos")]
#[allow(unsafe_code)]
fn get_open_file_limits() -> io::Result<libc::rlimit> {
unsafe {
let mut result = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
if libc::getrlimit(libc::RLIMIT_NOFILE, &mut result) != 0 {
return Err(io::Error::last_os_error());
}
Ok(result)
}
}
#[cfg(target_os = "macos")]
#[allow(unsafe_code)]
fn set_open_file_limits(limits: libc::rlimit) -> io::Result<()> {
unsafe {
if libc::setrlimit(libc::RLIMIT_NOFILE, &limits) != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
}
#[cfg(target_os = "macos")]
fn init() {
unwrap_result!(maidsafe_utilities::log::init(true));
let mut limits = unwrap_result!(get_open_file_limits());
if limits.rlim_cur < 1024 {
limits.rlim_cur = 1024;
unwrap_result!(set_open_file_limits(limits));
}
}
#[cfg(not(target_os = "macos"))]
fn init() {
unwrap_result!(maidsafe_utilities::log::init(true));
}
fn spawn_select_thread(index: usize,
main_sender: Sender<TestEvent>,
thread_name: String)
-> (Sender<Event>, RaiiThreadJoiner) {
let (sender, receiver) = mpsc::channel();
let thread_handle = thread!(thread_name, move || {
for event in receiver.iter() {
let _ = unwrap_result!(main_sender.send(TestEvent(index, event)));
}
});
(sender, RaiiThreadJoiner::new(thread_handle))
}
fn wait_for_nodes_to_connect(nodes: &[TestNode],
connection_counts: &mut [usize],
event_receiver: &Receiver<TestEvent>) {
loop {
if let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(30)) {
if let TestEvent(index, Event::NodeAdded(..)) = test_event {
connection_counts[index] += 1;
let k = nodes.len();
let all_events_received = (0..k)
.map(|i| connection_counts[i])
.all(|n| n >= k - 1 || n >= GROUP_SIZE - 1);
if all_events_received {
break;
}
}
} else {
panic!("Timeout");
}
}
}
fn create_connected_nodes(count: usize,
event_sender: Sender<TestEvent>,
event_receiver: &Receiver<TestEvent>)
-> Vec<TestNode> {
let mut nodes = Vec::with_capacity(count);
let mut connection_counts = iter::repeat(0).take(count).collect::<Vec<usize>>();
nodes.push(TestNode::new(0, event_sender.clone()));
thread::sleep(Duration::from_secs(2));
for _ in 1..count {
let index = nodes.len();
nodes.push(TestNode::new(index, event_sender.clone()));
wait_for_nodes_to_connect(&nodes, &mut connection_counts, event_receiver);
}
nodes
}
fn gen_plain_data() -> Data {
let key: String = (0..10).map(|_| rand::random::<u8>() as char).collect();
let value: String = (0..10).map(|_| rand::random::<u8>() as char).collect();
let name = XorName::new(sha512::hash(key.as_bytes()).0);
let data = unwrap_result!(serialisation::serialise(&(key, value)));
Data::Plain(PlainData::new(name, data))
}
fn closest_nodes(node_names: &[XorName], target: &XorName) -> Vec<XorName> {
node_names.iter()
.sorted_by(|a, b| {
if xor_name::closer_to_target(a, b, target) {
Less
} else {
Greater
}
})
.into_iter()
.take(GROUP_SIZE)
.cloned()
.collect()
}
#[cfg_attr(feature="clippy", allow(cyclomatic_complexity))]
fn core() {
let (event_sender, event_receiver) = mpsc::channel();
let mut nodes = create_connected_nodes(GROUP_SIZE + 1, event_sender.clone(), &event_receiver);
{
let client = TestClient::new(nodes.len(), event_sender.clone());
let data = gen_plain_data();
let message_id = MessageId::new();
loop {
if let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(20)) {
match test_event {
TestEvent(index, Event::Connected) if index == client.index => {
let src = Authority::ClientManager(*client.name());
let result = client.client.send_put_request(src, data.clone(), message_id);
assert!(result.is_ok());
}
TestEvent(index, Event::Request(message)) => {
if let RequestContent::Put(_, ref id) = message.content {
let node = &nodes[index].node;
unwrap_result!(node.send_put_success(message.dst,
message.src,
DataIdentifier::Plain(data.name()),
id.clone()));
}
}
TestEvent(index,
Event::Response(ResponseMessage{
content: ResponseContent::PutSuccess(ref identifier, ref id), .. }))
if index == client.index => {
assert_eq!(&message_id, id);
assert_eq!(identifier.name(), data.name());
break;
}
_ => (),
}
} else {
panic!("Timeout");
}
}
}
{
let node_names = nodes.iter().map(|node| node.name()).collect_vec();
let client = TestClient::new(nodes.len(), event_sender.clone());
let data = gen_plain_data();
let mut close_group = closest_nodes(&node_names, client.name());
loop {
if let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(20)) {
match test_event {
TestEvent(index, Event::Connected) if index == client.index => {
assert!(client.client
.send_put_request(Authority::ClientManager(*client.name()),
data.clone(),
MessageId::new())
.is_ok());
}
TestEvent(index,
Event::Request(RequestMessage{
content: RequestContent::Put(..), ..
})) => {
close_group.retain(|&name| name != nodes[index].name());
if close_group.is_empty() {
break;
}
}
_ => (),
}
} else {
panic!("Timeout");
}
}
assert!(close_group.is_empty());
}
{
let node_names = nodes.iter().map(|node| node.name()).collect_vec();
let client = TestClient::new(nodes.len(), event_sender.clone());
let data = gen_plain_data();
let mut close_group = closest_nodes(&node_names, client.name());
loop {
if let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(20)) {
match test_event {
TestEvent(index, Event::Connected) if index == client.index => {
assert!(client.client
.send_put_request(Authority::ClientManager(*client.name()),
data.clone(),
MessageId::new())
.is_ok());
}
TestEvent(index,
Event::Request(RequestMessage{
src: Authority::Client{ .. },
dst: Authority::ClientManager(name),
content: RequestContent::Put(data, id),
})) => {
let src = Authority::ClientManager(name);
let dst = Authority::NaeManager(data.name().clone());
unwrap_result!(nodes[index]
.node
.send_put_request(src, dst, data.clone(), id.clone()));
}
TestEvent(index, Event::Request(ref msg)) => {
if let RequestContent::Put(_, ref id) = msg.content {
unwrap_result!(nodes[index].node.send_put_failure(msg.dst.clone(),
msg.src.clone(),
msg.clone(),
vec![],
id.clone()));
}
}
TestEvent(index,
Event::Response(ResponseMessage{
content: ResponseContent::PutFailure{ .. },
..
})) => {
close_group.retain(|&name| name != nodes[index].name());
if close_group.is_empty() {
break;
}
}
_ => (),
}
} else {
panic!("Timeout");
}
}
assert!(close_group.is_empty());
}
{
let mut churns = iter::repeat(false).take(nodes.len() - 1).collect::<Vec<_>>();
let node = unwrap_option!(nodes.pop(), "No more nodes left.");
let name = node.name();
drop(node);
loop {
if let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(20)) {
match test_event {
TestEvent(index, Event::NodeLost(lost_name, _)) if index < nodes.len() &&
lost_name == name => {
churns[index] = true;
if churns.iter().all(|b| *b) {
break;
}
}
_ => (),
}
} else {
panic!("Timeout");
}
}
}
{
let nodes_len = nodes.len();
let mut churns = iter::repeat(false).take(nodes_len + 1).collect::<Vec<_>>();
nodes.push(TestNode::new(nodes_len, event_sender.clone()));
loop {
if let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(20)) {
match test_event {
TestEvent(index, Event::NodeAdded(..)) if index < nodes.len() => {
churns[index] = true;
if churns.iter().all(|b| *b) {
break;
}
}
_ => (),
}
} else {
panic!("Timeout");
}
}
}
{
let client = TestClient::new(nodes.len(), event_sender.clone());
let data = gen_plain_data();
while let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(5)) {
match test_event {
TestEvent(index, Event::Connected) if index == client.index => {
assert!(client.client
.send_put_request(Authority::ClientManager(*client.name()),
data.clone(),
MessageId::new())
.is_ok());
}
TestEvent(index,
Event::Request(RequestMessage{ src: Authority::Client{ .. },
dst: Authority::ClientManager(name),
content: RequestContent::Put(data, id)
})) => {
let src = Authority::ClientManager(name);
let dst = Authority::NaeManager(data.name().clone());
unwrap_result!(nodes[index]
.node
.send_put_request(src, dst, data.clone(), id.clone()));
}
TestEvent(index, Event::Request(ref msg)) => {
if let RequestContent::Put(_, ref id) = msg.content {
if index < QUORUM_SIZE - 1 {
unwrap_result!(nodes[index].node.send_put_failure(msg.dst.clone(),
msg.src.clone(),
msg.clone(),
vec![],
id.clone()));
}
}
}
TestEvent(_index,
Event::Response(ResponseMessage{
content: ResponseContent::PutFailure{ .. },
..
})) => {
panic!("Unexpected response.");
}
_ => (),
}
}
}
{
let client = TestClient::new(nodes.len(), event_sender.clone());
let data = gen_plain_data();
let mut sent_ids = HashSet::new();
let mut received_ids = HashSet::new();
loop {
if let Ok(test_event) = recv_with_timeout(&event_receiver, Duration::from_secs(5)) {
match test_event {
TestEvent(index, Event::Connected) if index == client.index => {
let src = Authority::ClientManager(*client.name());
let message_id = MessageId::new();
let result = client.client.send_put_request(src, data.clone(), message_id);
assert!(result.is_ok());
sent_ids.insert(message_id);
}
TestEvent(index, Event::Request(message)) => {
if let RequestContent::Put(_, id) = message.content {
unwrap_result!(nodes[index]
.node
.send_put_success(message.dst,
message.src,
DataIdentifier::Plain(data.name()),
id));
}
}
TestEvent(index,
Event::Response(ResponseMessage{
content: ResponseContent::PutSuccess(ref name, ref id), .. }))
if index == client.index => {
assert!(received_ids.insert(*id));
assert_eq!(name, &DataIdentifier::Plain(data.name()));
}
_ => (),
}
} else {
assert_eq!(1, received_ids.len());
assert_eq!(sent_ids, received_ids);
break;
}
}
}
}
#[test]
fn main() {
init();
core();
}