use crate::{
make_config,
utils::{
check_for_line_with, kill_homestar, listen_addr, multiaddr, retrieve_output,
subscribe_network_events, wait_for_socket_connection, ChildGuard, ProcInfo,
TimeoutFutureExt, BIN_NAME, ED25519MULTIHASH, ED25519MULTIHASH2, ED25519MULTIHASH3,
ED25519MULTIHASH5, SECP256K1MULTIHASH,
},
};
use anyhow::Result;
use diesel::RunQueryDsl;
use homestar_runtime::{
db::{self, schema, Database},
Db, Settings,
};
use libipld::Cid;
use once_cell::sync::Lazy;
use std::{
path::PathBuf,
process::{Command, Stdio},
str::FromStr,
time::Duration,
};
// Lazily-resolved path to the compiled homestar binary under test,
// located via assert_cmd from the cargo build artifacts.
static BIN: Lazy<PathBuf> = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME));
// Two-node DHT records round-trip test:
// 1. Node one runs part one of a workflow, putting its receipt and workflow
//    info records onto the DHT and reaching quorum (quorum = 1).
// 2. Node two runs the follow-up workflows and retrieves those records from
//    the DHT, committing them to its own database.
// Both nodes' stdout logs are checked afterwards for the matching log lines.
#[test]
#[serial_test::parallel]
fn test_libp2p_dht_records_integration() -> Result<()> {
// ProcInfo supplies unique ports and a scratch db path per node so this
// test can run in parallel with the others.
let proc_info1 = ProcInfo::new().unwrap();
let proc_info2 = ProcInfo::new().unwrap();
let rpc_port1 = proc_info1.rpc_port;
let rpc_port2 = proc_info2.rpc_port;
let metrics_port1 = proc_info1.metrics_port;
let metrics_port2 = proc_info2.metrics_port;
let ws_port1 = proc_info1.ws_port;
let ws_port2 = proc_info2.ws_port;
let listen_addr1 = listen_addr(proc_info1.listen_port);
let listen_addr2 = listen_addr(proc_info2.listen_port);
// Multiaddrs derived from the fixture keypairs below; each node dials the
// other statically since mdns/rendezvous discovery is disabled.
let node_addra = multiaddr(proc_info1.listen_port, ED25519MULTIHASH);
let node_addrb = multiaddr(proc_info2.listen_port, SECP256K1MULTIHASH);
// Node one config: ed25519 identity, dials node two, DHT quorum of 1, and
// pubsub/mdns/rendezvous disabled so records can only travel via the DHT.
let toml1 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr1}"
node_addresses = ["{node_addrb}"]
[node.network.libp2p.dht]
p2p_receipt_timeout = 3000
p2p_workflow_info_timeout = 3000
receipt_quorum = 1
workflow_quorum = 1
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port1}
[node.network.rpc]
port = {rpc_port1}
[node.network.webserver]
port = {ws_port1}
"#
);
let config1 = make_config!(toml1);
// Spawn node one with stdout piped so its logs can be inspected at the end.
let homestar_proc1 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config1.filename())
.arg("--db")
.arg(&proc_info1.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
// ChildGuard kills the process on drop if the test panics early.
let proc_guard1 = ChildGuard::new(homestar_proc1);
if wait_for_socket_connection(ws_port1, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
tokio_test::block_on(async {
// Subscribe to node one's network events before node two starts so the
// connection_established notification cannot be missed.
let mut net_events1 = subscribe_network_events(ws_port1).await;
let sub1 = net_events1.sub();
// Node two config: secp256k1 identity, dials node one, same DHT settings.
let toml2 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }}
[node.network.libp2p]
listen_address = "{listen_addr2}"
node_addresses = ["{node_addra}"]
[node.network.libp2p.dht]
p2p_receipt_timeout = 3000
p2p_workflow_info_timeout = 3000
receipt_quorum = 1
workflow_quorum = 1
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port2}
[node.network.rpc]
port = {rpc_port2}
[node.network.webserver]
port = {ws_port2}
"#
);
let config2 = make_config!(toml2);
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard2 = ChildGuard::new(homestar_proc2);
if wait_for_socket_connection(ws_port2, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
let mut net_events2 = subscribe_network_events(ws_port2).await;
let sub2 = net_events2.sub();
// Block until node one reports an established connection to node two.
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not establish a connection with node two in time.")
}
}
// Run part one of the workflow on node one; this produces the receipt and
// workflow info that are put onto the DHT.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port1.to_string())
.arg("tests/fixtures/test-workflow-add-one-part-one.json")
.output();
let mut put_receipt_cid: Cid = Cid::default();
let mut put_receipt = false;
let mut put_workflow_info = false;
let mut receipt_quorum_success = false;
let mut workflow_info_quorum_success = false;
// Wait for all four DHT notifications from node one (both puts and both
// quorum successes), capturing the receipt CID for later db verification.
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["put_receipt_dht"].is_object() {
put_receipt_cid =
Cid::from_str(json["put_receipt_dht"]["cid"].as_str().unwrap())
.expect("Unable to parse put receipt CID.");
put_receipt = true;
} else if json["put_workflow_info_dht"].is_object() {
put_workflow_info = true;
} else if json["receipt_quorum_success_dht"].is_object() {
receipt_quorum_success = true;
} else if json["workflow_info_quorum_success_dht"].is_object() {
workflow_info_quorum_success = true;
}
} else {
// On timeout, report which notifications did (not) arrive.
panic!(
r#"Expected notifications from node one did not arrive in time:
- Put receipt to DHT: {}
- Put workflow info to DHT: {}
- Receipt quorum succeeded: {}
- Workflow info quorum succeeded: {}
"#,
put_receipt,
put_workflow_info,
receipt_quorum_success,
workflow_info_quorum_success
);
}
if put_receipt
&& put_workflow_info
&& receipt_quorum_success
&& workflow_info_quorum_success
{
break;
}
}
// Run part two of the workflow on node two.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port2.to_string())
.arg("tests/fixtures/test-workflow-add-one-part-two.json")
.output();
// Open node two's database so stored records can be checked below.
let config_fixture = config2.filename();
let settings = Settings::load_from_file(PathBuf::from(config_fixture)).unwrap();
let db = Db::setup_connection_pool(
settings.node(),
Some(proc_info2.db_path.display().to_string()),
)
.expect("Failed to connect to node two database");
// Run part one on node two as well; node two did not compute part one
// locally, so this presumably forces retrieval of the workflow info and
// receipt from the DHT — confirm against runtime scheduling behavior.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port2.to_string())
.arg("tests/fixtures/test-workflow-add-one-part-one.json")
.output();
let received_workflow_info_cid: Cid;
// Wait until node two reports that it got workflow info from the DHT.
loop {
if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["got_workflow_info_dht"].is_object() {
received_workflow_info_cid =
Cid::from_str(json["got_workflow_info_dht"]["cid"].as_str().unwrap())
.expect("Unable to parse received workflow info CID.");
break;
}
} else {
panic!("Node two did not get workflow info in time.")
}
}
// The retrieved workflow info must have been persisted in node two's db.
let stored_workflow_info =
Db::get_workflow_info(received_workflow_info_cid, &mut db.conn().unwrap());
assert!(stored_workflow_info.is_ok());
// Stop both nodes and collect their stdout for log-line assertions.
let dead_proc1 = kill_homestar(proc_guard1.take(), None);
let dead_proc2 = kill_homestar(proc_guard2.take(), None);
let stdout1 = retrieve_output(dead_proc1);
let stdout2 = retrieve_output(dead_proc2);
// Node one must have logged the puts and quorum successes.
let put_receipt_logged = check_for_line_with(stdout1.clone(), vec!["receipt PUT onto DHT"]);
let put_workflow_info_logged =
check_for_line_with(stdout1.clone(), vec!["workflow info PUT onto DHT"]);
let receipt_quorum_success_logged =
check_for_line_with(stdout1.clone(), vec!["quorum success for receipt record"]);
let workflow_info_quorum_success_logged =
check_for_line_with(stdout1, vec!["quorum success for workflow info record"]);
assert!(put_receipt_logged);
assert!(put_workflow_info_logged);
assert!(receipt_quorum_success_logged);
assert!(workflow_info_quorum_success_logged);
// Node two must have logged retrieval of both records (keyed by node one's
// peer id) and the commit of the retrieved receipt.
let retrieved_workflow_info_logged = check_for_line_with(
stdout2.clone(),
vec!["found workflow info", ED25519MULTIHASH],
);
let retrieved_receipt_info_logged = check_for_line_with(
stdout2.clone(),
vec!["found receipt record", ED25519MULTIHASH],
);
let committed_receipt = check_for_line_with(
stdout2,
vec![
"committed to database",
"dht.resolver",
&put_receipt_cid.to_string(),
],
);
assert!(retrieved_workflow_info_logged);
assert!(retrieved_receipt_info_logged);
assert!(committed_receipt);
// The receipt put by node one must also be queryable in node two's db.
let stored_receipt = Db::find_receipt_by_cid(put_receipt_cid, &mut db.conn().unwrap());
assert!(stored_receipt.is_ok());
});
Ok(())
}
// Two-node DHT quorum-failure test: both nodes set receipt_quorum and
// workflow_quorum to 100 — far more peers than this two-node network can
// provide — so node one's DHT put operations must fail with QuorumFailed.
//
// NOTE(review): "intregration" in the function name is a typo for
// "integration"; it is kept as-is so existing test name filters keep matching.
#[test]
#[serial_test::parallel]
fn test_libp2p_dht_quorum_failure_intregration() -> Result<()> {
// Unique ports and db paths per node allow parallel test execution.
let proc_info1 = ProcInfo::new().unwrap();
let proc_info2 = ProcInfo::new().unwrap();
let rpc_port1 = proc_info1.rpc_port;
let rpc_port2 = proc_info2.rpc_port;
let metrics_port1 = proc_info1.metrics_port;
let metrics_port2 = proc_info2.metrics_port;
let ws_port1 = proc_info1.ws_port;
let ws_port2 = proc_info2.ws_port;
let listen_addr1 = listen_addr(proc_info1.listen_port);
let listen_addr2 = listen_addr(proc_info2.listen_port);
// Static dial addresses for the two fixture identities (keys 2 and 3).
let node_addra = multiaddr(proc_info2.listen_port, ED25519MULTIHASH3);
let node_addrb = multiaddr(proc_info1.listen_port, ED25519MULTIHASH2);
// Node one: unreachable quorum of 100 and all side-channel discovery
// (mdns/pubsub/rendezvous) disabled.
let toml1 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519_2.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr1}"
node_addresses = ["{node_addra}"]
[node.network.libp2p.dht]
receipt_quorum = 100
workflow_quorum = 100
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port1}
[node.network.rpc]
port = {rpc_port1}
[node.network.webserver]
port = {ws_port1}
"#
);
let config1 = make_config!(toml1);
// Spawn node one with stdout piped for later log inspection.
let homestar_proc1 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config1.filename())
.arg("--db")
.arg(&proc_info1.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard1 = ChildGuard::new(homestar_proc1);
if wait_for_socket_connection(ws_port1, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
tokio_test::block_on(async {
// Subscribe before node two starts so connection events are not missed.
let mut net_events1 = subscribe_network_events(ws_port1).await;
let sub1 = net_events1.sub();
// Node two: mirror configuration with the third ed25519 fixture key.
let toml2 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519_3.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr2}"
node_addresses = ["{node_addrb}"]
[node.network.libp2p.dht]
receipt_quorum = 100
workflow_quorum = 100
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port2}
[node.network.rpc]
port = {rpc_port2}
[node.network.webserver]
port = {ws_port2}
"#
);
let config2 = make_config!(toml2);
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard2 = ChildGuard::new(homestar_proc2);
if wait_for_socket_connection(ws_port2, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
// Block until node one reports an established connection to node two.
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not establish a connection with node two in time.")
}
}
// Run a workflow on node one; its DHT puts cannot reach quorum 100.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port1.to_string())
.arg("tests/fixtures/test-workflow-add-one.json")
.output();
let mut receipt_quorum_failure = false;
let mut workflow_info_quorum_failure = false;
// Wait for both quorum-failure notifications, or time out with a report.
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
// Bug fix: the original checked `receipt_quorum_failure_dht` twice in
// a row (two consecutive identical `if` blocks setting the same flag);
// the redundant duplicate branch has been removed.
if json["receipt_quorum_failure_dht"].is_object() {
receipt_quorum_failure = true
} else if json["workflow_info_quorum_failure_dht"].is_object() {
workflow_info_quorum_failure = true
}
} else {
panic!(
r#"Expected notifications from node one did not arrive in time:
- Receipt quorum failure: {}
- Workflow info failure: {}
"#,
receipt_quorum_failure, workflow_info_quorum_failure
);
}
if receipt_quorum_failure && workflow_info_quorum_failure {
break;
}
}
// Stop both nodes and verify node one logged both QuorumFailed errors.
let dead_proc1 = kill_homestar(proc_guard1.take(), None);
let _dead_proc2 = kill_homestar(proc_guard2.take(), None);
let stdout1 = retrieve_output(dead_proc1);
let receipt_quorum_failure_logged = check_for_line_with(
stdout1.clone(),
vec!["QuorumFailed", "error propagating receipt record"],
);
let workflow_info_quorum_failure_logged = check_for_line_with(
stdout1,
vec!["QuorumFailed", "error propagating workflow info record"],
);
assert!(receipt_quorum_failure_logged);
assert!(workflow_info_quorum_failure_logged);
});
Ok(())
}
// Two-node workflow-info provider test: node one computes a workflow and
// provides its info; node two — whose DHT workflow-info lookups time out
// immediately (timeout = 0) — must fetch that info from node one as a
// provider rather than from DHT records. Logs on both sides are checked.
#[test]
#[serial_test::parallel]
fn test_libp2p_dht_workflow_info_provider_integration() -> Result<()> {
// Unique ports and db paths per node allow parallel test execution.
let proc_info1 = ProcInfo::new().unwrap();
let proc_info2 = ProcInfo::new().unwrap();
let rpc_port1 = proc_info1.rpc_port;
let rpc_port2 = proc_info2.rpc_port;
let metrics_port1 = proc_info1.metrics_port;
let metrics_port2 = proc_info2.metrics_port;
let ws_port1 = proc_info1.ws_port;
let ws_port2 = proc_info2.ws_port;
let listen_addr1 = listen_addr(proc_info1.listen_port);
let listen_addr2 = listen_addr(proc_info2.listen_port);
// Static dial addresses for the two fixture identities (keys 2 and 5).
let node_addra = multiaddr(proc_info1.listen_port, ED25519MULTIHASH2);
let node_addrb = multiaddr(proc_info2.listen_port, ED25519MULTIHASH5);
// Node one: long idle_connection_timeout keeps the link to node two open
// across both workflow runs; discovery side channels are disabled.
let toml1 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519_2.pem" }}
[node.network.libp2p]
idle_connection_timeout = 360
listen_address = "{listen_addr1}"
node_addresses = ["{node_addrb}"]
[node.network.libp2p.dht]
receipt_quorum = 1
workflow_quorum = 1
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port1}
[node.network.rpc]
port = {rpc_port1}
[node.network.webserver]
port = {ws_port1}
"#
);
let config1 = make_config!(toml1);
// Spawn node one with stdout piped for later log inspection.
let homestar_proc1 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config1.filename())
.arg("--db")
.arg(&proc_info1.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard1 = ChildGuard::new(homestar_proc1);
if wait_for_socket_connection(ws_port1, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
tokio_test::block_on(async {
// Subscribe before node two starts so connection events are not missed.
let mut net_events1 = subscribe_network_events(ws_port1).await;
let sub1 = net_events1.sub();
// Node two: p2p_workflow_info_timeout = 0 makes DHT workflow-info lookups
// time out immediately, presumably forcing the provider request path —
// TODO confirm against the dht settings documentation.
let toml2 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519_5.pem" }}
[node.network.libp2p]
idle_connection_timeout = 360
listen_address = "{listen_addr2}"
node_addresses = ["{node_addra}"]
[node.network.libp2p.dht]
p2p_workflow_info_timeout = 0
receipt_quorum = 1
workflow_quorum = 1
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port2}
[node.network.rpc]
port = {rpc_port2}
[node.network.webserver]
port = {ws_port2}
"#
);
let config2 = make_config!(toml2);
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let proc_guard2 = ChildGuard::new(homestar_proc2);
if wait_for_socket_connection(ws_port2, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
let mut net_events2 = subscribe_network_events(ws_port2).await;
let sub2 = net_events2.sub();
// Block until node one reports an established connection to node two.
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
break;
}
} else {
panic!("Node one did not establish a connection with node two in time.")
}
}
// Run the workflow on node one so it computes and provides the info.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port1.to_string())
.arg("tests/fixtures/test-workflow-add-one.json")
.output();
// Brief pause, presumably to let node one start providing before node two
// asks — TODO confirm whether this sleep is still required.
tokio::time::sleep(Duration::from_secs(1)).await;
// Known CID of the fixture workflow's info record.
let expected_workflow_cid = "bafyrmidbhanzivykbzxfichwvpvywhkthd6wycmwlaha46z3lk5v3ilo5q";
// Run the same workflow on node two, triggering the provider request.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port2.to_string())
.arg("tests/fixtures/test-workflow-add-one.json")
.output();
let received_workflow_info_cid: Cid;
// Wait until node two reports workflow info received from a provider.
loop {
if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["received_workflow_info"].is_object() {
received_workflow_info_cid =
Cid::from_str(json["received_workflow_info"]["cid"].as_str().unwrap())
.expect("Unable to parse received workflow info CID.");
break;
}
} else {
panic!("Node two did not get workflow info in time.")
}
}
assert_eq!(
received_workflow_info_cid.to_string(),
expected_workflow_cid
);
// The received workflow info must be persisted in node two's database.
let settings = Settings::load_from_file(PathBuf::from(config2.filename())).unwrap();
let db = Db::setup_connection_pool(
settings.node(),
Some(proc_info2.db_path.display().to_string()),
)
.expect("Failed to connect to node two database");
let stored_workflow_info =
Db::get_workflow_info(received_workflow_info_cid, &mut db.conn().unwrap());
assert!(stored_workflow_info.is_ok());
// Stop both nodes and check both sides of the provider exchange in logs.
let dead_proc1 = kill_homestar(proc_guard1.take(), None);
let dead_proc2 = kill_homestar(proc_guard2.take(), None);
let stdout1 = retrieve_output(dead_proc1);
let stdout2 = retrieve_output(dead_proc2);
let providing_workflow_info_logged = check_for_line_with(
stdout1.clone(),
vec!["successfully providing", expected_workflow_cid],
);
let got_workflow_info_provider_logged = check_for_line_with(
stdout2.clone(),
vec!["got workflow info providers", ED25519MULTIHASH2],
);
let sent_workflow_info_logged = check_for_line_with(
stdout1.clone(),
vec![
"sent workflow info to peer",
ED25519MULTIHASH5,
expected_workflow_cid,
],
);
let received_workflow_info_logged = check_for_line_with(
stdout2.clone(),
vec![
"received workflow info from peer",
ED25519MULTIHASH2,
expected_workflow_cid,
],
);
assert!(providing_workflow_info_logged);
assert!(got_workflow_info_provider_logged);
assert!(sent_workflow_info_logged);
assert!(received_workflow_info_logged);
});
Ok(())
}
// Three-node recursive provider test (currently #[ignore]d):
// - Node one computes the workflow and puts its info onto the DHT.
// - Node two fetches the info from the DHT.
// - Node one's local workflow records are then deleted, and node three —
//   configured to skip DHT lookups (timeout 0) — requests the info, which
//   presumably must be served recursively via a provider (TODO confirm the
//   intended recursion path; the test is ignored and may be out of date).
#[ignore]
#[test]
#[serial_test::parallel]
fn test_libp2p_dht_workflow_info_provider_recursive_integration() -> Result<()> {
// Three nodes, each with unique ports and a scratch db path.
let proc_info1 = ProcInfo::new().unwrap();
let proc_info2 = ProcInfo::new().unwrap();
let proc_info3 = ProcInfo::new().unwrap();
let rpc_port1 = proc_info1.rpc_port;
let rpc_port2 = proc_info2.rpc_port;
let rpc_port3 = proc_info3.rpc_port;
let metrics_port1 = proc_info1.metrics_port;
let metrics_port2 = proc_info2.metrics_port;
let metrics_port3 = proc_info3.metrics_port;
let ws_port1 = proc_info1.ws_port;
let ws_port2 = proc_info2.ws_port;
let ws_port3 = proc_info3.ws_port;
let listen_addr1 = listen_addr(proc_info1.listen_port);
let listen_addr2 = listen_addr(proc_info2.listen_port);
let listen_addr3 = listen_addr(proc_info3.listen_port);
// Static dial addresses; node one connects to both others, nodes two and
// three dial only node one.
let node_addra = multiaddr(proc_info1.listen_port, ED25519MULTIHASH);
let node_addrb = multiaddr(proc_info2.listen_port, SECP256K1MULTIHASH);
let node_addrc = multiaddr(proc_info3.listen_port, ED25519MULTIHASH2);
let toml1 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr1}"
node_addresses = ["{node_addrb}", "{node_addrc}"]
# Force node one to request from node two
# as a provider instead of from DHT
p2p_workflow_info_timeout = 0
p2p_provider_timeout = 10000
receipt_quorum = 1
workflow_quorum = 1
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port1}
[node.network.rpc]
port = {rpc_port1}
[node.network.webserver]
port = {ws_port1}
"#
);
let config1 = make_config!(toml1);
// Everything, including process spawning, happens inside the async block
// here (unlike the other tests, which spawn node one before block_on).
tokio_test::block_on(async move {
let homestar_proc1 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config1.filename())
.arg("--db")
.arg(&proc_info1.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let _proc_guard1 = ChildGuard::new(homestar_proc1);
if wait_for_socket_connection(ws_port1, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
let mut net_events1 = subscribe_network_events(ws_port1).await;
let sub1 = net_events1.sub();
// Node two: DHT lookups allowed (5s timeout), provider requests disabled.
let toml2 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr2}"
node_addresses = ["{node_addra}"]
# Allow node two to request workflow info from DHT
p2p_workflow_info_timeout = 5000
p2p_provider_timeout = 0
receipt_quorum = 1
workflow_quorum = 1
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port2}
[node.network.rpc]
port = {rpc_port2}
[node.network.webserver]
port = {ws_port2}
"#
);
let config2 = make_config!(toml2);
let homestar_proc2 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config2.filename())
.arg("--db")
.arg(&proc_info2.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let _proc_guard2 = ChildGuard::new(homestar_proc2);
if wait_for_socket_connection(ws_port2, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
let mut net_events2 = subscribe_network_events(ws_port2).await;
let sub2 = net_events2.sub();
// Node three: DHT lookups disabled (timeout 0), provider requests allowed.
let toml3 = format!(
r#"
[node]
[node.network.keypair_config]
existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519_2.pem" }}
[node.network.libp2p]
listen_address = "{listen_addr3}"
node_addresses = ["{node_addra}"]
# Allow node two to request workflow info from DHT
p2p_workflow_info_timeout = 0
p2p_provider_timeout = 10000
receipt_quorum = 1
workflow_quorum = 1
[node.network.libp2p.mdns]
enable = false
[node.network.libp2p.pubsub]
enable = false
[node.network.libp2p.rendezvous]
enable_client = false
[node.network.metrics]
port = {metrics_port3}
[node.network.rpc]
port = {rpc_port3}
[node.network.webserver]
port = {ws_port3}
"#
);
let config3 = make_config!(toml3);
let homestar_proc3 = Command::new(BIN.as_os_str())
.env("RUST_BACKTRACE", "0")
.env(
"RUST_LOG",
"homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug",
)
.arg("start")
.arg("-c")
.arg(config3.filename())
.arg("--db")
.arg(&proc_info3.db_path)
.stdout(Stdio::piped())
.spawn()
.unwrap();
let _guard3 = ChildGuard::new(homestar_proc3);
if wait_for_socket_connection(ws_port3, 1000).is_err() {
panic!("Homestar server/runtime failed to start in time");
}
let mut net_events3 = subscribe_network_events(ws_port3).await;
let sub3 = net_events3.sub();
// Wait for node one's connection to node two (identified by peer id).
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
assert_eq!(
json["connection_established"]["peer_id"],
SECP256K1MULTIHASH.to_string()
);
break;
}
} else {
panic!("Node one did not establish a connection with node two in time.")
}
}
// Wait for node one's connection to node three.
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
if json["connection_established"].is_object() {
// NOTE(review): the previous loop reads "peer_id" (snake_case)
// while this one reads "peerId" (camelCase); one spelling is
// likely wrong — verify against the notification JSON schema
// (this test is #[ignore]d and may predate a schema change).
assert_eq!(
json["connection_established"]["peerId"],
ED25519MULTIHASH2.to_string()
);
break;
}
} else {
panic!("Node one did not establish a connection with node three in time.")
}
}
// Run the workflow on node one and wait for its DHT workflow-info put.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port1.to_string())
.arg("tests/fixtures/test-workflow-add-one.json")
.output();
loop {
if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
println!("node1: {json}");
if json["put_workflow_info_dht"].is_object() {
assert_eq!(
json["put_workflow_info_dht"]["cid"].as_str().unwrap(),
"bafyrmihctgawsskx54qyt3clcaq2quc42pqxzhr73o6qjlc3rc4mhznotq"
);
break;
}
} else {
panic!("Node one did not put workflow info in time.")
}
}
// Run the same workflow on node two; it should find the info on the DHT.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port2.to_string())
.arg("tests/fixtures/test-workflow-add-one.json")
.output();
loop {
if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
println!("node2: {json}");
if json["got_workflow_info_dht"].is_object() {
assert_eq!(
json["got_workflow_info_dht"]["cid"].as_str().unwrap(),
"bafyrmihctgawsskx54qyt3clcaq2quc42pqxzhr73o6qjlc3rc4mhznotq"
);
break;
}
} else {
panic!("Node two did not get workflow info in time.")
}
}
// Delete node one's local workflow records, presumably so node three's
// upcoming request cannot be answered from node one's database and must
// be served via a provider instead — TODO confirm intent.
// Note: this opens node ONE's db with default settings (Settings::load()),
// not config3; only the db path matters here.
let db = db::Db::setup_connection_pool(
&Settings::load().unwrap().node(),
Some(proc_info1.db_path.display().to_string()),
)
.unwrap();
diesel::delete(schema::workflows_receipts::table)
.execute(&mut db.conn().unwrap())
.unwrap();
diesel::delete(schema::workflows::table)
.execute(&mut db.conn().unwrap())
.unwrap();
// Run the workflow on node three, which skips the DHT (timeout 0) and
// must receive the workflow info from a provider.
let _ = Command::new(BIN.as_os_str())
.arg("run")
.arg("-p")
.arg(rpc_port3.to_string())
.arg("tests/fixtures/test-workflow-add-one.json")
.output();
loop {
if let Ok(msg) = sub3.next().with_timeout(Duration::from_secs(30)).await {
let json: serde_json::Value =
serde_json::from_slice(&msg.unwrap().unwrap()).unwrap();
println!("node3: {json}");
// NOTE(review): this branch matches on a "type"/"data" camelCase
// envelope, unlike the snake_case object keys matched above —
// confirm which notification format the runtime currently emits.
if json["type"].as_str().unwrap() == "network:receivedWorkflowInfo" {
assert_eq!(
json["data"]["provider"],
"16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc"
);
assert_eq!(
json["data"]["cid"].as_str().unwrap(),
"bafyrmihctgawsskx54qyt3clcaq2quc42pqxzhr73o6qjlc3rc4mhznotq"
);
break;
}
} else {
panic!("Node three did not receive workflow info in time.")
}
}
});
Ok(())
}