ilp-node 0.6.0

Interledger node (sender, connector, receiver bundle)
use futures::{future::join_all, Future};
use ilp_node::InterledgerNode;
use reqwest::r#async::Client;
use serde_json::{self, json};
use tokio::runtime::Builder as RuntimeBuilder;

mod redis_helpers;
use redis_helpers::*;

mod test_helpers;
use test_helpers::*;

#[test]
fn prometheus() {
    // Nodes 1 and 2 are peers, Node 2 is the parent of Node 2
    install_tracing_subscriber();
    let context = TestContext::new();

    // Each node will use its own DB within the redis instance
    let mut connection_info1 = context.get_client_connection_info();
    connection_info1.db = 1;
    let mut connection_info2 = context.get_client_connection_info();
    connection_info2.db = 2;

    let node_a_http = get_open_port(Some(3010));
    let node_a_settlement = get_open_port(Some(3011));
    let node_b_http = get_open_port(Some(3020));
    let node_b_settlement = get_open_port(Some(3021));
    let prometheus_port = get_open_port(None);

    let mut runtime = RuntimeBuilder::new()
        .panic_handler(|err| std::panic::resume_unwind(err))
        .build()
        .unwrap();

    let alice_on_a = json!({
        "ilp_address": "example.node_a.alice",
        "username": "alice_on_a",
        "asset_code": "XYZ",
        "asset_scale": 9,
        "ilp_over_http_incoming_token" : "token",
    });
    let b_on_a = json!({
        "ilp_address": "example.node_b",
        "username": "node_b",
        "asset_code": "XYZ",
        "asset_scale": 9,
        "ilp_over_http_url": format!("http://localhost:{}/ilp", node_b_http),
        "ilp_over_http_incoming_token" : "token",
        "ilp_over_http_outgoing_token" : "node_a:token",
        "routing_relation": "Peer",
    });

    let a_on_b = json!({
        "ilp_address": "example.node_a",
        "username": "node_a",
        "asset_code": "XYZ",
        "asset_scale": 9,
        "ilp_over_http_url": format!("http://localhost:{}/ilp", node_a_http),
        "ilp_over_http_incoming_token" : "token",
        "ilp_over_http_outgoing_token" : "node_b:token",
        "routing_relation": "Peer",
    });

    let bob_on_b = json!({
        "ilp_address": "example.node_b.bob",
        "username": "bob_on_b",
        "asset_code": "XYZ",
        "asset_scale": 6,
        "ilp_over_http_incoming_token" : "token",
    });

    let node_a: InterledgerNode = serde_json::from_value(json!({
        "admin_auth_token": "admin",
        "redis_connection": connection_info_to_string(connection_info1),
        "http_bind_address": format!("127.0.0.1:{}", node_a_http),
        "settlement_api_bind_address": format!("127.0.0.1:{}", node_a_settlement),
        "secret_seed": random_secret(),
        "prometheus": {
            "bind_address": format!("127.0.0.1:{}", prometheus_port),
            "histogram_window": 10000,
            "histogram_granularity": 1000,
        }
    }))
    .unwrap();

    let node_b: InterledgerNode = serde_json::from_value(json!({
        "ilp_address": "example.parent",
        "default_spsp_account": "bob_on_b",
        "admin_auth_token": "admin",
        "redis_connection": connection_info_to_string(connection_info2),
        "http_bind_address": format!("127.0.0.1:{}", node_b_http),
        "settlement_api_bind_address": format!("127.0.0.1:{}", node_b_settlement),
        "secret_seed": random_secret(),
    }))
    .unwrap();

    let alice_fut = join_all(vec![
        create_account_on_node(node_a_http, alice_on_a, "admin"),
        create_account_on_node(node_a_http, b_on_a, "admin"),
    ]);

    runtime.spawn(node_a.serve());

    let bob_fut = join_all(vec![
        create_account_on_node(node_b_http, a_on_b, "admin"),
        create_account_on_node(node_b_http, bob_on_b, "admin"),
    ]);

    runtime.spawn(node_b.serve());

    runtime
        .block_on(
            // Wait for the nodes to spin up
            delay(500)
                .map_err(|_| panic!("Something strange happened"))
                .and_then(move |_| {
                    bob_fut
                        .and_then(|_| alice_fut)
                        .and_then(|_| delay(500).map_err(|_| panic!("delay error")))
                })
                .and_then(move |_| {
                    let send_1_to_2 = send_money_to_username(
                        node_a_http,
                        node_b_http,
                        1000,
                        "bob_on_b",
                        "alice_on_a",
                        "token",
                    );
                    let send_2_to_1 = send_money_to_username(
                        node_b_http,
                        node_a_http,
                        2000,
                        "alice_on_a",
                        "bob_on_b",
                        "token",
                    );

                    let check_metrics = move || {
                        Client::new()
                            .get(&format!("http://127.0.0.1:{}", prometheus_port))
                            .send()
                            .map_err(|err| eprintln!("Error getting metrics {:?}", err))
                            .and_then(|mut res| {
                                res.text().map_err(|err| {
                                    eprintln!("Response was not a string: {:?}", err)
                                })
                            })
                    };

                    send_1_to_2
                        .map_err(|err| {
                            eprintln!("Error sending from node 1 to node 2: {:?}", err);
                            err
                        })
                        .and_then(move |_| {
                            check_metrics().and_then(move |ret| {
                                assert!(ret.starts_with("# metrics snapshot"));
                                assert!(ret.contains("requests_incoming_fulfill"));
                                assert!(ret.contains("requests_incoming_prepare"));
                                assert!(ret.contains("requests_incoming_reject"));
                                assert!(ret.contains("requests_incoming_duration"));
                                assert!(ret.contains("requests_outgoing_fulfill"));
                                assert!(ret.contains("requests_outgoing_prepare"));
                                assert!(ret.contains("requests_outgoing_reject"));
                                assert!(ret.contains("requests_outgoing_duration"));
                                // TODO check the specific numbers of packets
                                Ok(())
                            })
                        })
                        .and_then(move |_| {
                            send_2_to_1.map_err(|err| {
                                eprintln!("Error sending from node 2 to node 1: {:?}", err);
                                err
                            })
                        })
                        .and_then(move |_| {
                            check_metrics().and_then(move |ret| {
                                assert!(ret.starts_with("# metrics snapshot"));
                                assert!(ret.contains("requests_incoming_fulfill"));
                                assert!(ret.contains("requests_incoming_prepare"));
                                assert!(ret.contains("requests_incoming_reject"));
                                assert!(ret.contains("requests_incoming_duration"));
                                assert!(ret.contains("requests_outgoing_fulfill"));
                                assert!(ret.contains("requests_outgoing_prepare"));
                                assert!(ret.contains("requests_outgoing_reject"));
                                assert!(ret.contains("requests_outgoing_duration"));
                                // TODO check the specific numbers of packets
                                Ok(())
                            })
                        })
                }),
        )
        .unwrap();
}