use ::fixt::prelude::*;
use anyhow::Result;
use futures::future;
use hdk::prelude::RemoteSignal;
use holochain::sweettest::SweetAgents;
use holochain::sweettest::SweetConductorBatch;
use holochain::sweettest::SweetDnaFile;
use holochain::{
conductor::{
api::{AdminRequest, AdminResponse},
error::ConductorError,
Conductor,
},
fixt::*,
};
use holochain_types::{
prelude::*,
test_utils::{fake_agent_pubkey_1, fake_dna_zomes, write_fake_dna_file},
};
use holochain_wasm_test_utils::TestWasm;
use holochain_websocket::*;
use matches::assert_matches;
use observability;
use std::sync::Arc;
use std::time::Duration;
use tempdir::TempDir;
use tokio_stream::StreamExt;
use tracing::*;
use url2::prelude::*;
use test_utils::*;
pub mod test_utils;
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "slow_tests")]
async fn call_admin() {
observability::test_run().ok();
let port = 9909;
let tmp_dir = TempDir::new("conductor_cfg").unwrap();
let path = tmp_dir.path().to_path_buf();
let environment_path = path.clone();
let config = create_config(port, environment_path);
let config_path = write_config(path, &config);
let uuid = uuid::Uuid::new_v4();
let dna = fake_dna_zomes(
&uuid.to_string(),
vec![(TestWasm::Foo.into(), TestWasm::Foo.into())],
);
let _holochain = start_holochain(config_path.clone()).await;
let (mut client, _) = websocket_client_by_port(port).await.unwrap();
let original_dna_hash = dna.dna_hash().clone();
let properties = holochain_types::properties::YamlProperties::new(
serde_yaml::from_str(
r#"
test: "example"
how_many: 42
"#,
)
.unwrap(),
);
let (fake_dna_path, _tmpdir) = write_fake_dna_file(dna.clone()).await.unwrap();
let orig_dna_hash = dna.dna_hash().clone();
register_and_install_dna(
&mut client,
orig_dna_hash,
fake_agent_pubkey_1(),
fake_dna_path,
Some(properties.clone()),
"nick".into(),
6000,
)
.await;
let request = AdminRequest::ListDnas;
let response = client.request(request);
let response = check_timeout(response, 6000).await;
let tmp_wasm = dna.code().values().cloned().collect::<Vec<_>>();
let mut tmp_dna = dna.dna_def().clone();
tmp_dna.properties = properties.try_into().unwrap();
let dna = holochain_types::dna::DnaFile::new(tmp_dna, tmp_wasm)
.await
.unwrap();
assert_ne!(&original_dna_hash, dna.dna_hash());
let expects = vec![dna.dna_hash().clone()];
assert_matches!(response, AdminResponse::DnasListed(a) if a == expects);
}
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "slow_tests")]
async fn call_zome() {
observability::test_run().ok();
let admin_port = 9910;
let app_port = 9913;
let tmp_dir = TempDir::new("conductor_cfg_2").unwrap();
let path = tmp_dir.path().to_path_buf();
let environment_path = path.clone();
let config = create_config(admin_port, environment_path);
let config_path = write_config(path, &config);
let holochain = start_holochain(config_path.clone()).await;
let (mut client, _) = websocket_client_by_port(admin_port).await.unwrap();
let (_, receiver2) = websocket_client_by_port(admin_port).await.unwrap();
let uuid = uuid::Uuid::new_v4();
let dna = fake_dna_zomes(
&uuid.to_string(),
vec![(TestWasm::Foo.into(), TestWasm::Foo.into())],
);
let original_dna_hash = dna.dna_hash().clone();
let (fake_dna_path, _tmpdir) = write_fake_dna_file(dna.clone()).await.unwrap();
let _dna_hash = register_and_install_dna(
&mut client,
original_dna_hash.clone(),
fake_agent_pubkey_1(),
fake_dna_path,
None,
"".into(),
6000,
)
.await;
let request = AdminRequest::ListDnas;
let response = client.request(request);
let response = check_timeout(response, 3000).await;
let expects = vec![original_dna_hash.clone()];
assert_matches!(response, AdminResponse::DnasListed(a) if a == expects);
let request = AdminRequest::EnableApp {
installed_app_id: "test".to_string(),
};
let response = client.request(request);
let response = check_timeout(response, 3000).await;
assert_matches!(response, AdminResponse::AppEnabled { .. });
let app_port_rcvd = attach_app_interface(&mut client, Some(app_port)).await;
assert_eq!(app_port, app_port_rcvd);
tracing::info!("Calling zome");
call_foo_fn(app_port, original_dna_hash.clone()).await;
assert!(Box::pin(receiver2.timeout(Duration::from_millis(500)))
.next()
.await
.unwrap()
.is_err());
std::mem::drop(holochain);
std::mem::drop(client);
tracing::info!("Restarting conductor");
let _holochain = start_holochain(config_path).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
tracing::info!("Calling zome again");
call_foo_fn(app_port, original_dna_hash).await;
}
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "slow_tests")]
async fn remote_signals() -> anyhow::Result<()> {
observability::test_run().ok();
const NUM_CONDUCTORS: usize = 2;
let mut conductors = SweetConductorBatch::from_standard_config(NUM_CONDUCTORS).await;
let all_agents: Vec<HoloHash<hash_type::Agent>> =
future::join_all(conductors.iter().map(|c| SweetAgents::one(c.keystore()))).await;
let dna_file = SweetDnaFile::unique_from_test_wasms(vec![TestWasm::EmitSignal])
.await
.unwrap()
.0;
let apps = conductors
.setup_app_for_zipped_agents("app", &all_agents, &[dna_file])
.await
.unwrap();
conductors.exchange_peer_info().await;
let cells = apps.cells_flattened();
let mut rxs = Vec::new();
for h in conductors.iter().map(|c| c) {
rxs.push(h.signal_broadcaster().await.subscribe_separately())
}
let rxs = rxs.into_iter().flatten().collect::<Vec<_>>();
let signal = fixt!(ExternIo);
let _: () = conductors[0]
.call(
&cells[0].zome(TestWasm::EmitSignal),
"signal_others",
RemoteSignal {
signal: signal.clone(),
agents: all_agents,
},
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
let signal = AppSignal::new(signal);
for mut rx in rxs {
let r = rx.try_recv();
assert_matches!(r, Ok(Signal::App(_, a)) if a == signal);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "slow_tests")]
async fn emit_signals() {
observability::test_run().ok();
let admin_port = 9911;
let tmp_dir = TempDir::new("conductor_cfg_emit_signals").unwrap();
let path = tmp_dir.path().to_path_buf();
let environment_path = path.clone();
let config = create_config(admin_port, environment_path);
let config_path = write_config(path, &config);
let _holochain = start_holochain(config_path.clone()).await;
let (mut admin_tx, _) = websocket_client_by_port(admin_port).await.unwrap();
let uuid = uuid::Uuid::new_v4();
let dna = fake_dna_zomes(
&uuid.to_string(),
vec![(TestWasm::EmitSignal.into(), TestWasm::EmitSignal.into())],
);
let orig_dna_hash = dna.dna_hash().clone();
let (fake_dna_path, _tmpdir) = write_fake_dna_file(dna).await.unwrap();
let agent_key = fake_agent_pubkey_1();
let dna_hash = register_and_install_dna(
&mut admin_tx,
orig_dna_hash,
fake_agent_pubkey_1(),
fake_dna_path,
None,
"".into(),
6000,
)
.await;
let cell_id = CellId::new(dna_hash.clone(), agent_key.clone());
let request = AdminRequest::EnableApp {
installed_app_id: "test".to_string(),
};
let response = admin_tx.request(request);
let response = check_timeout(response, 3000).await;
assert_matches!(response, AdminResponse::AppEnabled { .. });
let app_port = attach_app_interface(&mut admin_tx, None).await;
let (mut app_tx_1, app_rx_1) = websocket_client_by_port(app_port).await.unwrap();
let (_, app_rx_2) = websocket_client_by_port(app_port).await.unwrap();
call_zome_fn(
&mut app_tx_1,
cell_id.clone(),
TestWasm::EmitSignal,
"emit".into(),
(),
)
.await;
let (sig1, msg1) = Box::pin(app_rx_1.timeout(Duration::from_secs(1)))
.next()
.await
.unwrap()
.unwrap();
assert!(!msg1.is_request());
let (sig2, msg2) = Box::pin(app_rx_2.timeout(Duration::from_secs(1)))
.next()
.await
.unwrap()
.unwrap();
assert!(!msg2.is_request());
assert_eq!(
Signal::App(cell_id, AppSignal::new(ExternIO::encode(()).unwrap())),
Signal::try_from(sig1.clone()).unwrap(),
);
assert_eq!(sig1, sig2);
}
#[tokio::test(flavor = "multi_thread")]
async fn conductor_admin_interface_runs_from_config() -> Result<()> {
observability::test_run().ok();
let tmp_dir = TempDir::new("conductor_cfg").unwrap();
let environment_path = tmp_dir.path().to_path_buf();
let config = create_config(0, environment_path);
let conductor_handle = Conductor::builder().config(config).build().await?;
let (mut client, _) = websocket_client(&conductor_handle).await?;
let dna = fake_dna_zomes(
"".into(),
vec![(TestWasm::Foo.into(), TestWasm::Foo.into())],
);
let (fake_dna_path, _tmpdir) = write_fake_dna_file(dna).await.unwrap();
let register_payload = RegisterDnaPayload {
uid: None,
properties: None,
source: DnaSource::Path(fake_dna_path),
};
let request = AdminRequest::RegisterDna(Box::new(register_payload));
let response = client.request(request).await.unwrap();
assert_matches!(response, AdminResponse::DnaRegistered(_));
conductor_handle.shutdown();
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn conductor_admin_interface_ends_with_shutdown() -> Result<()> {
if let Err(e) = conductor_admin_interface_ends_with_shutdown_inner().await {
panic!("{:#?}", e);
}
Ok(())
}
async fn conductor_admin_interface_ends_with_shutdown_inner() -> Result<()> {
observability::test_run().ok();
info!("creating config");
let tmp_dir = TempDir::new("conductor_cfg").unwrap();
let environment_path = tmp_dir.path().to_path_buf();
let config = create_config(0, environment_path);
let conductor_handle = Conductor::builder().config(config).build().await?;
let port = admin_port(&conductor_handle).await;
info!("building conductor");
let (mut client, mut rx): (WebsocketSender, WebsocketReceiver) = holochain_websocket::connect(
url2!("ws://127.0.0.1:{}", port),
Arc::new(WebsocketConfig {
default_request_timeout_s: 1,
..Default::default()
}),
)
.await?;
info!("client connect");
conductor_handle.shutdown();
info!("shutdown");
assert_matches!(
conductor_handle.check_running(),
Err(ConductorError::ShuttingDown)
);
assert!(rx.next().await.is_none());
info!("About to make failing request");
let dna = fake_dna_zomes(
"".into(),
vec![(TestWasm::Foo.into(), TestWasm::Foo.into())],
);
let (fake_dna_path, _tmpdir) = write_fake_dna_file(dna).await.unwrap();
let register_payload = RegisterDnaPayload {
uid: None,
properties: None,
source: DnaSource::Path(fake_dna_path),
};
let request = AdminRequest::RegisterDna(Box::new(register_payload));
let response: Result<Result<AdminResponse, _>, tokio::time::error::Elapsed> =
tokio::time::timeout(Duration::from_secs(1), client.request(request)).await;
assert_matches!(response, Ok(Err(WebsocketError::Shutdown)));
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn too_many_open() {
observability::test_run().ok();
info!("creating config");
let tmp_dir = TempDir::new("conductor_cfg").unwrap();
let environment_path = tmp_dir.path().to_path_buf();
let config = create_config(0, environment_path);
let conductor_handle = Conductor::builder().config(config).build().await.unwrap();
let port = admin_port(&conductor_handle).await;
info!("building conductor");
for _i in 0..1000 {
holochain_websocket::connect(
url2!("ws://127.0.0.1:{}", port),
Arc::new(WebsocketConfig {
default_request_timeout_s: 1,
..Default::default()
}),
)
.await
.unwrap();
}
conductor_handle.shutdown();
}
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "slow_tests")]
async fn concurrent_install_dna() {
use futures::StreamExt;
static NUM_DNA: u8 = 50;
static NUM_CONCURRENT_INSTALLS: u8 = 10;
static REQ_TIMEOUT_MS: u64 = 15000;
observability::test_run().ok();
let admin_port = 9912;
let tmp_dir = TempDir::new("conductor_cfg_concurrent_install_dna").unwrap();
let path = tmp_dir.path().to_path_buf();
let environment_path = path.clone();
let config = create_config(admin_port, environment_path);
let config_path = write_config(path, &config);
let _holochain = start_holochain(config_path.clone()).await;
let (client, _) = websocket_client_by_port(admin_port).await.unwrap();
let before = std::time::Instant::now();
let install_tasks_stream = futures::stream::iter((0..NUM_DNA).into_iter().map(|i| {
let zomes = vec![(TestWasm::Foo.into(), TestWasm::Foo.into())];
let mut client = client.clone();
tokio::spawn(async move {
let nick = format!("fake_dna_{}", i);
let dna = fake_dna_zomes_named(&uuid::Uuid::new_v4().to_string(), &nick, zomes.clone());
let original_dna_hash = dna.dna_hash().clone();
let (fake_dna_path, _tmpdir) = write_fake_dna_file(dna.clone()).await.unwrap();
let agent_key = generate_agent_pubkey(&mut client, REQ_TIMEOUT_MS).await;
println!("[{}] Agent pub key generated", i);
let dna_hash = register_and_install_dna_named(
&mut client,
original_dna_hash.clone(),
agent_key,
fake_dna_path.clone(),
None,
nick.clone(),
nick.clone(),
REQ_TIMEOUT_MS,
)
.await;
println!(
"[{}] installed dna with hash {} and name {}",
i, dna_hash, nick
);
})
}))
.buffer_unordered(NUM_CONCURRENT_INSTALLS.into());
let install_tasks = futures::StreamExt::collect::<Vec<_>>(install_tasks_stream);
for r in install_tasks.await {
r.unwrap();
}
println!(
"installed {} dna in {:?}",
NUM_CONCURRENT_INSTALLS,
before.elapsed()
);
}