// routing 0.22.0
//
// A secured storage DHT
//! Run a local network with 'nodes' nodes sending 'requests' requests.

#![cfg(not(feature = "use-mock-crust"))]

mod utils;

use std::io::Write;
use std::time::Duration;
use std::{io, env, thread};
use std::sync::{Arc, Mutex, Condvar};
use std::process::{Child, Command, Stdio};

use docopt::Docopt;
use sodiumoxide::crypto::hash;
use utils::{ExampleNode, ExampleClient};
use routing::{Data, DataIdentifier, PlainData, XorName, GROUP_SIZE};

use maidsafe_utilities::serialisation::serialise;
use maidsafe_utilities::thread::RaiiThreadJoiner;

use rand::{thread_rng, random, ThreadRng};
use rand::distributions::{IndependentSample, Range};

use term::color;

/// Lower bound, in seconds, of the randomly sampled wait between two churn events
/// (first argument of the `Range` built in `simulate_churn`).
const CHURN_MIN_WAIT_SEC: u64 = 20;
/// Upper bound, in seconds, of the randomly sampled wait between two churn events
/// (second argument of the `Range` built in `simulate_churn`).
const CHURN_MAX_WAIT_SEC: u64 = 30;
/// Churn duration, in seconds, announced by `store_and_verify` before each batch of gets.
// NOTE(review): the only visible use is the "Churning {} seconds" banner; presumably the
// client also sleeps for this long — confirm.
const CHURN_TIME_SEC: u64 = 20;
/// Number of put requests issued when `<requests>` is not given on the command line.
const DEFAULT_REQUESTS: usize = 30;
/// Number of nodes started when `<nodes>` is not given on the command line.
const DEFAULT_NODE_COUNT: usize = 20;
/// The number of churn-get cycles.
const DEFAULT_BATCHES: usize = 1;

/// A spawned node process together with the node number used in console output.
///
/// Field `0` is the handle of the child process; field `1` is the node number
/// printed as `Node #N` by the start-up and churn code.
struct NodeProcess(Child, usize);

