use crate::conductor::api::RealAppInterfaceApi;
use crate::conductor::conductor::CellStatus;
use crate::conductor::config::AdminInterfaceConfig;
use crate::conductor::config::ConductorConfig;
use crate::conductor::config::InterfaceDriver;
use crate::conductor::p2p_agent_store;
use crate::conductor::ConductorBuilder;
use crate::conductor::ConductorHandle;
use crate::core::queue_consumer::TriggerSender;
use crate::core::ribosome::ZomeCallInvocation;
use ::fixt::prelude::*;
use hdk::prelude::ZomeName;
use holo_hash::fixt::*;
use holo_hash::*;
use holochain_conductor_api::IntegrationStateDump;
use holochain_conductor_api::IntegrationStateDumps;
use holochain_conductor_api::ZomeCall;
use holochain_keystore::MetaLairClient;
use holochain_p2p::actor::HolochainP2pRefToDna;
use holochain_p2p::dht::prelude::Topology;
use holochain_p2p::dht::ArqStrat;
use holochain_p2p::dht::PeerViewQ;
use holochain_p2p::event::HolochainP2pEvent;
use holochain_p2p::spawn_holochain_p2p;
use holochain_p2p::HolochainP2pDna;
use holochain_p2p::HolochainP2pRef;
use holochain_p2p::HolochainP2pSender;
use holochain_serialized_bytes::SerializedBytesError;
use holochain_sqlite::prelude::DatabaseResult;
use holochain_state::nonce::fresh_nonce;
use holochain_state::prelude::from_blob;
use holochain_state::prelude::test_db_dir;
use holochain_state::prelude::SourceChainResult;
use holochain_state::prelude::StateQueryResult;
use holochain_state::source_chain;
use holochain_types::db_cache::DhtDbQueryCache;
use holochain_types::prelude::*;
use holochain_wasm_test_utils::TestWasm;
use kitsune_p2p::KitsuneP2pConfig;
use kitsune_p2p_types::ok_fut;
use rusqlite::named_params;
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::mpsc;
pub use itertools;
pub mod conductor_setup;
pub mod consistency;
pub mod host_fn_caller;
pub mod inline_zomes;
pub mod network_simulation;
mod wait_for;
pub use wait_for::*;
mod generate_records;
pub use generate_records::*;
pub use crate::sweettest::sweet_consistency::*;
use self::consistency::request_published_ops;
/// Appends a "look here" marker carrying the invoking file and line to a
/// literal message, producing a single string assembled at compile time.
#[macro_export]
macro_rules! here {
    ($msg:expr) => {
        concat!($msg, " !!!_LOOK HERE:---> ", file!(), ":", line!())
    };
}
/// Builds a `holochain_state::metadata::MockMetadataBuf` for tests.
///
/// Arms:
/// - `meta_mock!()`: a bare mock with nothing configured.
/// - `meta_mock!(fun)`: configures `fun` to yield an empty iterator.
/// - `meta_mock!(fun, data)`: configures `fun` to yield `data`, each item
///   converted into a `TimedActionHash`, regardless of the argument.
/// - `meta_mock!(fun, data, match_fn)`: as above, but `data` is only yielded
///   when `match_fn(arg)` is true; otherwise an empty iterator is yielded.
///
/// `fun` is presumably a mockall-style expectation method on the mock
/// (it must return something with a `returning` method) -- TODO confirm.
#[macro_export]
macro_rules! meta_mock {
    () => {{
        holochain_state::metadata::MockMetadataBuf::new()
    }};
    ($fun:ident) => {{
        let d: Vec<holochain_types::metadata::TimedActionHash> = Vec::new();
        // Recurse through `$crate::` so the exported macro still resolves when
        // it is invoked by path from another crate without `#[macro_use]`.
        $crate::meta_mock!($fun, d)
    }};
    ($fun:ident, $data:expr) => {{
        let mut metadata = holochain_state::metadata::MockMetadataBuf::new();
        metadata.$fun().returning({
            move |_| {
                Ok(Box::new(fallible_iterator::convert(
                    $data
                        .clone()
                        .into_iter()
                        .map(holochain_types::metadata::TimedActionHash::from)
                        .map(Ok),
                )))
            }
        });
        metadata
    }};
    ($fun:ident, $data:expr, $match_fn:expr) => {{
        let mut metadata = holochain_state::metadata::MockMetadataBuf::new();
        metadata.$fun().returning({
            move |a| {
                if $match_fn(a) {
                    Ok(Box::new(fallible_iterator::convert(
                        $data
                            .clone()
                            .into_iter()
                            .map(holochain_types::metadata::TimedActionHash::from)
                            .map(Ok),
                    )))
                } else {
                    // Clone-then-clear keeps the iterator type identical to the
                    // matching branch while yielding no items.
                    let mut data = $data.clone();
                    data.clear();
                    Ok(Box::new(fallible_iterator::convert(
                        data.into_iter()
                            .map(holochain_types::metadata::TimedActionHash::from)
                            .map(Ok),
                    )))
                }
            }
        });
        metadata
    }};
}
/// Handle to a p2p network spawned for a test, together with the background
/// task that answers its events. Dropping it shuts the network down.
pub struct TestNetwork {
// `Option` so that `Drop` can take ownership of the handle.
network: Option<HolochainP2pRef>,
// `Option` so that `Drop` can take ownership of the join handle.
respond_task: Option<tokio::task::JoinHandle<()>>,
dna_network: HolochainP2pDna,
/// Shared log of `(space, op hashes, optional fetch context)` tuples; the
/// host stub built in `test_network_inner` pushes one entry per
/// `check_op_data` call it receives.
#[allow(clippy::type_complexity)]
pub check_op_data_calls: Arc<
std::sync::Mutex<
Vec<(
kitsune_p2p_types::KSpace,
Vec<kitsune_p2p_types::KOpHash>,
Option<kitsune_p2p::dependencies::kitsune_p2p_fetch::FetchContext>,
)>,
>,
>,
}
impl TestNetwork {
#[allow(clippy::type_complexity)]
fn new(
network: HolochainP2pRef,
respond_task: tokio::task::JoinHandle<()>,
dna_network: HolochainP2pDna,
check_op_data_calls: Arc<
std::sync::Mutex<
Vec<(
kitsune_p2p_types::KSpace,
Vec<kitsune_p2p_types::KOpHash>,
Option<kitsune_p2p::dependencies::kitsune_p2p_fetch::FetchContext>,
)>,
>,
>,
) -> Self {
Self {
network: Some(network),
respond_task: Some(respond_task),
dna_network,
check_op_data_calls,
}
}
pub fn network(&self) -> HolochainP2pRef {
self.network
.as_ref()
.expect("Tried to use network while it was shutting down")
.clone()
}
pub fn dna_network(&self) -> HolochainP2pDna {
self.dna_network.clone()
}
}
impl Drop for TestNetwork {
    /// Takes the network handle and responder task out of `self` and spawns a
    /// task that shuts the network down and then awaits the responder.
    /// Results of both are ignored.
    fn drop(&mut self) {
        use ghost_actor::GhostControlSender;

        let p2p = self.network.take().unwrap();
        let responder = self.respond_task.take().unwrap();

        tokio::task::spawn(async move {
            let _ = p2p.ghost_actor_shutdown_immediate().await;
            let _ = responder.await;
        });
    }
}
pub async fn test_network(
dna_hash: Option<DnaHash>,
agent_key: Option<AgentPubKey>,
) -> TestNetwork {
test_network_inner::<fn(&HolochainP2pEvent) -> bool>(dna_hash, agent_key, None).await
}
/// Spawns a test network that forwards every event accepted by `filter` to
/// `evt_send`; all other events are handled by the built-in responder.
pub async fn test_network_with_events<F>(
    dna_hash: Option<DnaHash>,
    agent_key: Option<AgentPubKey>,
    filter: F,
    evt_send: mpsc::Sender<HolochainP2pEvent>,
) -> TestNetwork
where
    F: Fn(&HolochainP2pEvent) -> bool + Send + 'static,
{
    let forwarding = (filter, evt_send);
    test_network_inner(dna_hash, agent_key, Some(forwarding)).await
}
/// Spawns a holochain p2p network for tests, starts a background task that
/// answers its events, and joins the given (or fixture) DNA/agent to it.
///
/// - `dna_hash`: DNA to join; a `fixt!(DnaHash)` value is used when `None`.
/// - `agent_key`: agent to join as; the first key from a `Predictable`
///   fixturator is used when `None`.
/// - `events`: optional `(filter, sender)`; events for which `filter` returns
///   true are sent to `sender` instead of being answered here.
///
/// # Panics
/// Panics if spawning, TLS config creation, joining, or forwarding an event
/// fails (all are unwrapped).
async fn test_network_inner<F>(
dna_hash: Option<DnaHash>,
agent_key: Option<AgentPubKey>,
mut events: Option<(F, mpsc::Sender<HolochainP2pEvent>)>,
) -> TestNetwork
where
F: Fn(&HolochainP2pEvent) -> bool + Send + 'static,
{
// Default kitsune config, with only the tx2 implicit timeout overridden.
let mut config = holochain_p2p::kitsune_p2p::KitsuneP2pConfig::default();
let mut tuning =
kitsune_p2p_types::config::tuning_params_struct::KitsuneP2pTuningParams::default();
tuning.tx2_implicit_timeout_ms = 500;
let tuning = std::sync::Arc::new(tuning);
// Captured now because `tuning` is moved into the config on the next line;
// used below when answering `QueryPeerDensity`.
let cutoff = tuning.danger_gossip_recent_threshold();
config.tuning_params = tuning;
let check_op_data_calls = Arc::new(std::sync::Mutex::new(Vec::new()));
// Host stub: answers `false` for every op in a check_op_data request and
// records the request arguments in the shared log.
let test_host = {
let check_op_data_calls = check_op_data_calls.clone();
kitsune_p2p::HostStub::with_check_op_data(Box::new(move |space, list, ctx| {
let out = list.iter().map(|_| false).collect();
check_op_data_calls.lock().unwrap().push((space, list, ctx));
futures::FutureExt::boxed(async move { Ok(out) }).into()
}))
};
let (network, mut recv) = spawn_holochain_p2p(
config,
holochain_p2p::kitsune_p2p::dependencies::kitsune_p2p_types::tls::TlsConfig::new_ephemeral(
)
.await
.unwrap(),
test_host,
)
.await
.unwrap();
// Responder task: runs until the event stream ends.
let respond_task = tokio::task::spawn(async move {
use tokio_stream::StreamExt;
while let Some(evt) = recv.next().await {
// Events accepted by the caller's filter are forwarded and not answered here.
if let Some((filter, tx)) = &mut events {
if filter(&evt) {
tx.send(evt).await.unwrap();
continue;
}
}
use holochain_p2p::event::HolochainP2pEvent::*;
// Canned answers: an all-zero 64-byte signature, unit, or empty lists.
match evt {
SignNetworkData { respond, .. } => {
respond.r(ok_fut(Ok([0; 64].into())));
}
PutAgentInfoSigned { respond, .. } => {
respond.r(ok_fut(Ok(())));
}
QueryAgentInfoSigned { respond, .. } => {
respond.r(ok_fut(Ok(vec![])));
}
QueryAgentInfoSignedNearBasis { respond, .. } => {
respond.r(ok_fut(Ok(vec![])));
}
QueryGossipAgents { respond, .. } => {
respond.r(ok_fut(Ok(vec![])));
}
QueryPeerDensity { respond, .. } => {
respond.r(ok_fut(Ok(PeerViewQ::new(
Topology::standard_epoch(cutoff),
ArqStrat::default(),
vec![],
)
.into())));
}
// Anything else is only logged; no response is sent.
oth => tracing::warn!(?oth, "UnhandledEvent"),
}
}
});
let dna = dna_hash.unwrap_or_else(|| fixt!(DnaHash));
let mut key_fixt = AgentPubKeyFixturator::new(Predictable);
let agent_key = agent_key.unwrap_or_else(|| key_fixt.next().unwrap());
let dna_network = network.to_dna(dna.clone(), None);
network
.join(dna.clone(), agent_key, None, None)
.await
.unwrap();
TestNetwork::new(network, respond_task, dna_network, check_op_data_calls)
}
/// Registers `dnas`, installs the app `name` with `cell_data` via the legacy
/// install path, enables it, and reconciles cell status with app status.
///
/// # Panics
/// Panics if any conductor call fails or if reconciliation reports errors.
pub async fn install_app(
    name: &str,
    cell_data: Vec<(InstalledCell, Option<MembraneProof>)>,
    dnas: Vec<DnaFile>,
    conductor_handle: ConductorHandle,
) {
    for dna_file in dnas {
        conductor_handle.register_dna(dna_file).await.unwrap();
    }

    let app_id = name.to_string();
    conductor_handle
        .clone()
        .install_app_legacy(app_id.clone(), cell_data)
        .await
        .unwrap();
    conductor_handle.clone().enable_app(app_id).await.unwrap();

    let reconcile_errors = conductor_handle
        .reconcile_cell_status_with_app_status()
        .await
        .unwrap();
    assert!(reconcile_errors.is_empty(), "{:?}", reconcile_errors);
}
/// A list of installed cells, each paired with an optional membrane proof.
pub type InstalledCellsWithProofs = Vec<(InstalledCell, Option<MembraneProof>)>;
/// Builds a test conductor on a fresh test DB directory and installs one app
/// into it through `install_app_in_conductor`.
///
/// Returns the DB directory (wrapped in an `Arc`), an app interface API for
/// the conductor, and the conductor handle itself.
pub async fn setup_app_in_new_conductor(
    installed_app_id: InstalledAppId,
    dnas: Vec<DnaFile>,
    cell_data: Vec<(InstalledCell, Option<MembraneProof>)>,
) -> (Arc<TempDir>, RealAppInterfaceApi, ConductorHandle) {
    let db_dir = test_db_dir();
    let conductor = ConductorBuilder::new()
        .test(db_dir.path(), &[])
        .await
        .unwrap();

    install_app_in_conductor(conductor.clone(), installed_app_id, dnas, cell_data).await;

    let shared_dir = Arc::new(db_dir);
    let app_api = RealAppInterfaceApi::new(conductor.clone());
    (shared_dir, app_api, conductor)
}
/// Registers `dnas`, installs the app `installed_app_id` with `cell_data` via
/// the legacy install path, enables it, and reconciles cell status with app
/// status.
///
/// # Panics
/// Panics if any conductor call fails, or if reconciliation reports errors;
/// in the latter case the errors are included in the panic message.
pub async fn install_app_in_conductor(
    conductor_handle: ConductorHandle,
    installed_app_id: InstalledAppId,
    dnas: Vec<DnaFile>,
    cell_data: Vec<(InstalledCell, Option<MembraneProof>)>,
) {
    for dna in dnas {
        conductor_handle.register_dna(dna).await.unwrap();
    }
    conductor_handle
        .clone()
        .install_app_legacy(installed_app_id.clone(), cell_data)
        .await
        .unwrap();
    conductor_handle
        .clone()
        .enable_app(installed_app_id)
        .await
        .unwrap();
    // Last use of the handle, so no clone is needed (same as `install_app`).
    let errors = conductor_handle
        .reconcile_cell_status_with_app_status()
        .await
        .unwrap();
    // Print the errors on failure, matching `install_app`; a bare assertion
    // gives no clue about what went wrong.
    assert!(errors.is_empty(), "{:?}", errors);
}
/// Creates a fresh test DB directory and sets up a conductor with the given
/// named apps in it, without a network config.
///
/// Returns the temp directory, an app interface API, and the conductor handle.
pub async fn setup_app_with_names(
    apps_data: Vec<(&str, InstalledCellsWithProofs)>,
    dnas: Vec<DnaFile>,
) -> (TempDir, RealAppInterfaceApi, ConductorHandle) {
    let tmp = test_db_dir();
    let (app_api, conductor) = setup_app_inner(tmp.path(), apps_data, dnas, None).await;
    (tmp, app_api, conductor)
}
/// Creates a fresh test DB directory and sets up a conductor with the given
/// named apps in it, using the supplied network config.
///
/// Returns the temp directory, an app interface API, and the conductor handle.
pub async fn setup_app_with_network(
    apps_data: Vec<(&str, InstalledCellsWithProofs)>,
    dnas: Vec<DnaFile>,
    network: KitsuneP2pConfig,
) -> (TempDir, RealAppInterfaceApi, ConductorHandle) {
    let tmp = test_db_dir();
    let network = Some(network);
    let (app_api, conductor) = setup_app_inner(tmp.path(), apps_data, dnas, network).await;
    (tmp, app_api, conductor)
}
/// Builds a test conductor at `db_dir` with one websocket admin interface on
/// port 0 and the optional `network` config, then installs every app in
/// `apps_data`, passing a clone of `dnas` to each installation.
///
/// Returns an app interface API and the conductor handle.
pub async fn setup_app_inner(
    db_dir: &Path,
    apps_data: Vec<(&str, InstalledCellsWithProofs)>,
    dnas: Vec<DnaFile>,
    network: Option<KitsuneP2pConfig>,
) -> (RealAppInterfaceApi, ConductorHandle) {
    let admin_interface = AdminInterfaceConfig {
        driver: InterfaceDriver::Websocket { port: 0 },
    };
    let config = ConductorConfig {
        admin_interfaces: Some(vec![admin_interface]),
        network,
        ..Default::default()
    };
    let conductor = ConductorBuilder::new()
        .config(config)
        .test(db_dir, &[])
        .await
        .unwrap();

    for (app_name, cell_data) in apps_data {
        install_app(app_name, cell_data, dnas.clone(), conductor.clone()).await;
    }

    let app_api = RealAppInterfaceApi::new(conductor.clone());
    (app_api, conductor)
}
/// When the `HC_WASM_CACHE_PATH` environment variable is set, builds one real
/// ribosome fixture covering every `TestWasm`; otherwise does nothing.
pub fn warm_wasm_tests() {
    if std::env::var_os("HC_WASM_CACHE_PATH").is_none() {
        return;
    }
    let all_wasms = TestWasm::iter().collect::<Vec<_>>();
    let mut ribosomes =
        crate::fixt::RealRibosomeFixturator::new(crate::fixt::curve::Zomes(all_wasms));
    ribosomes.next().unwrap();
}
/// Gathers the ops published by every cell's authored DB (deduplicated) and
/// then waits, for each cell that supplies a DHT DB, until that DB has
/// integrated them all.
///
/// `num_attempts` and `delay` are passed straight to
/// `wait_for_integration_diff`, which panics if consistency is not reached.
pub async fn consistency_dbs<AuthorDb, DhtDb>(
    all_cell_dbs: &[(&AgentPubKey, &AuthorDb, Option<&DhtDb>)],
    num_attempts: usize,
    delay: Duration,
) where
    AuthorDb: ReadAccess<DbKindAuthored>,
    DhtDb: ReadAccess<DbKindDht>,
{
    let mut unique_ops = HashSet::new();
    for (author, authored_db, _) in all_cell_dbs {
        let ops = request_published_ops(*authored_db, Some((*author).to_owned()))
            .await
            .unwrap();
        for (_, _, op) in ops {
            unique_ops.insert(op);
        }
    }
    let published: Vec<_> = unique_ops.into_iter().collect();

    for (_, _, maybe_dht_db) in all_cell_dbs {
        if let Some(dht_db) = maybe_dht_db {
            wait_for_integration_diff(*dht_db, &published, num_attempts, delay).await;
        }
    }
}
/// Polls `db` up to `num_attempts` times, sleeping `delay` after each failed
/// attempt, until it holds at least `published.len()` integrated ops.
///
/// # Panics
/// Panics when all attempts are used up, listing the published ops that are
/// missing from the integrated set.
#[tracing::instrument(skip(db, published))]
async fn wait_for_integration_diff<Db: ReadAccess<DbKindDht>>(
    db: &Db,
    published: &[DhtOp],
    num_attempts: usize,
    delay: Duration,
) {
    /// Renders one op as a fixed-layout line for the failure report.
    fn display_op(op: &DhtOp) -> String {
        let action = op.action();
        format!(
            "{} {:>3} {} ({})",
            action.author(),
            action.action_seq(),
            op.get_type(),
            action.action_type(),
        )
    }

    let num_published = published.len();
    let mut num_integrated = 0;

    for i in 0..num_attempts {
        num_integrated = get_integrated_count(db).await;
        if num_integrated > num_published {
            tracing::warn!("num integrated ops ({}) > num published ops ({}), meaning you may not be accounting for all nodes in this test.\nConsistency may not be complete.", num_integrated, num_published)
        }
        if num_integrated >= num_published {
            return;
        }
        let total_time_waited = delay * i as u32;
        tracing::debug!(?num_integrated, ?total_time_waited, counts = ?query_integration(db).await);
        tokio::time::sleep(delay).await;
    }

    // Out of attempts: work out which published ops never got integrated.
    let mut expected_lines = published.iter().map(display_op).collect::<Vec<_>>();
    let integrated_ops = get_integrated_ops(db).await;
    let mut integrated_lines = integrated_ops.iter().map(display_op).collect::<Vec<_>>();
    expected_lines.sort();
    integrated_lines.sort();

    let mut unintegrated = Vec::new();
    for change in diff::slice(&expected_lines, &integrated_lines) {
        // Only lines present on the published side alone are of interest.
        if let diff::Result::Left(line) = change {
            unintegrated.push(line.clone());
        }
    }
    assert!(
        !unintegrated.is_empty(),
        "consistency should only fail if items were published but not integrated"
    );

    let header = format!("{:54} {:>3} {}", "author", "seq", "op_type (action_type)",);
    let timeout = delay * num_attempts as u32;
    panic!(
        "Consistency not achieved after {:?}ms. Expected {} ops, but only {} integrated. Unintegrated ops:\n\n{}\n{}\n",
        timeout.as_millis(),
        num_published,
        num_integrated,
        header,
        unintegrated.join("\n"),
    );
}
/// Polls `db` up to `num_attempts` times, sleeping `delay` after each failed
/// attempt, until it holds at least `num_published` integrated ops.
///
/// Logs a warning when more ops are integrated than were expected.
///
/// # Panics
/// Panics when all attempts are used up; the message reports the expected
/// count and the count seen on the last attempt.
#[tracing::instrument(skip(db))]
pub async fn wait_for_integration<Db: ReadAccess<DbKindDht>>(
    db: &Db,
    num_published: usize,
    num_attempts: usize,
    delay: Duration,
) {
    // Kept outside the loop so the final panic can report the last count.
    let mut num_integrated = 0;
    for i in 0..num_attempts {
        num_integrated = get_integrated_count(db).await;
        if num_integrated >= num_published {
            if num_integrated > num_published {
                // Include both counts, as `wait_for_integration_diff` does.
                tracing::warn!("num integrated ops ({}) > num published ops ({}), meaning you may not be accounting for all nodes in this test.\nConsistency may not be complete.", num_integrated, num_published)
            }
            return;
        }
        let total_time_waited = delay * i as u32;
        tracing::debug!(?num_integrated, ?total_time_waited, counts = ?query_integration(db).await);
        tokio::time::sleep(delay).await;
    }
    panic!(
        "Consistency not achieved after {} attempts. Expected {} ops, but only {} integrated.",
        num_attempts, num_published, num_integrated
    );
}
/// Logs, at debug level, the distinct `(seq, type, entry_hash)` of every
/// action that has at least one op in each of the given authored databases.
/// Each log line is tagged with the index of the database in `envs`.
///
/// # Panics
/// Panics if the query cannot be prepared or run, or if a row fails to decode.
#[tracing::instrument(skip(envs))]
pub async fn show_authored<Db: ReadAccess<DbKindAuthored>>(envs: &[&Db]) {
    for (i, &db) in envs.iter().enumerate() {
        db.read_async(move |txn| -> DatabaseResult<()> {
            // Join on the op's `action_hash` column (as `get_integrated_ops`
            // does); `DhtOp.hash` is the op's own hash, not the action's.
            txn.prepare(
                "SELECT DISTINCT Action.seq, Action.type, Action.entry_hash FROM Action JOIN DhtOp ON Action.hash = DhtOp.action_hash",
            )
            .unwrap()
            .query_map([], |row| {
                let action_type: String = row.get("type")?;
                let seq: u32 = row.get("seq")?;
                let entry: Option<EntryHash> = row.get("entry_hash")?;
                Ok((action_type, seq, entry))
            })
            .unwrap()
            .for_each(|r| {
                let (action_type, seq, entry) = r.unwrap();
                tracing::debug!(chain = %i, %seq, ?action_type, ?entry);
            });
            Ok(())
        })
        .await
        .unwrap();
    }
}
/// Collects an integration state dump for each database, in the given order.
pub async fn get_integration_dumps<Db: ReadAccess<DbKindDht>>(
    dbs: &[&Db],
) -> IntegrationStateDumps {
    let mut dumps = Vec::with_capacity(dbs.len());
    for &db in dbs {
        let dump = query_integration(db).await;
        dumps.push(dump);
    }
    IntegrationStateDumps(dumps)
}
/// Produces the integration state dump for a single DHT database.
///
/// # Panics
/// Panics if the dump fails.
pub async fn query_integration<Db: ReadAccess<DbKindDht>>(db: &Db) -> IntegrationStateDump {
    let db_handle = db.clone().into();
    let dump = crate::conductor::integration_dump(&db_handle).await;
    dump.unwrap()
}
/// Counts the rows of `DhtOp` whose `when_integrated` column is set.
///
/// # Panics
/// Panics if the query fails.
async fn get_integrated_count<Db: ReadAccess<DbKindDht>>(db: &Db) -> usize {
    let outcome = db
        .read_async(move |txn| -> DatabaseResult<usize> {
            let sql = "SELECT COUNT(hash) FROM DhtOp WHERE DhtOp.when_integrated IS NOT NULL";
            let count = txn.query_row(sql, [], |row| row.get(0))?;
            Ok(count)
        })
        .await;
    outcome.unwrap()
}
pub async fn get_integrated_ops<Db: ReadAccess<DbKindDht>>(db: &Db) -> Vec<DhtOp> {
db.read_async(move |txn| -> StateQueryResult<Vec<DhtOp>> {
txn.prepare(
"
SELECT
DhtOp.type, Action.blob as action_blob, Entry.blob as entry_blob
FROM DhtOp
JOIN
Action ON DhtOp.action_hash = Action.hash
LEFT JOIN
Entry ON Action.entry_hash = Entry.hash
WHERE
DhtOp.when_integrated IS NOT NULL
ORDER BY DhtOp.rowid ASC
",
)
.unwrap()
.query_and_then(named_params! {}, |row| {
let op_type: DhtOpType = row.get("type")?;
let action: SignedAction = from_blob(row.get("action_blob")?)?;
let entry: Option<Vec<u8>> = row.get("entry_blob")?;
let entry: Option<Entry> = match entry {
Some(entry) => Some(from_blob::<Entry>(entry)?),
None => None,
};
Ok(DhtOp::from_type(op_type, action, entry)?)
})
.unwrap()
.collect::<StateQueryResult<_>>()
})
.await
.unwrap()
}
/// For every running cell with `Joined` status, dumps the p2p agent store
/// state for that cell and logs it at debug level.
///
/// # Panics
/// Panics if dumping the state fails.
pub async fn display_agent_infos(conductor: &ConductorHandle) {
    let joined_cells = conductor.running_cell_ids(Some(CellStatus::Joined));
    for cell_id in joined_cells {
        let p2p_db = conductor.get_p2p_db(cell_id.dna_hash());
        let info = p2p_agent_store::dump_state(p2p_db.into(), Some(cell_id))
            .await
            .unwrap();
        tracing::debug!(%info);
    }
}
/// Builds an unsigned zome call with `new_zome_call_unsigned` and signs it
/// using `keystore`.
///
/// # Errors
/// Returns the serialization error if the payload cannot be encoded.
///
/// # Panics
/// Panics if signing the call fails.
pub async fn new_zome_call<P, Z: Into<ZomeName>>(
    keystore: &MetaLairClient,
    cell_id: &CellId,
    func: &str,
    payload: P,
    zome: Z,
) -> Result<ZomeCall, SerializedBytesError>
where
    P: serde::Serialize + std::fmt::Debug,
{
    let unsigned = new_zome_call_unsigned(cell_id, func, payload, zome)?;
    let signed = ZomeCall::try_from_unsigned_zome_call(keystore, unsigned)
        .await
        .unwrap();
    Ok(signed)
}
/// Assembles an unsigned zome call for `func` in `zome` on `cell_id`.
///
/// The cell's agent key is the provenance, the cap secret comes from an
/// `Unpredictable` fixturator, and the nonce/expiry come from `fresh_nonce`
/// at the current timestamp.
///
/// # Errors
/// Returns the serialization error if the payload cannot be encoded.
///
/// # Panics
/// Panics if a fresh nonce cannot be produced.
pub fn new_zome_call_unsigned<P, Z: Into<ZomeName>>(
    cell_id: &CellId,
    func: &str,
    payload: P,
    zome: Z,
) -> Result<ZomeCallUnsigned, SerializedBytesError>
where
    P: serde::Serialize + std::fmt::Debug,
{
    let (nonce, expires_at) = fresh_nonce(Timestamp::now()).unwrap();
    let secret = CapSecretFixturator::new(Unpredictable).next().unwrap();
    let call = ZomeCallUnsigned {
        cell_id: cell_id.clone(),
        zome_name: zome.into(),
        cap_secret: Some(secret),
        fn_name: func.into(),
        payload: ExternIO::encode(payload)?,
        provenance: cell_id.agent_pubkey().clone(),
        nonce,
        expires_at,
    };
    Ok(call)
}
/// Builds a signed zome call with `new_zome_call` and repackages its fields,
/// together with the full `Zome`, into a `ZomeCallInvocation`.
///
/// # Errors
/// Returns the serialization error if the payload cannot be encoded.
pub async fn new_invocation<P, Z: Into<Zome> + Clone>(
    keystore: &MetaLairClient,
    cell_id: &CellId,
    func: &str,
    payload: P,
    zome: Z,
) -> Result<ZomeCallInvocation, SerializedBytesError>
where
    P: serde::Serialize + std::fmt::Debug,
{
    // A separate `Zome` is handed to the call builder; the original `zome`
    // is consumed below for the invocation itself.
    let call_zome: Zome = zome.clone().into();
    let signed = new_zome_call(keystore, cell_id, func, payload, call_zome).await?;
    let invocation = ZomeCallInvocation {
        cell_id: signed.cell_id,
        zome: zome.into(),
        cap_secret: signed.cap_secret,
        fn_name: signed.fn_name,
        payload: signed.payload,
        provenance: signed.provenance,
        signature: signed.signature,
        nonce: signed.nonce,
        expires_at: signed.expires_at,
    };
    Ok(invocation)
}
/// Builds a fake DNA file for `network_seed` whose only zome is made from
/// `TestWasm::Foo`.
pub fn fake_valid_dna_file(network_seed: &str) -> DnaFile {
    let zomes = vec![(TestWasm::Foo.into(), TestWasm::Foo.into())];
    fake_dna_zomes(network_seed, zomes)
}
/// Runs `fake_genesis_for_agent` with the agent from `fake_agent_pubkey_1`.
pub async fn fake_genesis(
    vault: DbWrite<DbKindAuthored>,
    dht_db: DbWrite<DbKindDht>,
    keystore: MetaLairClient,
) -> SourceChainResult<()> {
    let agent = fake_agent_pubkey_1();
    fake_genesis_for_agent(vault, dht_db, agent, keystore).await
}
/// Runs source-chain genesis for `agent`, using the hash of the fake DNA file
/// named "cool dna" and a new query cache built over `dht_db`. The two
/// trailing optional arguments of `source_chain::genesis` are passed as `None`.
pub async fn fake_genesis_for_agent(
    vault: DbWrite<DbKindAuthored>,
    dht_db: DbWrite<DbKindDht>,
    agent: AgentPubKey,
    keystore: MetaLairClient,
) -> SourceChainResult<()> {
    let dna_hash = fake_dna_file("cool dna").dna_hash().clone();
    let dht_db_cache = DhtDbQueryCache::new(dht_db.clone().into());
    source_chain::genesis(
        vault,
        dht_db,
        &dht_db_cache,
        keystore,
        dna_hash,
        agent,
        None,
        None,
    )
    .await
}
pub async fn force_publish_dht_ops(
vault: &DbWrite<DbKindAuthored>,
publish_trigger: &mut TriggerSender,
) -> DatabaseResult<()> {
vault
.write_async(|txn| {
DatabaseResult::Ok(txn.execute(
"UPDATE DhtOp SET last_publish_time = NULL WHERE receipts_complete IS NULL",
[],
)?)
})
.await?;
publish_trigger.trigger(&"force_publish_dht_ops");
Ok(())
}