use futures::{future::join_all, stream::*, sync::mpsc, Future};
use ilp_node::InterledgerNode;
use serde_json::json;
use tokio::runtime::Builder as RuntimeBuilder;
use tracing::{debug, error_span};
use tracing_futures::Instrument;
mod redis_helpers;
use redis_helpers::*;
mod test_helpers;
use interledger::packet::Address;
use interledger::stream::StreamDelivery;
use std::str::FromStr;
use test_helpers::*;
/// Tracing target attached to the `debug!` events emitted directly by the test in this file.
const LOG_TARGET: &str = "interledger-tests-three-nodes";
/// End-to-end test that routes payments across three nodes in both directions.
///
/// Topology, as configured by the account JSON below:
///
/// * node1 (`example.alice`) and node2 (`example.bob`) are peers
///   (`routing_relation: "Peer"` on both sides) and hold `XYZ` accounts at scale 9.
/// * node3 has no `ilp_address` configured and is a child of node2: node2's
///   `charlie_on_b` account is marked `Child`, node3's `bob_on_c` account is marked
///   `Parent`. These accounts are denominated in `ABC` at scale 6.
///
/// node2 is given the exchange rates `{"ABC": 1, "XYZ": 2}`. The test then sends
/// 1000 units from `alice_on_a` to `charlie_on_c`, and 1000 units back, checking the
/// delivery receipts and the balances reported by each node after every payment.
///
/// # Panics
///
/// Panics (failing the test) if any node cannot be configured, if the exchange rates
/// cannot be set, or if any receipt or balance assertion does not hold.
#[test]
fn three_nodes() {
    install_tracing_subscriber();
    let context = TestContext::new();

    // Each node gets its own connection info with a distinct `db` index so the
    // three nodes do not share state.
    let mut connection_info1 = context.get_client_connection_info();
    connection_info1.db = 1;
    let mut connection_info2 = context.get_client_connection_info();
    connection_info2.db = 2;
    let mut connection_info3 = context.get_client_connection_info();
    connection_info3.db = 3;

    // NOTE(review): `get_open_port(Some(p))` presumably prefers port `p` and falls
    // back to another free port -- confirm against `test_helpers`.
    let node1_http = get_open_port(Some(3010));
    let node1_settlement = get_open_port(Some(3011));
    let node2_http = get_open_port(Some(3020));
    let node2_settlement = get_open_port(Some(3021));
    let node3_http = get_open_port(Some(3030));
    let node3_settlement = get_open_port(Some(3031));

    // Re-raise panics caught by the runtime instead of letting the handler drop
    // them, so a failed assertion inside a future still fails the test.
    let mut runtime = RuntimeBuilder::new()
        .panic_handler(|err| std::panic::resume_unwind(err))
        .build()
        .unwrap();

    // Accounts created on node1.
    let alice_on_alice = json!({
        "ilp_address": "example.alice",
        "username": "alice_on_a",
        "asset_code": "XYZ",
        "asset_scale": 9,
        "ilp_over_http_incoming_token" : "default account holder",
    });
    let bob_on_alice = json!({
        "ilp_address": "example.bob",
        "username": "bob_on_a",
        "asset_code": "XYZ",
        "asset_scale": 9,
        "ilp_over_http_url": format!("http://localhost:{}/ilp", node2_http),
        "ilp_over_http_incoming_token" : "two",
        "ilp_over_http_outgoing_token" : "alice_on_b:one",
        "min_balance": -1_000_000_000,
        "routing_relation": "Peer",
    });

    // Accounts created on node2.
    let alice_on_bob = json!({
        "ilp_address": "example.alice",
        "username": "alice_on_b",
        "asset_code": "XYZ",
        "asset_scale": 9,
        "ilp_over_http_url": format!("http://localhost:{}/ilp", node1_http),
        "ilp_over_http_incoming_token" : "one",
        "ilp_over_http_outgoing_token" : "bob_on_a:two",
        "routing_relation": "Peer",
    });
    let charlie_on_bob = json!({
        "username": "charlie_on_b",
        "asset_code": "ABC",
        "asset_scale": 6,
        "ilp_over_btp_incoming_token" : "three",
        "ilp_over_http_incoming_token" : "three",
        "min_balance": -1_000_000_000,
        "routing_relation": "Child",
    });

    // Accounts created on node3.
    let charlie_on_charlie = json!({
        "username": "charlie_on_c",
        "asset_code": "ABC",
        "asset_scale": 6,
        "ilp_over_http_incoming_token" : "default account holder",
    });
    let bob_on_charlie = json!({
        "ilp_address": "example.bob",
        "username": "bob_on_c",
        "asset_code": "ABC",
        "asset_scale": 6,
        "ilp_over_http_incoming_token" : "two",
        "ilp_over_http_outgoing_token": "charlie_on_b:three",
        "ilp_over_http_url": format!("http://localhost:{}/ilp", node2_http),
        "ilp_over_btp_url": format!("btp+ws://localhost:{}/ilp/btp", node2_http),
        "ilp_over_btp_outgoing_token": "charlie_on_b:three",
        "min_balance": -1_000_000_000,
        "routing_relation": "Parent",
    });

    let node1: InterledgerNode = serde_json::from_value(json!({
        "ilp_address": "example.alice",
        "default_spsp_account": "alice_on_a",
        "admin_auth_token": "admin",
        "redis_connection": connection_info_to_string(connection_info1),
        "http_bind_address": format!("127.0.0.1:{}", node1_http),
        "settlement_api_bind_address": format!("127.0.0.1:{}", node1_settlement),
        "secret_seed": random_secret(),
        "route_broadcast_interval": Some(200),
        "exchange_rate_poll_interval": 60000,
    }))
    .expect("Error creating node1.");

    let node2: InterledgerNode = serde_json::from_value(json!({
        "ilp_address": "example.bob",
        "admin_auth_token": "admin",
        "redis_connection": connection_info_to_string(connection_info2),
        "http_bind_address": format!("127.0.0.1:{}", node2_http),
        "settlement_api_bind_address": format!("127.0.0.1:{}", node2_settlement),
        "secret_seed": random_secret(),
        "route_broadcast_interval": Some(200),
        "exchange_rate_poll_interval": 60000,
    }))
    .expect("Error creating node2.");

    // node3 deliberately has no `ilp_address`; the receipt assertions below expect
    // its address to be `example.bob.charlie_on_b.charlie_on_c`.
    let node3: InterledgerNode = serde_json::from_value(json!({
        "default_spsp_account": "charlie_on_c",
        "admin_auth_token": "admin",
        "redis_connection": connection_info_to_string(connection_info3),
        "http_bind_address": format!("127.0.0.1:{}", node3_http),
        "settlement_api_bind_address": format!("127.0.0.1:{}", node3_settlement),
        "secret_seed": random_secret(),
        "route_broadcast_interval": Some(200),
        "exchange_rate_poll_interval": 60000,
    }))
    .expect("Error creating node3.");

    // node1 and node2 each report on this channel once their setup is done. node3
    // waits on `finish_receiver.collect()`, which only resolves after every sender
    // has been dropped, i.e. after both setup chains have run.
    let (finish_sender, finish_receiver) = mpsc::channel(0);

    let alice_fut = join_all(vec![
        create_account_on_node(node1_http, alice_on_alice, "admin"),
        create_account_on_node(node1_http, bob_on_alice, "admin"),
    ]);
    let mut node1_finish_sender = finish_sender.clone();
    runtime.spawn(
        node1
            .serve()
            .and_then(move |_| alice_fut)
            .and_then(move |_| {
                node1_finish_sender
                    .try_send(1)
                    .expect("Could not send message from node_1");
                Ok(())
            })
            .instrument(error_span!(target: "interledger", "node1")),
    );

    let bob_fut = join_all(vec![
        create_account_on_node(node2_http, alice_on_bob, "admin"),
        create_account_on_node(node2_http, charlie_on_bob, "admin"),
    ]);
    // The original sender is moved (not cloned) here so that no extra sender stays
    // alive in this scope and keeps the receiver stream open.
    let mut node2_finish_sender = finish_sender;
    runtime.spawn(
        node2
            .serve()
            .and_then(move |_| bob_fut)
            .and_then(move |_| {
                // node2 sits between the XYZ and ABC accounts, so it is the node
                // that is given the exchange rates.
                let client = reqwest::r#async::Client::new();
                client
                    .put(&format!("http://localhost:{}/rates", node2_http))
                    .header("Authorization", "Bearer admin")
                    .json(&json!({"ABC": 1, "XYZ": 2}))
                    .send()
                    // Panic with a formatted message: a bare `panic!(err)` carries the
                    // error as a non-string payload, which the default panic hook cannot
                    // display, and is rejected from the 2021 edition onward.
                    .map_err(|err| panic!("Error sending exchange rates request: {:?}", err))
                    .and_then(move |res| {
                        res.error_for_status()
                            .expect("Error setting exchange rates");
                        node2_finish_sender
                            .try_send(2)
                            .expect("Could not send message from node_2");
                        Ok(())
                    })
            })
            .instrument(error_span!(target: "interledger", "node2")),
    );

    // node3's accounts are created one after the other, and only after node1 and
    // node2 have reported that their setup is done.
    let charlie_fut = create_account_on_node(node3_http, charlie_on_charlie, "admin")
        .and_then(move |_| create_account_on_node(node3_http, bob_on_charlie, "admin"));

    runtime
        .block_on(
            node3
                .serve()
                .and_then(move |_| finish_receiver.collect())
                .and_then(move |messages| {
                    debug!(
                        target: LOG_TARGET,
                        "Received finish messages: {:?}", messages
                    );
                    charlie_fut
                })
                .instrument(error_span!(target: "interledger", "node3"))
                // NOTE(review): presumably this pause lets the nodes exchange routes
                // (`route_broadcast_interval` is 200) before paying -- confirm the
                // units of `delay` in `test_helpers`.
                .and_then(move |_| {
                    delay(1000).map_err(|_| panic!("Something strange happened when `delay`"))
                })
                .and_then(move |_| {
                    let send_1_to_3 = send_money_to_username(
                        node1_http,
                        node3_http,
                        1000,
                        "charlie_on_c",
                        "alice_on_a",
                        "default account holder",
                    );
                    let send_3_to_1 = send_money_to_username(
                        node3_http,
                        node1_http,
                        1000,
                        "alice_on_a",
                        "charlie_on_c",
                        "default account holder",
                    );

                    // Fetches, in this order: alice's balance on node1, charlie's
                    // balance on node2 and charlie's balance on node3.
                    let get_balances = move || {
                        futures::future::join_all(vec![
                            get_balance("alice_on_a", node1_http, "admin"),
                            get_balance("charlie_on_b", node2_http, "admin"),
                            get_balance("charlie_on_c", node3_http, "admin"),
                        ])
                    };

                    send_1_to_3
                        .map_err(|err| {
                            eprintln!("Error sending from node 1 to node 3: {:?}", err);
                            err
                        })
                        .and_then(move |receipt: StreamDelivery| {
                            debug!(target: LOG_TARGET, "send_1_to_3 receipt: {:?}", receipt);
                            assert_eq!(
                                receipt.from,
                                Address::from_str("example.alice").unwrap(),
                                "Payment receipt incorrect (1)"
                            );
                            assert!(receipt
                                .to
                                .to_string()
                                .starts_with("example.bob.charlie_on_b.charlie_on_c."));
                            assert_eq!(receipt.sent_asset_code, "XYZ");
                            assert_eq!(receipt.sent_asset_scale, 9);
                            assert_eq!(receipt.sent_amount, 1000);
                            assert_eq!(receipt.delivered_asset_code.unwrap(), "ABC");
                            // 1000 XYZ at scale 9, rate XYZ:ABC = 2:1, delivered at
                            // scale 6: 1000 * 2 / 10^3 = 2.
                            assert_eq!(receipt.delivered_amount, 2);
                            assert_eq!(receipt.delivered_asset_scale.unwrap(), 6);
                            get_balances().and_then(move |ret| {
                                assert_eq!(ret[0], -1000);
                                assert_eq!(ret[1], 2);
                                assert_eq!(ret[2], 2);
                                Ok(())
                            })
                        })
                        .and_then(move |_| {
                            send_3_to_1.map_err(|err| {
                                eprintln!("Error sending from node 3 to node 1: {:?}", err);
                                err
                            })
                        })
                        .and_then(move |receipt| {
                            debug!(target: LOG_TARGET, "send_3_to_1 receipt: {:?}", receipt);
                            assert_eq!(
                                receipt.from,
                                Address::from_str("example.bob.charlie_on_b.charlie_on_c").unwrap(),
                                "Payment receipt incorrect (2)"
                            );
                            assert!(receipt.to.to_string().starts_with("example.alice"));
                            assert_eq!(receipt.sent_asset_code, "ABC");
                            assert_eq!(receipt.sent_asset_scale, 6);
                            assert_eq!(receipt.sent_amount, 1000);
                            assert_eq!(receipt.delivered_asset_code.unwrap(), "XYZ");
                            // 1000 ABC at scale 6, rate ABC:XYZ = 1:2, delivered at
                            // scale 9: 1000 / 2 * 10^3 = 500_000.
                            assert_eq!(receipt.delivered_amount, 500_000);
                            assert_eq!(receipt.delivered_asset_scale.unwrap(), 9);
                            get_balances().and_then(move |ret| {
                                // Balances are cumulative over both payments:
                                // -1000 + 500_000 and 2 - 1000.
                                assert_eq!(ret[0], 499_000);
                                assert_eq!(ret[1], -998);
                                assert_eq!(ret[2], -998);
                                Ok(())
                            })
                        })
                }),
        )
        .map_err(|err| {
            eprintln!("Error executing tests: {:?}", err);
            err
        })
        .unwrap();
}