impl Drop for NodeProcess {
    /// Kills the child process when the handle is dropped and reports the outcome,
    /// including the process ID, on stdout.
    ///
    /// A failure to kill the process is only reported, never propagated: `drop`
    /// cannot return an error and must not panic.
    fn drop(&mut self) {
        match self.0.kill() {
            Ok(()) => println!("Killed Node with Process ID #{}", self.0.id()),
            Err(err) => {
                println!("Error killing Node with Process ID #{} - {:?}",
                         self.0.id(),
                         err)
            }
        }
    }
}
/// Starts `count` node processes by re-running the current executable, giving each
/// one its own `Node_NN.log` path through the `--output=` option.
///
/// Returns the spawned processes, or the first I/O error reported by `try!`.
///
// NOTE(review): this function is truncated in the current file — the bodies of both
// `if i == 0` branches, the rest of the `Command` builder chain, the end of the closure
// and the function's return are missing. Restore them before relying on this code.
fn start_nodes(count: usize) -> Result<Vec<NodeProcess>, io::Error> {
    println!("--------- Starting {} nodes -----------", count);

    // The log files are placed next to the executable: only the file name component of
    // the executable's path is replaced below.
    let current_exe_path = unwrap_result!(env::current_exe());
    let mut log_path = current_exe_path.clone();

    let nodes = try!((0..count)
        .map(|i| {
            // Node numbers are 1-based in file names and console output.
            log_path.set_file_name(&format!("Node_{:02}.log", i + 1));
            let mut args = vec![format!("--output={}", log_path.display())];
            // NOTE(review): the first node gets extra handling here, but the branch body
            // is missing — presumably additional command-line flags; confirm.
            if i == 0 {

            // NOTE(review): the builder calls between `Command::new(..)` and the closing
            // of `try!(..)` are missing.
            let node = NodeProcess(try!(Command::new(current_exe_path.clone())
                                   i + 1);

            // NOTE(review): the second format argument (the process ID) is missing.
            println!("Started Node #{} with Process ID {}", i + 1,;
            // NOTE(review): branch body missing, as is everything after it.
            if i == 0 {


fn simulate_churn(mut nodes: Vec<NodeProcess>,
                  network_size: usize,
                  stop_flg: Arc<(Mutex<bool>, Condvar)>)
                  -> RaiiThreadJoiner {
    let joiner = thread!("ChurnSimulationThread", move || {
        let mut rng = thread_rng();
        let wait_range = Range::new(CHURN_MIN_WAIT_SEC, CHURN_MAX_WAIT_SEC);

        let mut node_count = nodes.len();

        loop {
                let &(ref lock, ref cvar) = &*stop_flg;

                let mut stop_condition = unwrap_result!(lock.lock());
                let mut wait_timed_out = false;
                let wait_for = wait_range.ind_sample(&mut rng);

                while !*stop_condition && !wait_timed_out {
                    let wake_up_result = unwrap_result!(cvar.wait_timeout(stop_condition,
                    stop_condition = wake_up_result.0;
                    wait_timed_out = wake_up_result.1.timed_out();

                if *stop_condition {

            if let Err(err) = simulate_churn_impl(&mut nodes,
                                                  &mut rng,
                                                  &mut node_count) {
                println!("{:?}", err);


/// Performs a single churn step: either removes one running node or prepares to start
/// a new one.
///
/// * `nodes` - the currently running node processes.
/// * `rng` - random number generator used to pick the node to remove.
/// * `network_size` - node count at which a node is always removed.
/// * `node_count` - running counter used to number newly started nodes; incremented
///   on the "start a node" path.
///
// NOTE(review): this function is truncated in the current file — the terminator of the
// `match`, the code that spawns the new node, the arguments of the final `println!` and
// the function's tail (closing braces and the `Result` value) are missing.
fn simulate_churn_impl(nodes: &mut Vec<NodeProcess>,
                       rng: &mut ThreadRng,
                       network_size: usize,
                       node_count: &mut usize)
                       -> Result<(), io::Error> {
    print!("Churning on {} active nodes. ", nodes.len());
    io::stdout().flush().expect("Could not flush stdout");

    // With exactly GROUP_SIZE nodes never remove one; with exactly `network_size` nodes
    // always remove one; otherwise decide at random.
    let kill_node = match nodes.len() {
        size if size == GROUP_SIZE => false,
        size if size == network_size => true,
        _ => random(),

    let current_exe_path = unwrap_result!(env::current_exe());
    let mut log_path = current_exe_path.clone();

    if kill_node {
        // Never kill the bootstrap (0th) node
        let kill_at_index = Range::new(1, nodes.len()).ind_sample(rng);
        // The removed `NodeProcess` is dropped at the end of this branch; its `Drop`
        // impl is what kills the process.
        let node = nodes.remove(kill_at_index);
        print!("Killing Node #{}: ", node.1);
        io::stdout().flush().expect("Could not flush stdout");
    } else {
        *node_count += 1;
        log_path.set_file_name(&format!("Node_{:02}.log", node_count));
        let arg = format!("--output={}", log_path.display());

        // NOTE(review): the code that spawns the process and pushes it onto `nodes` is
        // missing here, and the `println!` below has lost part of its arguments.
        println!("Started Node #{} with Process ID #{}",
                 nodes[nodes.len() - 1];


fn print_color(text: &str, color: color::Color) {
    let mut term = term::stdout().expect("Could not open stdout.");
    term.fg(color).expect("Failed to set color");
    print!("{}", text);
    term.reset().expect("Failed to restore stdout attributes.");
    io::stdout().flush().expect("Could not flush stdout");

/// Runs the client side of the test: puts `requests` random data items on the network,
/// reading each one back right after it is stored, then performs `batches` rounds of
/// gets and checks every item returned against the stored copy.
///
/// Results are reported on stdout as coloured `OK` / `FAIL` markers; a mismatch between
/// stored and fetched data panics through `assert_eq!`.
///
// NOTE(review): this function is truncated in the current file — the arguments of both
// `example_client.get(..)` calls, parts of two print statements and most closing braces
// are missing. No visible line adds anything to `stored_data`; presumably a `push` was
// lost as well — confirm.
fn store_and_verify(requests: usize, batches: usize) {
    println!("--------- Starting Client -----------");
    let mut example_client = ExampleClient::new();

    println!("--------- Putting Data -----------");
    let mut stored_data = Vec::with_capacity(requests);
    for i in 0..requests {
        // Key and value are 10 random characters each; every random byte is converted
        // to the `char` with the same code point.
        let key: String = (0..10).map(|_| random::<u8>() as char).collect();
        let value: String = (0..10).map(|_| random::<u8>() as char).collect();
        // The data name is the SHA-256 hash of the key.
        let name = XorName(hash::sha256::hash(key.as_bytes()).0);
        let data = unwrap_result!(serialise(&(key, value)));
        let data = Data::Plain(PlainData::new(name, data));

        print!("Putting Data: count #{} - Data {:?} - ", i + 1, name);
        io::stdout().flush().expect("Could not flush stdout");
        if example_client.put(data.clone()).is_ok() {
            print_color("OK", color::GREEN);
            print!(" - getting - ");
            io::stdout().flush().expect("Could not flush stdout");
            // Read the item back immediately to confirm the put took effect.
            // NOTE(review): the argument of `DataIdentifier::Plain(` is missing.
            if let Some(got_data) = example_client.get(DataIdentifier::Plain( {
                assert_eq!(got_data, data);
                print_color("OK\n", color::GREEN);
            } else {
                print_color("FAIL\n", color::RED);
        } else {
            print_color("FAIL\n", color::RED);

    for batch in 0..batches {
        println!("--------- Churning {} seconds -----------", CHURN_TIME_SEC);

        // NOTE(review): the second format argument (the batch total) is missing, as is
        // whatever ran between the two banners.
        println!("--------- Getting Data - batch {} of {} -----------",
                 batch + 1,
        for (i, data_item) in stored_data.iter().enumerate().take(requests) {
            // NOTE(review): the second format argument is missing.
            print!("Get attempt #{} - Data {:?} - ", i + 1,;
            io::stdout().flush().expect("Could not flush stdout");
            // NOTE(review): the argument of `DataIdentifier::Plain(` is missing.
            if let Some(data) = example_client.get(DataIdentifier::Plain( {
                assert_eq!(data, stored_data[i]);
                print_color("OK\n", color::GREEN);
            } else {
                print_color("FAIL\n", color::RED);

// ==========================   Program Options   =================================
/// Docopt usage text for this binary; `main` parses it and decodes the command line
/// into `Args`.
///
/// The three invocation forms are: show help, run a single node (optionally acting as
/// a client), or run the whole local network test.
#[cfg_attr(rustfmt, rustfmt_skip)]
static USAGE: &'static str = "
Usage:
  ci_test -h
  ci_test --output=<log_file> [-c [<requests> [<batches>]]] [-f] [-d]
  ci_test [<nodes> <requests> [<batches>]]

Options:
  -o, --output=<log_file>       Run individual CI node.
  -c, --client                  Run as an individual client.
  -d, --delete-bootstrap-cache  Delete existing bootstrap-cache.
  -h, --help                    Display this help message.
  -f, --first                   This is the first node of a new network.
";
// ================================================================================

#[derive(PartialEq, Eq, Debug, Clone, RustcDecodable)]
struct Args {
    arg_batches: Option<usize>,
    arg_nodes: Option<usize>,
    arg_requests: Option<usize>,
    flag_first: Option<bool>,
    flag_output: Option<String>,
    flag_client: Option<bool>,
    flag_delete_bootstrap_cache: Option<bool>,
    flag_help: Option<bool>,

#[cfg_attr(feature="clippy", allow(mutex_atomic))] // AtomicBool cannot be used with Condvar.
fn main() {
    let args: Args = Docopt::new(USAGE)
        .and_then(|docopt| docopt.decode())
        .unwrap_or_else(|error| error.exit());

    let run_network_test = !(args.flag_output.is_some() ||
    let requests = args.arg_requests.unwrap_or(DEFAULT_REQUESTS);
    let batches = args.arg_batches.unwrap_or(DEFAULT_BATCHES);
    let first = args.flag_first.unwrap_or(false);

    if run_network_test {
        let node_count = match args.arg_nodes {
            Some(number) => {
                if number <= GROUP_SIZE {
                    panic!("The number of nodes should be > Group-Size. {{Group-Size = #{}}}",

            None => DEFAULT_NODE_COUNT,

        let nodes = unwrap_result!(start_nodes(node_count));

        let stop_flg = Arc::new((Mutex::new(false), Condvar::new()));
        let _raii_joiner = simulate_churn(nodes, node_count, stop_flg.clone());

        store_and_verify(requests, batches);

        // Graceful exit
            let &(ref lock, ref cvar) = &*stop_flg;
            *unwrap_result!(lock.lock()) = true;
    } else if let Some(log_file) = args.flag_output {
        unwrap_result!(maidsafe_utilities::log::init_to_file(false, log_file, true));

        if let Some(true) = args.flag_delete_bootstrap_cache {
            // TODO Remove bootstrap cache file

        if Some(true) == args.flag_client {
            store_and_verify(requests, batches);
        } else {