use crate::{
make_config,
utils::{
check_for_line_with, kill_homestar, listen_addr, multiaddr, retrieve_output,
subscribe_network_events, wait_for_socket_connection, ChildGuard, ProcInfo,
TimeoutFutureExt, BIN_NAME, ED25519MULTIHASH, SECP256K1MULTIHASH,
},
};
use anyhow::Result;
use once_cell::sync::Lazy;
use std::{
path::PathBuf,
process::{Command, Stdio},
time::Duration,
};
static BIN: Lazy<PathBuf> = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME));
#[test]
#[serial_test::parallel]
fn test_connection_notifications_integration() -> Result<()> {
let proc_info1 = ProcInfo::new().unwrap();
let proc_info2 = ProcInfo::new().unwrap();
let rpc_port1 = proc_info1.rpc_port;
let rpc_port2 = proc_info2.rpc_port;
let metrics_port1 = proc_info1.metrics_port;
let metrics_port2 = proc_info2.metrics_port;
let ws_port1 = proc_info1.ws_port;
let ws_port2 = proc_info2.ws_port;
let listen_addr1 = listen_addr(proc_info1.listen_port);
let listen_addr2 = listen_addr(proc_info2.listen_port);
let node_addra = multiaddr(proc_info1.listen_port, ED25519MULTIHASH);
let node_addrb = multiaddr(proc_info2.listen_port, SECP256K1MULTIHASH);
let toml = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr1}"
node_addresses = ["{node_addrb}"]
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port1}
[node.network.rpc]
port = {rpc_port1}
[node.network.webserver]
port = {ws_port1}
"#
);
let config1 = make_config!(toml);
let homestar_proc1 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config1.filename())
.arg("--db")
.arg(&proc_info1.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard1 = ChildGuard::new(homestar_proc1);
if wait_for_socket_connection(ws_port1, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
tokio_test::block_on(async {
let mut net_events1 = subscribe_network_events(ws_port1).await;
let sub1 = net_events1.sub();
let toml2 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }}
[node.network.libp2p]
listen_address = "{listen_addr2}"
node_addresses = ["{node_addra}"]
[node.network.libp2p.mdns]
enable = false
[node.network.metrics]
port = {metrics_port2}
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.rpc]
port = {rpc_port2}
[node.network.webserver]
port = {ws_port2}
"#
);
let config2 = make_config!(toml2);
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard2 = ChildGuard::new(homestar_proc2);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not receive a connection established message in time.")
}
}
let dead_proc2 = kill_homestar(proc_guard2.take(), None);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_closed"].is_object() {
break;
}
} else {
panic!("Node one did not receive a connection closed message in time.")
}
}
let dead_proc1 = kill_homestar(proc_guard1.take(), None);
let stdout1 = retrieve_output(dead_proc1);
let stdout2 = retrieve_output(dead_proc2);
let two_added_to_dht = check_for_line_with(
stdout1.clone(),
vec![
"added configured node to kademlia routing table",
SECP256K1MULTIHASH,
],
);
let two_in_dht_routing_table = check_for_line_with(
stdout1.clone(),
vec![
"kademlia routing table updated with peer",
SECP256K1MULTIHASH,
],
);
let one_connected_to_two = check_for_line_with(
stdout1.clone(),
vec!["peer connection established", SECP256K1MULTIHASH],
);
let two_disconnected_from_one = check_for_line_with(
stdout1.clone(),
vec!["peer connection closed", SECP256K1MULTIHASH],
);
let two_removed_from_dht_table = check_for_line_with(
stdout1.clone(),
vec!["removed peer from kademlia table", SECP256K1MULTIHASH],
);
assert!(one_connected_to_two);
assert!(two_in_dht_routing_table);
assert!(two_added_to_dht);
assert!(two_disconnected_from_one);
assert!(!two_removed_from_dht_table);
let one_addded_to_dht = check_for_line_with(
stdout2.clone(),
vec![
"added configured node to kademlia routing table",
ED25519MULTIHASH,
],
);
let one_in_dht_routing_table = check_for_line_with(
stdout2.clone(),
vec!["kademlia routing table updated with peer", ED25519MULTIHASH],
);
let two_connected_to_one = check_for_line_with(
stdout2,
vec!["peer connection established", ED25519MULTIHASH],
);
assert!(one_addded_to_dht);
assert!(one_in_dht_routing_table);
assert!(two_connected_to_one);
});
Ok(())
}
#[test]
#[serial_test::parallel]
fn test_libp2p_redial_on_connection_closed_integration() -> Result<()> {
let proc_info1 = ProcInfo::new().unwrap();
let proc_info2 = ProcInfo::new().unwrap();
let rpc_port1 = proc_info1.rpc_port;
let rpc_port2 = proc_info2.rpc_port;
let metrics_port1 = proc_info1.metrics_port;
let metrics_port2 = proc_info2.metrics_port;
let ws_port1 = proc_info1.ws_port;
let ws_port2 = proc_info2.ws_port;
let listen_addr1 = listen_addr(proc_info1.listen_port);
let listen_addr2 = listen_addr(proc_info2.listen_port);
let node_addrb = multiaddr(proc_info2.listen_port, SECP256K1MULTIHASH);
let toml1 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr1}"
node_addresses = ["{node_addrb}"]
dial_interval = 3
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port1}
[node.network.rpc]
port = {rpc_port1}
[node.network.webserver]
port = {ws_port1}
"#
);
let toml2 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }}
[node.network.libp2p]
listen_address = "{listen_addr2}"
node_addresses = []
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port2}
[node.network.rpc]
port = {rpc_port2}
[node.network.webserver]
port = {ws_port2}
"#
);
let config1 = make_config!(toml1);
let config2 = make_config!(toml2);
let homestar_proc1 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config1.filename())
.arg("--db")
.arg(&proc_info1.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let _proc_guard1 = ChildGuard::new(homestar_proc1);
if wait_for_socket_connection(ws_port1, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
tokio_test::block_on(async {
let mut net_events1 = subscribe_network_events(ws_port1).await;
let sub1 = net_events1.sub();
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard2 = ChildGuard::new(homestar_proc2);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not establish a connection with node two in time.")
}
}
kill_homestar(proc_guard2.take(), None);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_closed"].is_object() {
break;
}
} else {
panic!("Connection between node one and node two did not close in time.")
}
}
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let _proc_guard2 = ChildGuard::new(homestar_proc2);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not redial node two in time.")
}
}
});
Ok(())
}
#[test]
#[serial_test::parallel]
fn test_libp2p_redial_on_connection_error_integration() -> Result<()> {
let proc_info1 = ProcInfo::new().unwrap();
let proc_info2 = ProcInfo::new().unwrap();
let rpc_port1 = proc_info1.rpc_port;
let rpc_port2 = proc_info2.rpc_port;
let metrics_port1 = proc_info1.metrics_port;
let metrics_port2 = proc_info2.metrics_port;
let ws_port1 = proc_info1.ws_port;
let ws_port2 = proc_info2.ws_port;
let listen_addr1 = listen_addr(proc_info1.listen_port);
let listen_addr2 = listen_addr(proc_info2.listen_port);
let node_addrb = multiaddr(proc_info2.listen_port, SECP256K1MULTIHASH);
let toml1 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr1}"
node_addresses = ["{node_addrb}"]
dial_interval = 3
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port1}
[node.network.rpc]
port = {rpc_port1}
[node.network.webserver]
port = {ws_port1}
"#
);
let toml2 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }}
[node.network.libp2p]
listen_address = "{listen_addr2}"
node_addresses = []
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port2}
[node.network.rpc]
port = {rpc_port2}
[node.network.webserver]
port = {ws_port2}
"#
);
let config1 = make_config!(toml1);
let config2 = make_config!(toml2);
let homestar_proc1 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config1.filename())
.arg("--db")
.arg(&proc_info1.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let _proc_guard1 = ChildGuard::new(homestar_proc1);
if wait_for_socket_connection(ws_port1, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
tokio_test::block_on(async {
let mut net_events1 = subscribe_network_events(ws_port1).await;
let sub1 = net_events1.sub();
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard2 = ChildGuard::new(homestar_proc2);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not establish a connection with node two in time.")
}
}
kill_homestar(proc_guard2.take(), None);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_closed"].is_object() {
break;
}
} else {
panic!("Connection between node one and node two did not close in time.")
}
}
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["outgoing_connection_error"].is_object() {
break;
}
} else {
panic!("Connection between node one and node two did not close in time.")
}
}
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["outgoing_connection_error"].is_object() {
break;
}
} else {
panic!("Connection between node one and node two did not close in time.")
}
}
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let _proc_guard2 = ChildGuard::new(homestar_proc2);
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not redial node two in time.")
}
}
});
Ok(())
}