use futures::{future::join_all, Future};
use ilp_node::InterledgerNode;
use reqwest::r#async::Client;
use serde_json::{self, json};
use tokio::runtime::Builder as RuntimeBuilder;
mod redis_helpers;
use redis_helpers::*;
mod test_helpers;
use test_helpers::*;
#[test]
fn prometheus() {
install_tracing_subscriber();
let context = TestContext::new();
let mut connection_info1 = context.get_client_connection_info();
connection_info1.db = 1;
let mut connection_info2 = context.get_client_connection_info();
connection_info2.db = 2;
let node_a_http = get_open_port(Some(3010));
let node_a_settlement = get_open_port(Some(3011));
let node_b_http = get_open_port(Some(3020));
let node_b_settlement = get_open_port(Some(3021));
let prometheus_port = get_open_port(None);
let mut runtime = RuntimeBuilder::new()
.panic_handler(|err| std::panic::resume_unwind(err))
.build()
.unwrap();
let alice_on_a = json!({
"ilp_address": "example.node_a.alice",
"username": "alice_on_a",
"asset_code": "XYZ",
"asset_scale": 9,
"ilp_over_http_incoming_token" : "token",
});
let b_on_a = json!({
"ilp_address": "example.node_b",
"username": "node_b",
"asset_code": "XYZ",
"asset_scale": 9,
"ilp_over_http_url": format!("http://localhost:{}/ilp", node_b_http),
"ilp_over_http_incoming_token" : "token",
"ilp_over_http_outgoing_token" : "node_a:token",
"routing_relation": "Peer",
});
let a_on_b = json!({
"ilp_address": "example.node_a",
"username": "node_a",
"asset_code": "XYZ",
"asset_scale": 9,
"ilp_over_http_url": format!("http://localhost:{}/ilp", node_a_http),
"ilp_over_http_incoming_token" : "token",
"ilp_over_http_outgoing_token" : "node_b:token",
"routing_relation": "Peer",
});
let bob_on_b = json!({
"ilp_address": "example.node_b.bob",
"username": "bob_on_b",
"asset_code": "XYZ",
"asset_scale": 6,
"ilp_over_http_incoming_token" : "token",
});
let node_a: InterledgerNode = serde_json::from_value(json!({
"admin_auth_token": "admin",
"redis_connection": connection_info_to_string(connection_info1),
"http_bind_address": format!("127.0.0.1:{}", node_a_http),
"settlement_api_bind_address": format!("127.0.0.1:{}", node_a_settlement),
"secret_seed": random_secret(),
"prometheus": {
"bind_address": format!("127.0.0.1:{}", prometheus_port),
"histogram_window": 10000,
"histogram_granularity": 1000,
}
}))
.unwrap();
let node_b: InterledgerNode = serde_json::from_value(json!({
"ilp_address": "example.parent",
"default_spsp_account": "bob_on_b",
"admin_auth_token": "admin",
"redis_connection": connection_info_to_string(connection_info2),
"http_bind_address": format!("127.0.0.1:{}", node_b_http),
"settlement_api_bind_address": format!("127.0.0.1:{}", node_b_settlement),
"secret_seed": random_secret(),
}))
.unwrap();
let alice_fut = join_all(vec![
create_account_on_node(node_a_http, alice_on_a, "admin"),
create_account_on_node(node_a_http, b_on_a, "admin"),
]);
runtime.spawn(node_a.serve());
let bob_fut = join_all(vec![
create_account_on_node(node_b_http, a_on_b, "admin"),
create_account_on_node(node_b_http, bob_on_b, "admin"),
]);
runtime.spawn(node_b.serve());
runtime
.block_on(
delay(500)
.map_err(|_| panic!("Something strange happened"))
.and_then(move |_| {
bob_fut
.and_then(|_| alice_fut)
.and_then(|_| delay(500).map_err(|_| panic!("delay error")))
})
.and_then(move |_| {
let send_1_to_2 = send_money_to_username(
node_a_http,
node_b_http,
1000,
"bob_on_b",
"alice_on_a",
"token",
);
let send_2_to_1 = send_money_to_username(
node_b_http,
node_a_http,
2000,
"alice_on_a",
"bob_on_b",
"token",
);
let check_metrics = move || {
Client::new()
.get(&format!("http://127.0.0.1:{}", prometheus_port))
.send()
.map_err(|err| eprintln!("Error getting metrics {:?}", err))
.and_then(|mut res| {
res.text().map_err(|err| {
eprintln!("Response was not a string: {:?}", err)
})
})
};
send_1_to_2
.map_err(|err| {
eprintln!("Error sending from node 1 to node 2: {:?}", err);
err
})
.and_then(move |_| {
check_metrics().and_then(move |ret| {
assert!(ret.starts_with("# metrics snapshot"));
assert!(ret.contains("requests_incoming_fulfill"));
assert!(ret.contains("requests_incoming_prepare"));
assert!(ret.contains("requests_incoming_reject"));
assert!(ret.contains("requests_incoming_duration"));
assert!(ret.contains("requests_outgoing_fulfill"));
assert!(ret.contains("requests_outgoing_prepare"));
assert!(ret.contains("requests_outgoing_reject"));
assert!(ret.contains("requests_outgoing_duration"));
Ok(())
})
})
.and_then(move |_| {
send_2_to_1.map_err(|err| {
eprintln!("Error sending from node 2 to node 1: {:?}", err);
err
})
})
.and_then(move |_| {
check_metrics().and_then(move |ret| {
assert!(ret.starts_with("# metrics snapshot"));
assert!(ret.contains("requests_incoming_fulfill"));
assert!(ret.contains("requests_incoming_prepare"));
assert!(ret.contains("requests_incoming_reject"));
assert!(ret.contains("requests_incoming_duration"));
assert!(ret.contains("requests_outgoing_fulfill"));
assert!(ret.contains("requests_outgoing_prepare"));
assert!(ret.contains("requests_outgoing_reject"));
assert!(ret.contains("requests_outgoing_duration"));
Ok(())
})
})
}),
)
.unwrap();
}