delix 0.2.4

Decentral HTTP proxy / load balancer
// Copyright 2015 The Delix Project Authors. See the AUTHORS file at the top level directory.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

extern crate delix;

mod helper;

use std::io;
use std::iter;
use std::sync::{Arc, Mutex, RwLock, mpsc};
use std::thread;

use delix::node::request;

#[test]
fn single_echo_from_local_without_timeout() {
    helper::set_up();

    let (node, metric) = helper::build_node("localhost:3001", &[], None);
    node.register("echo", Box::new(|request| Ok(request)))
            .unwrap();

    helper::wait_for_services(&[&metric], 1);

    assert_eq!("test message", String::from_utf8_lossy(&node.request_bytes("echo", b"test message").unwrap()));
}

#[test]
fn single_large_echo_from_local_without_timeout() {
    helper::set_up();

    let (node, metric) = helper::build_node("localhost:3011", &[], None);
    node.register("echo", Box::new(|request| Ok(request)))
            .unwrap();

    helper::wait_for_services(&[&metric], 1);

    let request_bytes = iter::repeat(0u8).take(70000).collect::<Vec<_>>();
    let request = Box::new(io::Cursor::new(request_bytes.clone()));
    node.request("echo", request, Box::new(move |mut reader| {
        assert_eq!(Some(70000), io::copy(&mut reader, &mut io::sink()).ok());
    })).unwrap();
}

#[test]
fn single_echo_from_local_with_timeout() {
    helper::set_up();

    let (node, metric) = helper::build_node("localhost:3021", &[], Some(10));
    node.register("echo", Box::new(|request| {
        thread::sleep(::std::time::Duration::from_millis(100));
        Ok(request)
    })).unwrap();

    helper::wait_for_services(&[&metric], 1);

    assert_eq!(Err(request::Error::Timeout), node.request_bytes("echo", b""));
}

#[test]
fn single_echo_from_remote_without_timeout() {
    helper::set_up();

    let (node_one, metric_one) = helper::build_node("localhost:3031", &[], None);
    node_one.register("echo", Box::new(|request| Ok(request)))
            .unwrap();

    let (node_two, metric_two) = helper::build_node("localhost:3032", &["localhost:3031"], None);

    helper::wait_for_joined(&[&metric_one, &metric_two]);
    helper::wait_for_services(&[&metric_one, &metric_two], 1);

    assert_eq!("test message", String::from_utf8_lossy(&node_two.request_bytes("echo", b"test message").unwrap()));
}

#[test]
fn single_echo_from_remote_with_timeout() {
    helper::set_up();

    let (node_one, metric_one) = helper::build_node("localhost:3041", &[], None);
    node_one.register("echo", Box::new(|request| {
        thread::sleep(::std::time::Duration::from_millis(20));
        Ok(request)
    })).unwrap();

    let (node_two, metric_two) = helper::build_node("localhost:3042", &["localhost:3041"], Some(10));

    helper::wait_for_joined(&[&metric_one, &metric_two]);
    helper::wait_for_services(&[&metric_one, &metric_two], 1);

    assert_eq!(Err(request::Error::Timeout), node_two.request_bytes("echo", b""));
}

#[test]
fn multiple_echos_from_remote() {
    helper::set_up();

    let (node_one, metric_one) = helper::build_node("localhost:3051", &[], None);
    node_one.register("echo", Box::new(|request| Ok(request)))
            .unwrap();

    let (node_two, metric_two) = helper::build_node("localhost:3052", &["localhost:3051"], None);

    helper::wait_for_joined(&[&metric_one, &metric_two]);
    helper::wait_for_services(&[&metric_one, &metric_two], 1);

    assert_eq!(b"test message one".to_vec(), node_two.request_bytes("echo", b"test message one").unwrap());
    assert_eq!(b"test message two".to_vec(), node_two.request_bytes("echo", b"test message two").unwrap());
}

#[test]
fn balanced_echos_from_two_remotes() {
    helper::set_up();

    let (node_one, metric_one) = helper::build_node("localhost:3061", &[], None);

    let (tx, rx) = mpsc::channel();

    let (node_two, metric_two) = helper::build_node("localhost:3062", &["localhost:3061"], None);
    let tx_clone = Mutex::new(tx.clone());
    node_two.register("echo", Box::new(move |request| {
        tx_clone.lock().unwrap().send("two").unwrap();
        Ok(request)
    })).unwrap();

    let (node_three, metric_three) = helper::build_node("localhost:3063", &["localhost:3061"], None);
    let tx_clone = Mutex::new(tx.clone());
    node_three.register("echo", Box::new(move |request| {
        tx_clone.lock().unwrap().send("three").unwrap();
        Ok(request)
    })).unwrap();

    helper::wait_for_joined(&[&metric_one, &metric_two, &metric_three]);
    helper::wait_for_services(&[&metric_one, &metric_two, &metric_three], 1);

    assert_eq!("test", String::from_utf8_lossy(&node_one.request_bytes("echo", b"test").unwrap()));
    assert_eq!("test", String::from_utf8_lossy(&node_one.request_bytes("echo", b"test").unwrap()));

    helper::assert_contains_all(&["two", "three"], &helper::recv_all(&rx));

    node_three.deregister("echo").unwrap();

    assert_eq!("test", String::from_utf8_lossy(&node_one.request_bytes("echo", b"test").unwrap()));
    assert_eq!("test", String::from_utf8_lossy(&node_one.request_bytes("echo", b"test").unwrap()));

    helper::assert_contains_all(&["two", "two"], &helper::recv_all(&rx));
}

#[test]
fn parallel_requests_while_a_node_joins_and_leaves() {
    helper::set_up();

    let (node_one, metric_one) = helper::build_node("localhost:3071", &[], None);
    node_one.register("echo", Box::new(move |request| {
        Ok(request)
    })).unwrap();

    let running = Arc::new(RwLock::new(true));

    let node_one_clone = node_one.clone();
    let running_clone = running.clone();
    let jh_one = thread::spawn(move || {
        while *running_clone.read().unwrap() {
            match node_one_clone.request_bytes("echo", b"test") {
                Ok(response) => assert_eq!("test", String::from_utf8_lossy(&response)),
                Err(error) => println!("got error: {:?}", error),
            }
            thread::sleep(::std::time::Duration::from_millis(10));
        }
    });

    let node_one_clone = node_one.clone();
    let running_clone = running.clone();
    let jh_two = thread::spawn(move || {
        while *running_clone.read().unwrap() {
            match node_one_clone.request_bytes("echo", b"test") {
                Ok(response) => assert_eq!("test", String::from_utf8_lossy(&response)),
                Err(error) => println!("got error: {:?}", error),
            }
            thread::sleep(::std::time::Duration::from_millis(10));
        }
    });

    helper::wait_for_requests(&[&metric_one], 10);

    {
        let (node_two, metric_two) = helper::build_node("localhost:3072", &["localhost:3071"], None);
        node_two.register("echo", Box::new(move |request| {
            Ok(request)
        })).unwrap();

        helper::wait_for_joined(&[&metric_one, &metric_two]);
        helper::wait_for_services(&[&metric_one, &metric_two], 1);
        helper::wait_for_endpoints(&[&metric_one, &metric_two], 2);

        helper::wait_for_requests(&[&metric_one], 100);
    }

    helper::wait_for_requests(&[&metric_one], 200);

    *running.write().unwrap() = false;
    jh_one.join().unwrap();
    jh_two.join().unwrap();
}