#![deny(missing_docs)]
#![allow(deprecated)]
/// Name of the directory (under the configured data root) used to cache
/// compiled wasm modules.
pub const WASM_CACHE: &str = "wasm-cache";
pub use self::share::RwShare;
use super::api::error::ConductorApiError;
use super::api::AppInterfaceApi;
use super::config::AdminInterfaceConfig;
use super::config::InterfaceDriver;
use super::entry_def_store::get_entry_defs;
use super::error::ConductorError;
use super::interface::error::InterfaceResult;
use super::interface::websocket::spawn_admin_interface_tasks;
use super::interface::websocket::spawn_app_interface_task;
use super::interface::websocket::spawn_websocket_listener;
use super::manager::TaskManagerResult;
use super::ribosome_store::RibosomeStore;
use super::space::Space;
use super::space::Spaces;
use super::state::AppInterfaceConfig;
use super::state::AppInterfaceId;
use super::state::ConductorState;
use super::CellError;
use super::{api::AdminInterfaceApi, manager::TaskManagerClient};
use crate::conductor::cell::Cell;
use crate::conductor::conductor::app_auth_token_store::AppAuthTokenStore;
use crate::conductor::conductor::app_broadcast::AppBroadcast;
use crate::conductor::config::ConductorConfig;
use crate::conductor::error::ConductorResult;
use crate::core::queue_consumer::InitialQueueTriggers;
use crate::core::queue_consumer::QueueConsumerMap;
#[cfg(any(test, feature = "test_utils"))]
use crate::core::queue_consumer::QueueTriggers;
use crate::core::ribosome::guest_callback::post_commit::PostCommitArgs;
use crate::core::ribosome::guest_callback::post_commit::POST_COMMIT_CHANNEL_BOUND;
use crate::core::ribosome::guest_callback::post_commit::POST_COMMIT_CONCURRENT_LIMIT;
use crate::core::ribosome::real_ribosome::ModuleCacheLock;
use crate::core::ribosome::RibosomeT;
use crate::core::workflow::ZomeCallResult;
use crate::{
conductor::api::error::ConductorApiResult, core::ribosome::real_ribosome::RealRibosome,
};
pub use builder::*;
use futures::future;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use holo_hash::DnaHash;
use holochain_conductor_api::conductor::KeystoreConfig;
use holochain_conductor_api::AppInfo;
use holochain_conductor_api::AppStatusFilter;
use holochain_conductor_api::FullIntegrationStateDump;
use holochain_conductor_api::FullStateDump;
use holochain_conductor_api::IntegrationStateDump;
use holochain_conductor_api::PeerMetaInfo;
use holochain_keystore::lair_keystore::spawn_lair_keystore;
use holochain_keystore::lair_keystore::spawn_lair_keystore_in_proc;
use holochain_keystore::MetaLairClient;
use holochain_p2p::HolochainP2pDnaT;
use holochain_sqlite::sql::sql_cell::state_dump;
use holochain_state::host_fn_workspace::SourceChainWorkspace;
use holochain_state::nonce::witness_nonce;
use holochain_state::nonce::WitnessNonceResult;
use holochain_state::prelude::*;
use holochain_state::source_chain;
pub use holochain_types::share;
#[cfg(feature = "wasmer_sys")]
use holochain_wasmer_host::module::ModuleCache;
use holochain_zome_types::prelude::{AppCapGrantInfo, ClonedCell, Signature, Timestamp};
use indexmap::IndexMap;
use itertools::Itertools;
use kitsune2_api::AgentInfoSigned;
use rusqlite::Transaction;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc::error::SendError;
use tokio::task::JoinHandle;
use tracing::*;
mod builder;
mod app_auth_token_store;
mod hc_p2p_handler_impl;
mod state_dump_helpers;
pub(crate) mod zome_call_signature_verification;
pub(crate) mod app_broadcast;
#[cfg(test)]
pub(crate) mod tests;
/// A shareable handle to a running [`Conductor`].
pub type ConductorHandle = Arc<Conductor>;
/// Broadcasts a stop signal to managed tasks (re-export of `task_motel`).
#[allow(dead_code)]
pub(crate) type StopBroadcaster = task_motel::StopBroadcaster;
/// Receives the stop signal sent by a [`StopBroadcaster`].
pub(crate) type StopReceiver = task_motel::StopListener;
/// The conductor: owns the running cells, interfaces, keystore handle and
/// networking actor for a Holochain node.
pub struct Conductor {
    // Cells currently instantiated, keyed by id. Guarded by an RwShare lock.
    running_cells: RwShare<IndexMap<CellId, Arc<Cell>>>,
    /// The conductor's configuration.
    pub config: Arc<ConductorConfig>,
    // Databases and per-DNA spaces owned by this conductor.
    pub(crate) spaces: Spaces,
    // Set once shutdown begins; `check_running` observes this flag.
    shutting_down: Arc<AtomicBool>,
    // Ports on which admin websockets are currently listening.
    admin_websocket_ports: RwShare<Vec<u16>>,
    // Client used to spawn and stop managed background tasks.
    task_manager: TaskManagerClient,
    // Handle of the task that collects managed-task outcomes; taken on shutdown.
    pub(crate) outcomes_task: RwShare<Option<JoinHandle<TaskManagerResult>>>,
    // Registered ribosomes (compiled DNA wasm) and entry defs.
    ribosome_store: RwShare<RibosomeStore>,
    // Lair keystore client.
    keystore: MetaLairClient,
    // Networking actor handle.
    holochain_p2p: holochain_p2p::actor::DynHcP2p,
    // Channel used by workflows to enqueue post-commit callbacks.
    post_commit: tokio::sync::mpsc::Sender<PostCommitArgs>,
    // Handle of the scheduler task, if started.
    scheduler: Arc<parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>>,
    // Compiled-module cache; `Some` only with the `wasmer_sys` feature.
    pub(crate) wasmer_module_cache: Option<Arc<ModuleCacheLock>>,
    // Issued app-authentication tokens.
    app_auth_token_store: RwShare<AppAuthTokenStore>,
    // Broadcast channel for app signals to connected app interfaces.
    app_broadcast: AppBroadcast,
}
impl std::fmt::Debug for Conductor {
    // Intentionally terse: the conductor holds handles and shared state that
    // are not useful to dump in full.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Conductor").finish()
    }
}
impl Conductor {
    /// Create a [`ConductorBuilder`] for constructing a conductor.
    pub fn builder() -> ConductorBuilder {
        ConductorBuilder::new()
    }
}
mod startup_shutdown_impls {
use super::*;
use crate::conductor::manager::{spawn_task_outcome_handler, OutcomeReceiver, OutcomeSender};
use crate::conductor::metrics::register_uptime_metric;
impl Conductor {
/// Assemble a `Conductor` from its pre-built parts.
///
/// Also ensures the wasm cache directory exists under the configured data
/// root; failure to create it is deliberately ignored (caching is
/// best-effort).
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
    config: Arc<ConductorConfig>,
    ribosome_store: RwShare<RibosomeStore>,
    keystore: MetaLairClient,
    holochain_p2p: holochain_p2p::actor::DynHcP2p,
    spaces: Spaces,
    post_commit: tokio::sync::mpsc::Sender<PostCommitArgs>,
    outcome_sender: OutcomeSender,
) -> Self {
    let tracing_scope = config.tracing_scope().unwrap_or_default();
    let maybe_data_root_path = config.data_root_path.clone().map(|path| (*path).clone());
    // Best-effort creation of the wasm cache dir; errors just disable caching.
    if let Some(path) = &maybe_data_root_path {
        let mut path = path.clone();
        path.push(WASM_CACHE);
        let _ = std::fs::create_dir_all(&path);
    }
    Self {
        spaces,
        running_cells: RwShare::new(IndexMap::new()),
        config,
        shutting_down: Arc::new(AtomicBool::new(false)),
        task_manager: TaskManagerClient::new(outcome_sender, tracing_scope),
        outcomes_task: RwShare::new(None),
        admin_websocket_ports: RwShare::new(Vec::new()),
        scheduler: Arc::new(parking_lot::Mutex::new(None)),
        ribosome_store,
        keystore,
        holochain_p2p,
        post_commit,
        // With wasmer_sys, compiled modules are cached on disk.
        #[cfg(feature = "wasmer_sys")]
        wasmer_module_cache: Some(Arc::new(ModuleCacheLock::new(ModuleCache::new(
            maybe_data_root_path.map(|p| p.join(WASM_CACHE)),
        )))),
        // The wamr interpreter build uses no module cache.
        #[cfg(feature = "wasmer_wamr")]
        wasmer_module_cache: None,
        app_auth_token_store: RwShare::default(),
        app_broadcast: AppBroadcast::default(),
    }
}
/// Return `Err(ConductorError::ShuttingDown)` once shutdown has begun,
/// otherwise `Ok(())`.
pub fn check_running(&self) -> ConductorResult<()> {
    let is_shutting_down = self
        .shutting_down
        .load(std::sync::atomic::Ordering::Relaxed);
    match is_shutting_down {
        true => Err(ConductorError::ShuttingDown),
        false => Ok(()),
    }
}
/// Take the task-manager outcome task out of the conductor, detaching it.
/// Returns `None` if it was already detached (e.g. by a previous shutdown).
pub fn detach_task_management(&self) -> Option<JoinHandle<TaskManagerResult>> {
    self.outcomes_task.share_mut(|tm| tm.take())
}
/// Begin conductor shutdown: flag the conductor as shutting down, remove all
/// running cells, and signal the task manager to stop its tasks.
///
/// Returns the handle of the spawned shutdown task, which resolves with the
/// final task-manager result once everything has stopped.
///
/// # Panics
/// Panics if task management was already detached or shutdown already ran.
pub fn shutdown(self: Arc<Self>) -> JoinHandle<TaskManagerResult> {
    self.shutting_down
        .store(true, std::sync::atomic::Ordering::Relaxed);
    let mut tm = self.task_manager();
    let task = self.detach_task_management().expect("Attempting to shut down after already detaching task management or previous shutdown");
    let self2 = self.clone();
    tokio::task::spawn(async move {
        let running_cell_ids: Vec<_> = self2
            .running_cells
            .share_mut(|c| c.keys().cloned().collect());
        // Cell removal and task-manager shutdown run concurrently; the final
        // result comes from awaiting the detached outcome task.
        let remove_cells_task = self2.remove_cells(&running_cell_ids);
        tracing::info!("Sending shutdown signal to all managed tasks and removing cells.");
        let (.., result) = futures::join!(remove_cells_task, tm.shutdown().boxed(), task,);
        result?
    })
}
/// Bring a freshly-built conductor fully up: load ribosomes, start the task
/// outcome handler, attach admin/app interfaces, and start cells for every
/// enabled app recorded in conductor state.
///
/// # Panics
/// Panics if called twice (the task outcome handler may only start once).
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all, fields(scope=self.config.network.tracing_scope)))]
pub(crate) async fn initialize_conductor(
    self: Arc<Self>,
    outcome_rx: OutcomeReceiver,
    admin_configs: Vec<AdminInterfaceConfig>,
) -> ConductorResult<()> {
    self.load_ribosomes().await?;
    info!("Conductor startup: Ribosomes loaded.");
    self.outcomes_task.share_mut(|lock| {
        if lock.is_some() {
            panic!("Cannot start task manager twice");
        }
        let task = spawn_task_outcome_handler(self.clone(), outcome_rx);
        *lock = Some(task);
    });
    self.clone().add_admin_interfaces(admin_configs).await?;
    info!("Conductor startup: admin interface(s) added.");
    self.clone().startup_app_interfaces().await?;
    info!("Conductor startup: app interfaces started.");
    let state = self.get_state().await?;
    // Start cells for every enabled app, applying per-app network overrides.
    for (_, app) in state.enabled_apps() {
        let config_override = Self::p2p_config_overrides(&app.manifest);
        let cell_ids = app.all_enabled_cells();
        self.clone()
            .create_cells_and_startup(cell_ids, config_override)
            .await?;
    }
    info!("Conductor startup: apps enabled.");
    register_uptime_metric(std::time::Instant::now());
    Ok(())
}
}
}
mod interface_impls {
use super::*;
use holochain_conductor_api::AppInterfaceInfo;
use holochain_types::websocket::AllowedOrigins;
impl Conductor {
/// Spawn an admin websocket listener (plus its handler tasks) for each
/// config, record the bound ports, and return them.
///
/// # Errors
/// Fails if any listener cannot be bound; already-spawned listeners from the
/// same call are not torn down in that case.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn add_admin_interfaces(
    self: Arc<Self>,
    configs: Vec<AdminInterfaceConfig>,
) -> ConductorResult<Vec<u16>> {
    let admin_api = AdminInterfaceApi::new(self.clone());
    let tm = self.task_manager();
    let spawn_from_config = |AdminInterfaceConfig { driver, .. }| {
        let admin_api = admin_api.clone();
        let tm = tm.clone();
        async move {
            match driver {
                InterfaceDriver::Websocket {
                    port,
                    danger_bind_addr,
                    allowed_origins,
                } => {
                    let listener =
                        spawn_websocket_listener(port, danger_bind_addr, allowed_origins)
                            .await?;
                    // Port 0 means "any free port": read back what was bound.
                    let port = listener.local_addrs()?[0].port();
                    spawn_admin_interface_tasks(
                        tm.clone(),
                        listener,
                        admin_api.clone(),
                        port,
                    );
                    InterfaceResult::Ok(port)
                }
            }
        }
    };
    let ports: Result<Vec<_>, _> =
        future::join_all(configs.into_iter().map(spawn_from_config))
            .await
            .into_iter()
            .collect();
    let ports = ports.map_err(Box::new)?;
    for p in &ports {
        self.add_admin_port(*p);
    }
    Ok(ports)
}
/// Spawn an app interface websocket and persist its config in conductor
/// state so it is re-attached on restart.
///
/// `port` is either a concrete port (`Left`) or an existing interface id to
/// re-attach (`Right`). If `installed_app_id` is set, the interface only
/// serves that app. Returns the port actually bound.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn add_app_interface(
    self: Arc<Self>,
    port: either::Either<u16, AppInterfaceId>,
    danger_bind_addr: Option<String>,
    allowed_origins: AllowedOrigins,
    installed_app_id: Option<InstalledAppId>,
) -> ConductorResult<u16> {
    let interface_id = match port {
        either::Either::Left(port) => AppInterfaceId::new(port),
        either::Either::Right(id) => id,
    };
    let port = interface_id.port();
    debug!("Attaching interface {}", port);
    let app_api = AppInterfaceApi::new(self.clone());
    let tm = self.task_manager();
    // The spawned task returns the actually-bound port (relevant when 0).
    let port = spawn_app_interface_task(
        tm.clone(),
        port,
        danger_bind_addr.clone(),
        allowed_origins.clone(),
        installed_app_id.clone(),
        app_api,
        self.app_broadcast.clone(),
    )
    .await
    .map_err(Box::new)?;
    // Persist so `startup_app_interfaces` restores this interface on restart.
    let config = AppInterfaceConfig::websocket(
        port,
        danger_bind_addr,
        allowed_origins,
        installed_app_id,
    );
    self.update_state(|mut state| {
        state.app_interfaces.insert(interface_id, config);
        Ok(state)
    })
    .await?;
    debug!("App interface added at port: {}", port);
    Ok(port)
}
/// Return one of the currently-bound admin websocket ports, if any exist.
pub fn get_arbitrary_admin_websocket_port(&self) -> Option<u16> {
    self.admin_websocket_ports
        .share_ref(|ports| ports.first().copied())
}
/// Describe every app interface recorded in conductor state.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn list_app_interfaces(&self) -> ConductorResult<Vec<AppInterfaceInfo>> {
    let state = self.get_state().await?;
    let mut infos = Vec::new();
    for config in state.app_interfaces.values() {
        infos.push(AppInterfaceInfo {
            port: config.driver.port(),
            allowed_origins: config.driver.allowed_origins().clone(),
            installed_app_id: config.installed_app_id.clone(),
        });
    }
    Ok(infos)
}
/// Re-attach every app interface persisted in conductor state (used during
/// conductor startup).
#[allow(irrefutable_let_patterns)]
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub(crate) async fn startup_app_interfaces(self: Arc<Self>) -> ConductorResult<()> {
    for (id, config) in &self.get_state().await?.app_interfaces {
        debug!("Starting up app interface: {:?}", id);
        // Re-use the stored interface id so state is not duplicated.
        let _ = self
            .clone()
            .add_app_interface(
                either::Right(id.clone()),
                config.driver.danger_bind_addr().cloned(),
                config.driver.allowed_origins().clone(),
                config.installed_app_id.clone(),
            )
            .await?;
    }
    Ok(())
}
}
}
mod dna_impls {
use super::*;
impl Conductor {
/// List the hashes of all DNAs registered in the ribosome store.
pub fn list_dna_hashes(&self) -> HashSet<DnaHash> {
    self.ribosome_store().share_ref(|ds| ds.list_dna_hashes())
}
/// Get the DNA definition for a cell, if its ribosome is registered.
pub fn get_dna_def(&self, cell_id: &CellId) -> Option<DnaDef> {
    self.ribosome_store()
        .share_ref(|ds| ds.get_dna_def(cell_id))
}
/// Get the DNA file for a cell, if its ribosome is registered.
pub fn get_dna_file(&self, cell_id: &CellId) -> Option<DnaFile> {
    self.ribosome_store()
        .share_ref(|ds| ds.get_dna_file(cell_id))
}
/// Look up a registered entry definition by its buffer key.
pub fn get_entry_def(&self, key: &EntryDefBufferKey) -> Option<EntryDef> {
    self.ribosome_store().share_ref(|ds| ds.get_entry_def(key))
}
/// Collect the hashed DNA definition for every cell of the given app.
///
/// # Errors
/// Fails if any cell's ribosome is not registered.
pub fn get_dna_definitions(
    &self,
    app: &InstalledApp,
) -> ConductorResult<IndexMap<CellId, DnaDefHashed>> {
    let mut definitions = IndexMap::new();
    for id in app.all_cells() {
        let ribosome = self.get_ribosome(&id)?;
        let hashed = ribosome.dna_def_hashed();
        definitions.insert(id.to_owned(), hashed.to_owned());
    }
    Ok(definitions)
}
/// Persist a DNA's wasm and entry defs to the databases, but only when every
/// zome of the DNA is a wasm zome; DNAs containing inline/native zomes are
/// skipped and yield an empty vec.
pub(crate) async fn register_dna_wasm(
    &self,
    cell_id: CellId,
    ribosome: RealRibosome,
) -> ConductorResult<Vec<(EntryDefBufferKey, EntryDef)>> {
    let all_zomes_are_wasm = ribosome
        .dna_def_hashed()
        .all_zomes()
        .all(|(_, zome_def)| matches!(zome_def, ZomeDef::Wasm(_)));
    if !all_zomes_are_wasm {
        // Nothing to persist for non-wasm zomes.
        return Ok(Vec::new());
    }
    let defs = self.put_wasm_and_defs(cell_id, ribosome).await?;
    Ok(defs)
}
/// Add a batch of entry definitions to the in-memory ribosome store.
pub(crate) fn register_dna_entry_defs(
    &self,
    entry_defs: Vec<(EntryDefBufferKey, EntryDef)>,
) {
    self.ribosome_store
        .share_mut(|d| d.add_entry_defs(entry_defs));
}
/// Register a cell's ribosome in the in-memory ribosome store.
pub(crate) fn add_ribosome_to_store(&self, cell_id: CellId, ribosome: RealRibosome) {
    self.ribosome_store
        .share_mut(|d| d.add_ribosome(cell_id, ribosome));
}
/// Rebuild ribosomes for every installed cell from persisted state.
///
/// Steps: gather the DNA def stored for each installed cell, fetch the wasm
/// for every unique wasm hash referenced by those defs, reassemble
/// `DnaFile`s and compile them into ribosomes, and load all stored entry
/// defs. Returns the (cell id, ribosome) pairs and the entry defs.
///
/// # Errors
/// Fails if a referenced wasm is missing from the wasm store or a ribosome
/// cannot be built.
pub(crate) async fn load_wasms_into_dna_files(
    &self,
) -> ConductorResult<(
    impl IntoIterator<Item = (CellId, RealRibosome)>,
    impl IntoIterator<Item = (EntryDefBufferKey, EntryDef)>,
)> {
    let state = self.get_state().await?;
    let all_cells: Vec<CellId> = state
        .installed_apps()
        .values()
        .flat_map(|app| app.all_cells())
        .collect();
    // DNA defs stored for each installed cell (cells without one are skipped).
    let mut dna_defs_with_cell_id = Vec::new();
    for cell_id in all_cells {
        if let Some(cell_dna_tuple) =
            self.spaces.dna_def_store.as_read().get(&cell_id).await?
        {
            dna_defs_with_cell_id.push(cell_dna_tuple);
        }
    }
    // Every distinct wasm hash referenced by any zome of any DNA def.
    let unique_wasm_hashes = dna_defs_with_cell_id
        .iter()
        .flat_map(|(_cell_id, dna_def)| {
            dna_def
                .all_zomes()
                .map(|(zome_name, zome)| Ok(zome.wasm_hash(zome_name)?))
        })
        .collect::<ConductorResult<HashSet<_>>>()?;
    // Fetch each wasm once; a missing wasm is a hard error.
    let mut wasms_and_hashes = HashMap::new();
    for wasm_hash in unique_wasm_hashes {
        let wasm_hashed = self
            .spaces
            .wasm_store
            .as_read()
            .get(&wasm_hash)
            .await?
            .ok_or(ConductorError::WasmMissing)?;
        wasms_and_hashes.insert(wasm_hash, wasm_hashed.into_content());
    }
    // Pair each DNA def with the wasms its zomes reference.
    let dna_defs_with_wasms = dna_defs_with_cell_id
        .into_iter()
        .map(|(cell_id, dna_def)| {
            let wasms = dna_def.all_zomes().filter_map(|(zome_name, zome)| {
                let wasm_hash = zome.wasm_hash(zome_name).ok()?;
                wasms_and_hashes.get(&wasm_hash).cloned()
            });
            let wasms = wasms.collect::<Vec<_>>();
            ((cell_id, dna_def), wasms)
        })
        .collect::<Vec<_>>();
    let entry_defs = self.spaces.entry_def_store.as_read().get_all().await?;
    // Compile all ribosomes concurrently.
    let ribosomes_with_cell_id_future =
        dna_defs_with_wasms
            .into_iter()
            .map(|((cell_id, dna_def), wasms)| async move {
                let dna_file = DnaFile::new(dna_def, wasms).await;
                #[cfg(feature = "wasmer_sys")]
                let ribosome =
                    RealRibosome::new(dna_file, self.wasmer_module_cache.clone()).await?;
                #[cfg(feature = "wasmer_wamr")]
                let ribosome = RealRibosome::new(dna_file, None).await?;
                ConductorResult::Ok((cell_id, ribosome))
            });
    let ribosomes_with_cell_id =
        futures::future::try_join_all(ribosomes_with_cell_id_future).await?;
    Ok((ribosomes_with_cell_id, entry_defs))
}
/// The root directory under which all conductor databases live.
// NOTE(review): returning `&Path` would be more idiomatic, but the signature
// is kept as `&PathBuf` for existing callers.
pub fn root_db_dir(&self) -> &PathBuf {
    &self.spaces.db_dir
}
/// Access the conductor's keystore client.
pub fn keystore(&self) -> &MetaLairClient {
    &self.keystore
}
/// Access the networking actor handle.
pub fn holochain_p2p(&self) -> &holochain_p2p::actor::DynHcP2p {
    &self.holochain_p2p
}
/// Remove the given cells from the running set and run each removed cell's
/// cleanup concurrently. Cleanup errors are logged, not propagated.
pub(crate) async fn remove_cells(&self, cell_ids: &[CellId]) {
    // Detach the cells from the running map first so no new calls reach them.
    let cells = self.running_cells.share_mut(|c| -> Vec<_> {
        cell_ids
            .iter()
            .filter_map(|cell_id| c.remove(cell_id))
            .collect()
    });
    future::join_all(cells.into_iter().map(|cell| async move {
        if let Err(err) = cell.cleanup().await {
            tracing::error!("Error cleaning up Cell: {:?}\nCellId: {}", err, cell.id());
        }
    }))
    .await;
}
/// Extract the DNA def, wasm code and entry defs from a ribosome and persist
/// them to the conductor databases. Returns the entry defs that were stored.
pub(crate) async fn put_wasm_and_defs(
    &self,
    cell_id: CellId,
    ribosome: RealRibosome,
) -> ConductorResult<Vec<(EntryDefBufferKey, EntryDef)>> {
    let dna_def = ribosome.dna_def_hashed().clone();
    let code = ribosome.dna_file().code().clone().into_values();
    // Entry defs are produced by invoking the DNA's entry-defs callback.
    let zome_defs = get_entry_defs(ribosome).await?;
    self.put_code_and_defs_in_databases(cell_id, dna_def, code, zome_defs)
        .await
}
/// Persist wasm code, entry defs and the DNA def to their stores, then
/// return the entry defs unchanged for in-memory registration.
///
/// Wasms already present (by hash) in the wasm store are not re-written.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub(crate) async fn put_code_and_defs_in_databases(
    &self,
    cell_id: CellId,
    dna_def_hashed: DnaDefHashed,
    code: impl Iterator<Item = wasm::DnaWasm>,
    zome_defs: Vec<(EntryDefBufferKey, EntryDef)>,
) -> ConductorResult<Vec<(EntryDefBufferKey, EntryDef)>> {
    let wasms = futures::future::join_all(code.map(DnaWasmHashed::from_content)).await;
    let wasm_read = self.spaces.wasm_store.as_read();
    for wasm in wasms {
        // Content-addressed: skip wasms the store already holds.
        if !wasm_read.contains(wasm.as_hash()).await? {
            self.spaces.wasm_store.put(wasm).await?;
        }
    }
    // Iterate by reference so only the key is cloned, instead of cloning the
    // whole `zome_defs` vec (keys + entry defs) just to iterate it.
    for (key, entry_def) in &zome_defs {
        self.spaces.entry_def_store.put(key.clone(), entry_def).await?;
    }
    self.spaces
        .dna_def_store
        .put(&cell_id, &dna_def_hashed.into_content())
        .await?;
    Ok(zome_defs)
}
/// Rebuild all ribosomes and entry defs from persisted state and register
/// them in the in-memory ribosome store (used during startup).
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub(crate) async fn load_ribosomes(&self) -> ConductorResult<()> {
    let (ribosomes, entry_defs) = self.load_wasms_into_dna_files().await?;
    self.ribosome_store().share_mut(|store| {
        store.add_ribosomes(ribosomes);
        store.add_entry_defs(entry_defs);
    });
    Ok(())
}
/// Build a ribosome for the given DNA file and register it (plus its wasm
/// and entry defs) for the given cell. No-op if a ribosome for this cell is
/// already registered.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn register_dna_file(
    &self,
    cell_id: CellId,
    dna_file: DnaFile,
) -> ConductorResult<()> {
    // Idempotent: skip if this cell already has a ribosome.
    if self.get_ribosome(&cell_id).is_ok() {
        return Ok(());
    }
    let ribosome = RealRibosome::new(dna_file, self.wasmer_module_cache.clone()).await?;
    let entry_defs = self
        .register_dna_wasm(cell_id.clone(), ribosome.clone())
        .await?;
    self.register_dna_entry_defs(entry_defs);
    self.add_ribosome_to_store(cell_id, ribosome);
    Ok(())
}
}
}
mod network_impls {
use super::*;
use crate::conductor::api::error::{
zome_call_response_to_conductor_api_result, ConductorApiError,
};
use futures::future::join_all;
use holochain_conductor_api::ZomeCallParamsSigned;
use holochain_conductor_api::{DnaStorageInfo, StorageBlob, StorageInfo};
use holochain_sqlite::helpers::BytesSql;
use holochain_sqlite::sql::sql_peer_meta_store;
use holochain_sqlite::stats::{get_size_on_disk, get_used_size};
use holochain_zome_types::block::Block;
use holochain_zome_types::block::BlockTargetId;
use kitsune2_api::Url;
use zome_call_signature_verification::is_valid_signature;
impl Conductor {
pub async fn get_agent_infos(
&self,
maybe_dna_hashes: Option<Vec<DnaHash>>,
) -> ConductorApiResult<Vec<Arc<AgentInfoSigned>>> {
let dna_hashes = match maybe_dna_hashes {
Some(hashes) => hashes,
None => self
.spaces
.get_from_spaces(|space| (*space.dna_hash).clone()),
};
let mut out = HashSet::new();
for dna_hash in dna_hashes {
let peer_store = self
.holochain_p2p
.peer_store(dna_hash.clone())
.await
.map_err(|err| ConductorApiError::Other(err.into()))?;
let all_peers = peer_store.get_all().await?;
out.extend(all_peers);
}
Ok(out.into_iter().collect())
}
/// Collect signed agent infos for the DNAs of one installed app, optionally
/// restricted to a subset of that app's DNA hashes.
pub async fn get_app_agent_infos(
    &self,
    installed_app_id: &InstalledAppId,
    maybe_dna_hashes: Option<Vec<DnaHash>>,
) -> ConductorApiResult<Vec<Arc<AgentInfoSigned>>> {
    let mut hashes = self.get_dna_hashes_for_app(installed_app_id).await?;
    if let Some(filter) = maybe_dna_hashes {
        // Keep only the app DNAs the caller asked about.
        hashes.retain(|hash| filter.contains(hash));
    }
    self.get_agent_infos(Some(hashes)).await
}
/// Peer meta info for one peer URL, restricted to the DNAs of one installed
/// app (optionally filtered further by `maybe_dna_hashes`).
pub async fn app_peer_meta_info(
    &self,
    installed_app_id: &InstalledAppId,
    url: Url,
    maybe_dna_hashes: Option<Vec<DnaHash>>,
) -> ConductorApiResult<BTreeMap<DnaHash, BTreeMap<String, PeerMetaInfo>>> {
    let mut hashes = self.get_dna_hashes_for_app(installed_app_id).await?;
    if let Some(filter) = maybe_dna_hashes {
        // Keep only the app DNAs the caller asked about.
        hashes.retain(|hash| filter.contains(hash));
    }
    self.peer_meta_info(url, Some(hashes)).await
}
/// Read all peer-meta entries stored for a peer URL, per DNA.
///
/// When `maybe_dna_hashes` is `Some`, only those spaces are queried.
///
/// # Errors
/// Fails if the filter matches no known space, or on database errors.
pub async fn peer_meta_info(
    &self,
    url: Url,
    maybe_dna_hashes: Option<Vec<DnaHash>>,
) -> ConductorApiResult<BTreeMap<DnaHash, BTreeMap<String, PeerMetaInfo>>> {
    let mut space_ids = self
        .spaces
        .get_from_spaces(|space| (*space.dna_hash).clone());
    if let Some(dna_hashes) = maybe_dna_hashes {
        space_ids.retain(|dna_hash| dna_hashes.contains(dna_hash));
    }
    if space_ids.is_empty() {
        return Err(ConductorApiError::Other(
            "No cell found for the provided dna hashes.".into(),
        ));
    }
    let mut all_infos = BTreeMap::new();
    for dna_hash in space_ids {
        let db = self.spaces.peer_meta_store_db(&dna_hash)?;
        let url2 = url.clone();
        let infos = db
            .read_async(
                move |txn| -> DatabaseResult<BTreeMap<String, PeerMetaInfo>> {
                    let mut infos: BTreeMap<String, PeerMetaInfo> = BTreeMap::new();
                    let mut stmt = txn.prepare(sql_peer_meta_store::GET_ALL_BY_URL)?;
                    let mut rows = stmt.query(named_params! {
                        ":peer_url": url2.as_str()
                    })?;
                    // Row layout: (meta_key, meta_value blob, expires_at).
                    while let Some(row) = rows.next()? {
                        let meta_key = row.get::<_, String>(0)?;
                        // The value blob is JSON; surface decode failures as
                        // sqlite conversion errors.
                        let meta_value: serde_json::Value =
                            serde_json::from_slice(&(row.get::<_, BytesSql>(1)?.0))
                                .map_err(|e| {
                                    rusqlite::Error::FromSqlConversionFailure(
                                        2,
                                        rusqlite::types::Type::Blob,
                                        e.into(),
                                    )
                                })?;
                        let expires_at = row.get::<_, Option<i64>>(2)?;
                        let peer_meta_info = PeerMetaInfo {
                            meta_value,
                            expires_at: expires_at.map(Timestamp),
                        };
                        infos.insert(meta_key, peer_meta_info);
                    }
                    Ok(infos)
                },
            )
            .await?;
        all_infos.insert(dna_hash, infos);
    }
    Ok(all_infos)
}
/// Record (witness) a zome-call nonce for an agent, using the current time
/// as the witness time. Used for replay protection of signed zome calls.
pub(crate) async fn witness_nonce_from_calling_agent(
    &self,
    agent: AgentPubKey,
    nonce: Nonce256Bits,
    expires: Timestamp,
) -> ConductorResult<WitnessNonceResult> {
    Ok(witness_nonce(
        &self.spaces.conductor_db,
        agent,
        nonce,
        Timestamp::now(),
        expires,
    )
    .await?)
}
/// Remove a block previously placed on a target.
pub async fn unblock(&self, input: Block) -> DatabaseResult<()> {
    self.spaces.unblock(input).await
}
/// Check whether a target is blocked at the given time.
pub async fn is_blocked(
    &self,
    input: BlockTargetId,
    timestamp: Timestamp,
) -> ConductorResult<bool> {
    self.spaces
        .is_blocked(input, timestamp, self.holochain_p2p.clone())
        .await
}
/// Gather storage statistics for every DNA used by any installed app,
/// including which apps use each DNA.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub(crate) async fn storage_info(&self) -> ConductorResult<StorageInfo> {
    let state = self.get_state().await?;
    // Map each DNA hash to the ids of all apps that have a cell of that DNA.
    let mut dna_usage: HashMap<DnaHash, Vec<InstalledAppId>> = HashMap::new();
    for (installed_app_id, app) in state.installed_apps().iter() {
        for cell_id in app.all_cells() {
            dna_usage
                .entry(cell_id.dna_hash().clone())
                .or_default()
                .push(installed_app_id.clone());
        }
    }
    // Collect per-DNA storage blobs concurrently.
    let app_data_blobs = futures::future::join_all(dna_usage.iter().map(
        |(dna_hash, used_by)| async { self.storage_info_for_dna(dna_hash, used_by).await },
    ))
    .await
    .into_iter()
    .collect::<Result<Vec<StorageBlob>, ConductorError>>()?;
    Ok(StorageInfo {
        blobs: app_data_blobs,
    })
}
/// Measure on-disk and in-use sizes of the authored, DHT and cache databases
/// for one DNA. Authored sizes are summed over all authored DBs (one per
/// agent).
async fn storage_info_for_dna(
    &self,
    dna_hash: &DnaHash,
    used_by: &[InstalledAppId],
) -> ConductorResult<StorageBlob> {
    let authored_dbs = self.spaces.get_all_authored_dbs(dna_hash)?;
    let dht_db = self.spaces.dht_db(dna_hash)?;
    let cache_db = self.spaces.cache(dna_hash)?;
    Ok(StorageBlob::Dna(DnaStorageInfo {
        // Sum disk usage across all authored databases for this DNA.
        authored_data_size_on_disk: join_all(
            authored_dbs
                .iter()
                .map(|db| db.read_async(get_size_on_disk)),
        )
        .await
        .into_iter()
        .map(|r| r.map_err(ConductorError::DatabaseError))
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .sum(),
        authored_data_size: join_all(
            authored_dbs.iter().map(|db| db.read_async(get_used_size)),
        )
        .await
        .into_iter()
        .map(|r| r.map_err(ConductorError::DatabaseError))
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .sum(),
        dht_data_size_on_disk: dht_db
            .read_async(get_size_on_disk)
            .map_err(ConductorError::DatabaseError)
            .await?,
        dht_data_size: dht_db
            .read_async(get_used_size)
            .map_err(ConductorError::DatabaseError)
            .await?,
        cache_data_size_on_disk: cache_db
            .read_async(get_size_on_disk)
            .map_err(ConductorError::DatabaseError)
            .await?,
        cache_data_size: cache_db
            .read_async(get_used_size)
            .map_err(ConductorError::DatabaseError)
            .await?,
        dna_hash: dna_hash.clone(),
        used_by: used_by.to_vec(),
    }))
}
/// List the names of all host functions the ribosome exposes to wasm.
pub async fn list_wasm_host_functions(&self) -> ConductorApiResult<Vec<String>> {
    Ok(RealRibosome::tooling_imports().await?)
}
/// Handle a signed zome call arriving over an external interface: decode the
/// parameters, verify the signature against the provenance over the raw
/// bytes, and dispatch the call.
///
/// A bad signature yields `Ok(Ok(ZomeCallResponse::AuthenticationFailed))`
/// rather than an error, so clients receive a structured response.
pub async fn handle_external_zome_call(
    &self,
    zome_call_params_signed: ZomeCallParamsSigned,
) -> ConductorApiResult<ZomeCallResult> {
    let zome_call_params = zome_call_params_signed
        .bytes
        .clone()
        .decode::<ZomeCallParams>()
        .map_err(|e| ConductorApiError::SerializationError(e.into()))?;
    // The signature must cover the exact serialized bytes, not a re-encoding.
    if !is_valid_signature(
        &zome_call_params.provenance,
        zome_call_params_signed.bytes.as_bytes(),
        &zome_call_params_signed.signature,
    )
    .await?
    {
        return Ok(Ok(ZomeCallResponse::AuthenticationFailed(
            zome_call_params_signed.signature,
            zome_call_params.provenance,
        )));
    }
    self.call_zome(zome_call_params.clone()).await
}
pub async fn call_zome(
&self,
params: ZomeCallParams,
) -> ConductorApiResult<ZomeCallResult> {
let cell = self.cell_by_id(¶ms.cell_id).await?;
Ok(cell.call_zome(params, None).await?)
}
pub(crate) async fn call_zome_with_workspace(
&self,
params: ZomeCallParams,
workspace_lock: SourceChainWorkspace,
) -> ConductorApiResult<ZomeCallResult> {
debug!(cell_id = ?params.cell_id);
let cell = self.cell_by_id(¶ms.cell_id).await?;
Ok(cell.call_zome(params, Some(workspace_lock)).await?)
}
/// Convenience wrapper around [`Self::call_zome`]: serializes the payload,
/// generates a fresh nonce/expiry, makes the call and deserializes the
/// response into `O`.
///
/// # Errors
/// Returns an error if the payload cannot be serialized, the call fails, or
/// the response cannot be converted.
pub async fn easy_call_zome<I, O, Z>(
    &self,
    provenance: &AgentPubKey,
    cap_secret: Option<CapSecret>,
    cell_id: CellId,
    zome_name: Z,
    fn_name: impl Into<FunctionName>,
    payload: I,
) -> ConductorApiResult<O>
where
    ZomeName: From<Z>,
    I: Serialize + std::fmt::Debug,
    O: serde::de::DeserializeOwned + std::fmt::Debug,
{
    // Propagate serialization failures instead of panicking (was `expect`),
    // matching the error mapping used for decoding elsewhere in this module.
    let payload = ExternIO::encode(payload)
        .map_err(|e| ConductorApiError::SerializationError(e.into()))?;
    let now = Timestamp::now();
    let (nonce, expires_at) =
        holochain_nonce::fresh_nonce(now).map_err(ConductorApiError::Other)?;
    let call_params = ZomeCallParams {
        cell_id,
        zome_name: zome_name.into(),
        fn_name: fn_name.into(),
        cap_secret,
        provenance: provenance.clone(),
        payload,
        nonce,
        expires_at,
    };
    let response = self.call_zome(call_params).await;
    match response {
        Ok(Ok(response)) => Ok(zome_call_response_to_conductor_api_result(response)?),
        Ok(Err(error)) => Err(ConductorApiError::Other(Box::new(error))),
        Err(error) => Err(error),
    }
}
}
}
/// Flags shared by the app-installation entry points.
#[derive(Default)]
pub struct InstallAppCommonFlags {
    /// Defer membrane proofs: install the app awaiting memproofs instead of
    /// running genesis immediately (only honored if the manifest allows it).
    pub defer_memproofs: bool,
    /// Keep the app installed (disabled) even if genesis fails.
    pub ignore_genesis_failure: bool,
}
mod app_impls {
use super::*;
use holochain_conductor_api::CellInfo;
impl Conductor {
/// Test helper: install an app directly from DNAs (and optional membrane
/// proofs), synthesizing a minimal manifest. Returns the agent key used.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
#[cfg(feature = "test_utils")]
pub(crate) async fn install_app_minimal(
    self: Arc<Self>,
    installed_app_id: InstalledAppId,
    agent: Option<AgentPubKey>,
    data: &[(impl DnaWithRole, Option<MembraneProof>)],
    network_seed: Option<NetworkSeed>,
    flags: Option<InstallAppCommonFlags>,
) -> ConductorResult<AgentPubKey> {
    let dnas_with_roles: Vec<_> = data.iter().map(|(dr, _)| dr).cloned().collect();
    let manifest = app_manifest_from_dnas(&dnas_with_roles, 255, false, network_seed);
    self.install_app_with_manifest(installed_app_id, agent, data, flags, manifest)
        .await
}
/// Test helper: install an app from DNAs plus an explicit manifest, assigning
/// each DNA as a primary role. Returns the agent key used.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
#[cfg(feature = "test_utils")]
pub(crate) async fn install_app_with_manifest(
    self: Arc<Self>,
    installed_app_id: InstalledAppId,
    agent: Option<AgentPubKey>,
    data: &[(impl DnaWithRole, Option<MembraneProof>)],
    flags: Option<InstallAppCommonFlags>,
    manifest: AppManifest,
) -> ConductorResult<AgentPubKey> {
    // Split the input into DNAs to register and role assignments.
    let (dnas_to_register, role_assignments): (Vec<_>, Vec<_>) = data
        .iter()
        .map(|(dr, mp)| {
            let dna = dr.dna().clone();
            let dna_hash = dna.dna_hash().clone();
            let dnas_to_register = (dna, mp.clone());
            let role_assignments =
                (dr.role(), AppRolePrimary::new(dna_hash, true, 255).into());
            (dnas_to_register, role_assignments)
        })
        .unzip();
    let ops = AppRoleResolution {
        dnas_to_register,
        role_assignments,
    };
    let app = self
        .install_app_common(
            installed_app_id,
            manifest,
            agent.clone(),
            ops,
            // Default: run genesis immediately and fail hard on errors.
            flags.unwrap_or(InstallAppCommonFlags {
                defer_memproofs: false,
                ignore_genesis_failure: false,
            }),
        )
        .await?;
    Ok(app.agent_key().clone())
}
/// Shared installation path: register DNAs, guard against duplicate cells,
/// then either park the app awaiting membrane proofs or run genesis and add
/// the app (disabled) to conductor state.
///
/// A missing `agent_key` is replaced by a freshly generated keypair.
///
/// # Errors
/// Fails on duplicate cell ids, DNA registration failure, or (unless
/// `ignore_genesis_failure` is set) genesis failure.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
async fn install_app_common(
    self: Arc<Self>,
    installed_app_id: InstalledAppId,
    manifest: AppManifest,
    agent_key: Option<AgentPubKey>,
    ops: AppRoleResolution,
    flags: InstallAppCommonFlags,
) -> ConductorResult<InstalledApp> {
    // Generate an agent key if the caller did not supply one.
    let agent_key = match agent_key {
        Some(key) => key,
        None => {
            self.keystore.new_sign_keypair_random().await?
        }
    };
    let cells_to_create = ops.cells_to_create(agent_key.clone());
    let state = self.get_state().await?;
    // Reject installation if any new cell id collides with an existing one.
    let all_cells: HashSet<_> = state
        .installed_apps()
        .values()
        .flat_map(|app| app.all_cells())
        .collect();
    let maybe_duplicate_cell_id = cells_to_create
        .iter()
        .find(|(cell_id, _)| all_cells.contains(cell_id));
    if let Some((duplicate_cell_id, _)) = maybe_duplicate_cell_id {
        return Err(ConductorError::CellAlreadyExists(
            duplicate_cell_id.to_owned(),
        ));
    };
    for (dna, _) in ops.dnas_to_register {
        let cell_id = CellId::new(dna.dna_hash().clone(), agent_key.clone());
        self.clone().register_dna_file(cell_id, dna).await?;
    }
    if flags.defer_memproofs {
        // Park the app; genesis runs later, once memproofs are provided.
        let roles = ops.role_assignments;
        let app = InstalledAppCommon::new(
            installed_app_id.clone(),
            agent_key.clone(),
            roles,
            manifest,
            Timestamp::now(),
        )?;
        let (_, app) = self
            .update_state_prime(move |mut state| {
                let app = state.add_app_awaiting_memproofs(app)?;
                Ok((state, app))
            })
            .await?;
        Ok(app)
    } else {
        let genesis_result =
            crate::conductor::conductor::genesis_cells(self.clone(), cells_to_create).await;
        if genesis_result.is_ok() || flags.ignore_genesis_failure {
            // Record the app even on genesis failure when the caller asked to
            // ignore it; the original genesis error is still propagated below.
            let roles = ops.role_assignments;
            let app = InstalledAppCommon::new(
                installed_app_id.clone(),
                agent_key.clone(),
                roles,
                manifest,
                Timestamp::now(),
            )?;
            let disabled_app = self.add_disabled_app_to_db(app).await?;
            genesis_result.map(|_| disabled_app)
        } else if let Err(err) = genesis_result {
            Err(err)
        } else {
            unreachable!()
        }
    }
}
/// Install an app from an app bundle: resolve the bundle, apply network
/// seed / role-settings overrides to the manifest, resolve cells, and run
/// the common installation path.
///
/// Memproofs are deferred only when the manifest allows it AND no membrane
/// proofs were supplied in the role settings.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn install_app_bundle(
    self: Arc<Self>,
    payload: InstallAppPayload,
) -> ConductorResult<InstalledApp> {
    let InstallAppPayload {
        source,
        agent_key,
        installed_app_id,
        network_seed,
        roles_settings,
        ignore_genesis_failure,
    } = payload;
    let modifiers = get_modifiers_map_from_role_settings(&roles_settings);
    let membrane_proofs = get_memproof_map_from_role_settings(&roles_settings);
    let existing_cells = get_existing_cells_map_from_role_settings(&roles_settings);
    // Rebuild the bundle with the overridden manifest.
    let bundle = {
        let original_bundle = source.resolve().await?;
        let mut manifest = original_bundle.manifest().to_owned();
        if let Some(network_seed) = network_seed {
            manifest.set_network_seed(network_seed);
        }
        manifest.override_modifiers(modifiers)?;
        AppBundle::from(original_bundle.into_inner().update_manifest(manifest)?)
    };
    let manifest = bundle.manifest().clone();
    let defer_memproofs = match &manifest {
        AppManifest::V0(m) => m.allow_deferred_memproofs && membrane_proofs.is_empty(),
    };
    let flags = InstallAppCommonFlags {
        defer_memproofs,
        ignore_genesis_failure,
    };
    // Default the app id to the manifest's app name.
    let installed_app_id =
        installed_app_id.unwrap_or_else(|| manifest.app_name().to_owned());
    let ops = bundle
        .resolve_cells(membrane_proofs, existing_cells)
        .await?;
    self.clone()
        .install_app_common(installed_app_id, manifest, agent_key, ops, flags)
        .await
}
/// Uninstall an app: delete its cell databases, remove it from conductor
/// state, stop its cells, and prune its signal broadcast channels.
///
/// # Errors
/// Fails with [`ConductorError::AppHasDependents`] if other apps depend on
/// this one and `force` is false.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
pub async fn uninstall_app(
    self: Arc<Self>,
    installed_app_id: &InstalledAppId,
    force: bool,
) -> ConductorResult<()> {
    let state = self.get_state().await?;
    let deps = state.get_dependent_apps(installed_app_id, true)?;
    if force || deps.is_empty() {
        let app = state.get_app(installed_app_id)?;
        let cells_to_remove = app.all_cells().collect::<Vec<_>>();
        // Databases are removed before state so a crash cannot leave live
        // state pointing at deleted databases.
        self.delete_cell_databases(app.id(), cells_to_remove.clone())
            .await?;
        self.remove_app_from_db(installed_app_id).await?;
        tracing::debug!(msg = "Removed app from db.", app = ?app);
        self.remove_cells(&cells_to_remove).await;
        // Drop broadcast channels for apps that no longer exist.
        let installed_app_ids = self
            .get_state()
            .await?
            .installed_apps()
            .iter()
            .map(|(app_id, _)| app_id.clone())
            .collect::<HashSet<_>>();
        self.app_broadcast.retain(installed_app_ids);
        Ok(())
    } else {
        Err(ConductorError::AppHasDependents(
            installed_app_id.clone(),
            deps,
        ))
    }
}
/// List the ids of all currently enabled apps.
pub async fn list_enabled_apps(&self) -> ConductorResult<Vec<InstalledAppId>> {
    let state = self.get_state().await?;
    let ids = state
        .enabled_apps()
        .map(|(id, _)| id.clone())
        .collect();
    Ok(ids)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn list_apps(
&self,
status_filter: Option<AppStatusFilter>,
) -> ConductorResult<Vec<AppInfo>> {
use AppStatusFilter::*;
let conductor_state = self.get_state().await?;
let apps_ids: Vec<&String> = match status_filter {
Some(Enabled) => conductor_state.enabled_apps().map(|(id, _)| id).collect(),
Some(Disabled) => conductor_state.disabled_apps().map(|(id, _)| id).collect(),
None => conductor_state.installed_apps().keys().collect(),
};
let mut app_infos: Vec<AppInfo> = apps_ids
.into_iter()
.map(|app_id| self.get_app_info_inner(app_id, &conductor_state))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect();
app_infos.sort_by_key(|app_info| std::cmp::Reverse(app_info.installed_at));
Ok(app_infos)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Ids of all enabled apps that contain the given cell.
pub async fn list_enabled_apps_for_dependent_cell_id(
    &self,
    cell_id: &CellId,
) -> ConductorResult<HashSet<InstalledAppId>> {
    let state = self.get_state().await?;
    let mut matching = HashSet::new();
    for (app_id, app) in state.enabled_apps() {
        if app.all_cells().any(|c| c == *cell_id) {
            matching.insert(app_id.clone());
        }
    }
    Ok(matching)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Within the enabled app that contains `cell_id`, look up the cell
/// filling `role_name`.
///
/// Returns `Ok(None)` when no enabled app contains the cell, or when the
/// containing app has no assignment for that role.
pub async fn find_cell_with_role_alongside_cell(
    &self,
    cell_id: &CellId,
    role_name: &RoleName,
) -> ConductorResult<Option<CellId>> {
    let state = self.get_state().await?;
    let containing = state
        .enabled_apps()
        .find(|(_, app)| app.all_cells().any(|c| c == *cell_id));
    let Some((_, enabled_app)) = containing else {
        return Ok(None);
    };
    let found = match enabled_app.role(role_name) {
        // A primary role runs under the app's own agent key.
        Ok(AppRoleAssignment::Primary(primary)) => Some(CellId::new(
            primary.dna_hash().clone(),
            enabled_app.agent_key().clone(),
        )),
        // A dependency role carries its full cell id directly.
        Ok(AppRoleAssignment::Dependency(dependency)) => Some(dependency.cell_id.clone()),
        Err(_) => None,
    };
    Ok(found)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Ids of all enabled apps that contain at least one cell running the
/// given DNA.
pub async fn list_enabled_apps_for_dependent_dna_hash(
    &self,
    dna_hash: &DnaHash,
) -> ConductorResult<HashSet<InstalledAppId>> {
    let state = self.get_state().await?;
    let mut matching = HashSet::new();
    for (app_id, app) in state.enabled_apps() {
        if app.all_cells().any(|c| c.dna_hash() == dna_hash) {
            matching.insert(app_id.clone());
        }
    }
    Ok(matching)
}
/// Fetch the `AppInfo` for one installed app, or `None` when the app is
/// not installed.
pub async fn get_app_info(
    &self,
    installed_app_id: &InstalledAppId,
) -> ConductorResult<Option<AppInfo>> {
    let state = self.get_state().await?;
    self.get_app_info_inner(installed_app_id, &state)
}
/// Supply membrane proofs for an app and run genesis on its primary
/// cells.
///
/// After genesis the app is left in
/// `Disabled(NotStartedAfterProvidingMemproofs)`; a separate enable call
/// is needed to actually start it.
pub async fn provide_memproofs(
    self: Arc<Self>,
    installed_app_id: &InstalledAppId,
    mut memproofs: MemproofMap,
) -> ConductorResult<()> {
    let state = self.get_state().await?;
    let app = state.get_app(installed_app_id)?;
    // Pair every primary role's cell with its memproof; roles the caller
    // did not provide a proof for get `None`.
    let cells_to_genesis = app
        .primary_roles()
        .map(|(role_name, role)| {
            (
                CellId::new(role.dna_hash().clone(), app.agent_key.clone()),
                memproofs.remove(role_name),
            )
        })
        .collect();
    crate::conductor::conductor::genesis_cells(self.clone(), cells_to_genesis).await?;
    // Record that genesis succeeded but the app has not been started yet.
    self.update_state({
        let installed_app_id = installed_app_id.clone();
        move |mut state| {
            let app = state.get_app_mut(&installed_app_id)?;
            app.status =
                AppStatus::Disabled(DisabledAppReason::NotStartedAfterProvidingMemproofs);
            Ok(state)
        }
    })
    .await?;
    Ok(())
}
/// Build an `AppInfo` for `app_id` from the given state snapshot.
/// Unknown app ids yield `Ok(None)` rather than an error.
fn get_app_info_inner(
    &self,
    app_id: &InstalledAppId,
    state: &ConductorState,
) -> ConductorResult<Option<AppInfo>> {
    let Ok(app) = state.get_app(app_id) else {
        return Ok(None);
    };
    let dna_definitions = self.get_dna_definitions(app)?;
    Ok(Some(AppInfo::from_installed_app(app, &dna_definitions)))
}
/// All distinct DNA hashes used by an installed app's cells
/// (provisioned, cloned, and stem cells). Errors when the app is not
/// installed.
pub(crate) async fn get_dna_hashes_for_app(
    &self,
    installed_app_id: &InstalledAppId,
) -> ConductorResult<Vec<DnaHash>> {
    let app_info = self.get_app_info(installed_app_id).await?.ok_or_else(|| {
        ConductorError::other(format!("App not installed: {installed_app_id}"))
    })?;
    // Deduplicate via a set: several cells may share one DNA.
    let mut unique_dnas: HashSet<DnaHash> = HashSet::new();
    for cell_info in app_info.cell_info.values().flatten() {
        let dna = match cell_info {
            CellInfo::Provisioned(cell) => cell.cell_id.dna_hash().clone(),
            CellInfo::Cloned(cell) => cell.cell_id.dna_hash().clone(),
            CellInfo::Stem(cell) => cell.original_dna_hash.clone(),
        };
        unique_dnas.insert(dna);
    }
    Ok(unique_dnas.into_iter().collect())
}
}
}
/// Accessors for the conductor's running cells.
mod cell_impls {
    use super::*;

    impl Conductor {
        /// Look up a running cell by id.
        ///
        /// Distinguishes two failure modes: the cell belongs to an
        /// installed app but is not running (`CellDisabled`), vs. the
        /// cell is entirely unknown to this conductor (`CellMissing`).
        pub(crate) async fn cell_by_id(&self, cell_id: &CellId) -> ConductorResult<Arc<Cell>> {
            if let Some(cell) = self.running_cells.share_ref(|c| c.get(cell_id).cloned()) {
                Ok(cell)
            } else {
                // Not running — check installed apps to pick the right error.
                let present = self
                    .get_state()
                    .await?
                    .installed_apps()
                    .values()
                    .flat_map(|app| app.all_cells())
                    .any(|id| id == *cell_id);
                if present {
                    Err(ConductorError::CellDisabled(cell_id.clone()))
                } else {
                    Err(ConductorError::CellMissing(cell_id.clone()))
                }
            }
        }

        /// Ids of all cells currently running in this conductor.
        pub fn running_cell_ids(&self) -> HashSet<CellId> {
            self.running_cells
                .share_ref(|cells| cells.keys().cloned().collect())
        }

        /// For every installed app, collect the cells whose DNA either
        /// *is* `dna_hash` or declares `dna_hash` in its `lineage`.
        /// Apps with no matching cells are omitted from the result.
        #[cfg(feature = "unstable-migration")]
        pub async fn cells_by_dna_lineage(
            &self,
            dna_hash: &DnaHash,
        ) -> ConductorResult<holochain_conductor_api::CompatibleCells> {
            use std::collections::BTreeSet;
            Ok(self
                .get_state()
                .await?
                .installed_apps()
                .values()
                .filter_map(|app| {
                    let cells_in_lineage: BTreeSet<_> = app
                        .all_cells()
                        .filter_map(|cell_id| {
                            let cell_dna_hash = cell_id.dna_hash();
                            if cell_dna_hash == dna_hash {
                                // Exact DNA match always qualifies.
                                Some(cell_id.clone())
                            } else {
                                // Otherwise include the cell when its DNA
                                // definition lists `dna_hash` in its lineage.
                                // Cells with no resolvable DNA def are skipped.
                                self.get_dna_def(&cell_id)
                                    .map(|dna_def| dna_def.lineage.contains(dna_hash))
                                    .unwrap_or(false)
                                    .then(|| cell_id.clone())
                            }
                        })
                        .collect();
                    if cells_in_lineage.is_empty() {
                        None
                    } else {
                        Some((app.installed_app_id.clone(), cells_in_lineage))
                    }
                })
                .collect())
        }
    }
}
mod clone_cell_impls {
use super::*;
use holochain_zome_types::prelude::ClonedCell;
impl Conductor {
/// Create a clone of an app cell under the given role.
///
/// At least one DNA modifier (network seed or properties) must be set so
/// the clone gets a distinct DNA hash. The clone goes through genesis
/// and is started immediately.
pub async fn create_clone_cell(
    self: Arc<Self>,
    installed_app_id: &InstalledAppId,
    payload: CreateCloneCellPayload,
) -> ConductorResult<ClonedCell> {
    let CreateCloneCellPayload {
        role_name,
        modifiers,
        membrane_proof,
        name,
    } = payload;
    if !modifiers.has_some_option_set() {
        return Err(ConductorError::CloneCellError(
            "neither network_seed nor properties provided for clone cell".to_string(),
        ));
    }
    let state = self.get_state().await?;
    let app = state.get_app(installed_app_id)?;
    let app_role = app.primary_role(&role_name)?;
    if app_role.is_provisioned {
        // For an already-provisioned base cell, verify the agent-key
        // create action on its source chain before cloning.
        let source_chain = SourceChain::new(
            self.get_or_create_authored_db(app_role.dna_hash(), app.agent_key().clone())?,
            self.get_or_create_dht_db(app_role.dna_hash())?,
            self.keystore.clone(),
            app.agent_key().clone(),
        )
        .await?;
        source_chain.valid_create_agent_key_action().await?;
    }
    // Register the clone in conductor state.
    let clone_cell = self
        .add_clone_cell_to_app(
            installed_app_id.clone(),
            app.agent_key.clone(),
            role_name.clone(),
            modifiers.serialized()?,
            name,
        )
        .await?;
    let cells = vec![(clone_cell.cell_id.clone(), membrane_proof)];
    crate::conductor::conductor::genesis_cells(self.clone(), cells).await?;
    // Re-read state: add_clone_cell_to_app mutated it above.
    let state = self.get_state().await?;
    let app = state.get_app(installed_app_id)?;
    let p2p_config_override = Self::p2p_config_overrides(&app.manifest);
    self.create_cells_and_startup(
        [clone_cell.cell_id.clone()].into_iter(),
        p2p_config_override,
    )
    .await?;
    Ok(clone_cell)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Disable a clone cell: mark it disabled in conductor state and remove
/// the running cell instance. Its databases are not deleted here, so it
/// can be re-enabled later.
pub(crate) async fn disable_clone_cell(
    &self,
    installed_app_id: &InstalledAppId,
    DisableCloneCellPayload { clone_cell_id }: &DisableCloneCellPayload,
) -> ConductorResult<()> {
    // Update state first; the closure hands back the concrete CellId so
    // the in-memory cell can be removed afterwards.
    let (_, removed_cell_id) = self
        .update_state_prime({
            let app_id = installed_app_id.clone();
            let clone_cell_id = clone_cell_id.to_owned();
            move |mut state| {
                let app = state.get_app_mut(&app_id)?;
                let clone_id = app.get_clone_id(&clone_cell_id)?;
                let dna_hash = app.get_clone_dna_hash(&clone_cell_id)?;
                app.disable_clone_cell(&clone_id)?;
                let cell_id = CellId::new(dna_hash, app.agent_key().clone());
                Ok((state, cell_id))
            }
        })
        .await?;
    self.remove_cells(&[removed_cell_id]).await;
    Ok(())
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Re-enable a previously disabled clone cell and start it up again.
///
/// Returns the `ClonedCell` descriptor assembled from the clone's DNA
/// definition.
pub async fn enable_clone_cell(
    self: Arc<Self>,
    installed_app_id: &InstalledAppId,
    payload: &EnableCloneCellPayload,
) -> ConductorResult<ClonedCell> {
    let conductor = self.clone();
    let (_, enabled_cell) = self
        .update_state_prime({
            let app_id = installed_app_id.clone();
            let clone_cell_id = payload.clone_cell_id.to_owned();
            move |mut state| {
                let app = state.get_app_mut(&app_id)?;
                let clone_id = app.get_clone_id(&clone_cell_id)?;
                let (cell_id, _) = app.enable_clone_cell(&clone_id)?.into_inner();
                // Gather descriptive data (original DNA, modifiers, name)
                // for the returned ClonedCell.
                let app_role = app.primary_role(&clone_id.as_base_role_name())?;
                let original_dna_hash = app_role.dna_hash().clone();
                let ribosome = conductor.get_ribosome(&cell_id)?;
                let dna_def = ribosome.dna_file.dna_def();
                let dna_modifiers = dna_def.modifiers.clone();
                let name = dna_def.name.clone();
                let enabled_cell = ClonedCell {
                    cell_id,
                    clone_id,
                    original_dna_hash,
                    dna_modifiers,
                    name,
                    enabled: true,
                };
                Ok((state, enabled_cell))
            }
        })
        .await?;
    // Start the cell using the app's network config overrides.
    let state = self.get_state().await?;
    let app = state.get_app(installed_app_id)?;
    let p2p_config_override = Self::p2p_config_overrides(&app.manifest);
    self.create_cells_and_startup(
        [enabled_cell.cell_id.clone()].into_iter(),
        p2p_config_override,
    )
    .await?;
    Ok(enabled_cell)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub(crate) async fn delete_clone_cell(
&self,
DeleteCloneCellPayload {
app_id,
clone_cell_id,
}: &DeleteCloneCellPayload,
) -> ConductorResult<()> {
let (_, cell_id) = self
.update_state_prime({
let app_id = app_id.clone();
let clone_cell_id = clone_cell_id.clone();
move |mut state| {
let app = state.get_app_mut(&app_id)?;
let cell_id = app
.disabled_clone_cells()
.find(|(id, cell_id)| match &clone_cell_id {
CloneCellId::CloneId(clone_id) => *id == clone_id,
CloneCellId::DnaHash(dna_hash) => cell_id.dna_hash() == dna_hash,
})
.expect("disabled clone cell not part of this app")
.1;
let clone_id = app.get_clone_id(&clone_cell_id)?;
app.delete_clone_cell(&clone_id)?;
Ok((state, cell_id))
}
})
.await?;
self.delete_cell_databases(app_id, vec![cell_id]).await?;
Ok(())
}
}
}
mod app_status_impls {
use super::*;
use crate::conductor::cell::error::CellResult;
use holochain_types::cell_config_overrides::CellConfigOverrides;
impl Conductor {
/// Instantiate the given cells, join them to their networks, register
/// them as running, and kick off their workflows.
///
/// `config_override` (derived from the app manifest) is applied to every
/// cell. A cell-creation failure aborts the whole call; a network-join
/// failure is only logged so one unreachable network cannot block
/// startup.
pub(crate) async fn create_cells_and_startup(
    self: Arc<Self>,
    cell_ids: impl Iterator<Item = CellId>,
    config_override: Option<CellConfigOverrides>,
) -> ConductorResult<()> {
    // Build the cells with bounded concurrency (5 at a time).
    let cells_to_create = cell_ids.map(|cell_id| {
        let handle = self.clone();
        let overrides = config_override.clone();
        async move { handle.clone().create_cell(&cell_id, overrides).await }
    });
    let cells = futures::stream::iter(cells_to_create)
        .buffer_unordered(5)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;
    let (new_cells, triggers): (Vec<_>, Vec<_>) = cells.into_iter().unzip();
    let new_cells_map: IndexMap<CellId, Arc<Cell>> = new_cells
        .into_iter()
        .map(|c| (c.id().clone(), Arc::new(c)))
        .collect();
    let new_cell_ids: HashSet<_> = new_cells_map.keys().cloned().collect();
    // Join each new cell's agent to its DNA network, at most 10 joins in
    // flight; errors are logged rather than propagated.
    let join_futures = new_cells_map
        .values()
        .enumerate()
        .map(|(i, cell)| {
            let config_override = config_override.clone();
            let cell_id = cell.id().clone();
            let agent_pubkey = cell_id.agent_pubkey().clone();
            let holochain_p2p_dna = cell.holochain_p2p_dna().clone();
            async move {
                if let Err(e) = holochain_p2p_dna
                    .join(agent_pubkey, None, config_override)
                    .await
                {
                    tracing::error!(?e, ?cell_id, "Network join failed.");
                }
            }
            .instrument(tracing::info_span!("network join task", ?i))
        })
        .collect::<Vec<_>>();
    futures::stream::iter(join_futures)
        .buffer_unordered(10)
        .collect::<Vec<_>>()
        .await;
    // Only after joining do the cells become visible as "running".
    self.running_cells.share_mut(|cells| {
        cells.extend(new_cells_map.clone());
        tracing::debug!(?new_cell_ids, "added cells to running_cells");
    });
    // Finally start the queue-consumer workflows for each new cell.
    for trigger in triggers {
        trigger.initialize_workflows();
    }
    Ok(())
}
/// Build a single `Cell` instance for `cell_id`.
///
/// Fails with `P2pConfigOverridesConflict` if another running cell of
/// the same DNA was started with different network overrides.
async fn create_cell(
    self: Arc<Self>,
    cell_id: &CellId,
    overrides: Option<CellConfigOverrides>,
) -> CellResult<(Cell, InitialQueueTriggers)> {
    let overrides = overrides.unwrap_or_default();
    if let Some(conflicting_cell_id) =
        self.check_p2p_overrides_conflicting(cell_id.dna_hash(), &overrides)
    {
        // Best-effort lookup of the two app ids, purely to enrich the
        // error; lookup failures degrade to `None`.
        let conflicting_app_id = self
            .find_app_containing_cell(&conflicting_cell_id)
            .await
            .ok()
            .flatten()
            .map(|app| Box::new(app.installed_app_id.clone()));
        let app_id = self
            .find_app_containing_cell(cell_id)
            .await
            .ok()
            .flatten()
            .map(|app| Box::new(app.installed_app_id.clone()));
        return Err(CellError::P2pConfigOverridesConflict {
            app_id,
            cell_id: cell_id.clone(),
            conflicting_app_id,
            conflicting_cell_id,
        });
    }
    let holochain_p2p_cell = holochain_p2p::HolochainP2pDna::new(
        self.holochain_p2p.clone(),
        cell_id.dna_hash().clone(),
    );
    let space = self
        .get_or_create_space(cell_id.dna_hash())
        .map_err(|e| CellError::FailedToCreateDnaSpace(ConductorError::from(e).into()))?;
    // Signal sender scoped to the app that owns this cell.
    let signal_tx = self
        .get_signal_tx(cell_id)
        .await
        .map_err(|err| CellError::ConductorError(Box::new(err)))?;
    tracing::info!(?cell_id, "Creating a cell");
    Cell::create(
        cell_id.clone(),
        self.clone(),
        space,
        holochain_p2p_cell,
        signal_tx,
        overrides,
    )
    .await
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
/// Enable an installed app: create and start all of its enabled cells,
/// then mark the app `Enabled` in conductor state.
///
/// Idempotent when the app is already enabled; errors when the app is
/// still awaiting membrane proofs.
pub async fn enable_app(
    self: Arc<Self>,
    app_id: InstalledAppId,
) -> ConductorResult<InstalledApp> {
    let state = self.clone().get_state().await?;
    let app = state.get_app(&app_id)?;
    if app.status == AppStatus::AwaitingMemproofs {
        return Err(ConductorError::AppStatusError(
            "App is awaiting membrane proofs and cannot be enabled.".to_string(),
        ));
    }
    if app.status == AppStatus::Enabled {
        // Already running — nothing to start.
        return Ok(app.clone());
    }
    let config_override = Self::p2p_config_overrides(&app.manifest);
    let cell_ids_in_app = app.all_enabled_cells();
    // Start the cells first; state only flips to Enabled on success.
    self.clone()
        .create_cells_and_startup(cell_ids_in_app, config_override)
        .await?;
    let (_, app) = self
        .update_state_prime(move |mut state| {
            let app = state.get_app_mut(&app_id)?;
            app.status = AppStatus::Enabled;
            let app = app.clone();
            Ok((state, app))
        })
        .await?;
    Ok(app)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
/// Disable an installed app: remove all of its cells from the conductor
/// and record `Disabled(reason)` in state.
///
/// Idempotent when the app is already disabled — the previously stored
/// reason is kept and `reason` is discarded in that case.
pub async fn disable_app(
    self: Arc<Self>,
    app_id: InstalledAppId,
    reason: DisabledAppReason,
) -> ConductorResult<InstalledApp> {
    let state = self.clone().get_state().await?;
    let app = state.get_app(&app_id)?;
    if matches!(app.status, AppStatus::Disabled(_)) {
        return Ok(app.clone());
    }
    // Tear down running cells before persisting the status change.
    let cell_ids_to_cleanup = app.all_cells().collect::<Vec<_>>();
    self.remove_cells(&cell_ids_to_cleanup).await;
    let (_, app) = self
        .update_state_prime(move |mut state| {
            let app = state.get_app_mut(&app_id)?;
            app.status = AppStatus::Disabled(reason);
            let app = app.clone();
            Ok((state, app))
        })
        .await?;
    Ok(app)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Persist a freshly installed app in conductor state, in its initial
/// (disabled) form, and return it as stored.
pub(crate) async fn add_disabled_app_to_db(
    &self,
    app: InstalledAppCommon,
) -> ConductorResult<InstalledApp> {
    let (_, stored_app) = self
        .update_state_prime(move |mut state| {
            let added = state.add_app(app)?;
            Ok((state, added))
        })
        .await?;
    Ok(stored_app)
}
/// Derive per-cell network config overrides from an app manifest.
/// Returns `None` when the manifest overrides nothing.
pub(crate) fn p2p_config_overrides(manifest: &AppManifest) -> Option<CellConfigOverrides> {
    // AppManifest currently has a single variant.
    let AppManifest::V0(m) = manifest;
    let mut overrides = CellConfigOverrides::default();
    overrides.bootstrap_url = m.bootstrap_url.clone();
    overrides.signal_url = m.signal_url.clone();
    overrides.is_overriding().then_some(overrides)
}
/// Scan the running cells for one that shares `dna_hash` but was
/// started with *different* config overrides; return the first such
/// conflicting cell's id, if any.
pub(crate) fn check_p2p_overrides_conflicting(
    &self,
    dna_hash: &DnaHash,
    overrides: &CellConfigOverrides,
) -> Option<CellId> {
    self.running_cells.share_ref(|cells| {
        for cell in cells.values() {
            let same_dna = cell.id().dna_hash() == dna_hash;
            if same_dna && cell.overrides() != overrides {
                return Some(cell.id().clone());
            }
        }
        None
    })
}
}
}
/// Read and update access to the persisted `ConductorState`.
mod state_impls {
    use super::*;

    impl Conductor {
        /// Read the current persisted `ConductorState`.
        #[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
        pub(crate) async fn get_state(&self) -> ConductorResult<ConductorState> {
            self.spaces.get_state().await
        }

        /// Atomically transform and persist the conductor state with `f`.
        #[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
        pub(crate) async fn update_state<F>(&self, f: F) -> ConductorResult<ConductorState>
        where
            F: Send + FnOnce(ConductorState) -> ConductorResult<ConductorState> + 'static,
        {
            self.spaces.update_state(f).await
        }

        /// Like `update_state`, but the closure also produces a value `O`
        /// that is returned alongside the new state. Additionally gated
        /// on `self.check_running()?`.
        #[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
        pub(crate) async fn update_state_prime<F, O>(
            &self,
            f: F,
        ) -> ConductorResult<(ConductorState, O)>
        where
            F: FnOnce(ConductorState) -> ConductorResult<(ConductorState, O)> + Send + 'static,
            O: Send + 'static,
        {
            self.check_running()?;
            self.spaces.update_state_prime(f).await
        }
    }
}
/// The scheduled-functions dispatcher loop.
mod scheduler_impls {
    use super::*;

    impl Conductor {
        /// Install the scheduler task handle, aborting any previously
        /// running scheduler so only one loop is ever active.
        pub(super) fn set_scheduler(&self, join_handle: tokio::task::JoinHandle<()>) {
            let mut scheduler = self.scheduler.lock();
            if let Some(existing_join_handle) = &*scheduler {
                existing_join_handle.abort();
            }
            *scheduler = Some(join_handle);
        }

        /// Start the scheduler loop.
        ///
        /// First purges all ephemeral scheduled functions from every
        /// authored database, then spawns a task that dispatches
        /// scheduled functions every `interval_period`.
        #[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
        pub(crate) async fn start_scheduler(
            self: Arc<Self>,
            interval_period: std::time::Duration,
        ) -> StateMutationResult<()> {
            // Delete ephemeral scheduled fns from all authored DBs.
            // NOTE(review): the write results are discarded by join_all —
            // a failed purge is silently ignored.
            let tasks = self
                .spaces
                .get_from_spaces(|space| {
                    let all_dbs = space.get_all_authored_dbs();
                    all_dbs.into_iter().map(|db| async move {
                        db.write_async(|txn| delete_all_ephemeral_scheduled_fns(txn))
                            .await
                    })
                })
                .into_iter()
                .flatten();
            futures::future::join_all(tasks).await;
            let scheduler_handle = self.clone();
            self.set_scheduler(tokio::task::spawn(async move {
                let mut interval = tokio::time::interval(interval_period);
                loop {
                    interval.tick().await;
                    scheduler_handle
                        .clone()
                        .dispatch_scheduled_fns(Timestamp::now())
                        .await;
                }
            }));
            Ok(())
        }

        /// Dispatch scheduled functions on every running cell: resolve
        /// the cells sequentially, then run all dispatches concurrently.
        pub(crate) async fn dispatch_scheduled_fns(self: Arc<Self>, now: Timestamp) {
            let cell_arcs = {
                let mut cell_arcs = vec![];
                for cell_id in self.running_cell_ids() {
                    // Cells that fail to resolve (e.g. disabled in the
                    // meantime) are silently skipped.
                    if let Ok(cell_arc) = self.cell_by_id(&cell_id).await {
                        cell_arcs.push(cell_arc);
                    }
                }
                cell_arcs
            };
            let tasks = cell_arcs
                .into_iter()
                .map(|cell_arc| cell_arc.dispatch_scheduled_fns(now));
            futures::future::join_all(tasks).await;
        }
    }
}
mod misc_impls {
use super::{state_dump_helpers::peer_store_dump, *};
use holochain_conductor_api::JsonDump;
use holochain_zome_types::{action::builder, Entry};
use kitsune2_api::{SpaceId, TransportStats};
use std::sync::atomic::Ordering;
impl Conductor {
/// Commit a capability grant to a cell's source chain.
///
/// Ensures the cell's zome init has run, writes a `Create` action with a
/// `CapGrant` entry, flushes the chain to the network, and returns the
/// new action's hash.
pub async fn grant_zome_call_capability(
    &self,
    payload: GrantZomeCallCapabilityPayload,
) -> ConductorApiResult<ActionHash> {
    let GrantZomeCallCapabilityPayload { cell_id, cap_grant } = payload;
    let cell = self.cell_by_id(&cell_id).await?;
    cell.check_or_run_zome_init().await?;
    let source_chain = SourceChain::new(
        self.get_or_create_authored_db(
            cell_id.dna_hash(),
            cell.id().agent_pubkey().clone(),
        )?,
        self.get_or_create_dht_db(cell_id.dna_hash())?,
        self.keystore.clone(),
        cell_id.agent_pubkey().clone(),
    )
    .await?;
    let cap_grant_entry = Entry::CapGrant(cap_grant);
    let entry_hash = EntryHash::with_data_sync(&cap_grant_entry);
    let action_builder = builder::Create {
        entry_type: EntryType::CapGrant,
        entry_hash,
    };
    let action_hash = source_chain
        .put_weightless(
            action_builder,
            Some(cap_grant_entry),
            ChainTopOrdering::default(),
        )
        .await?;
    // Publish the new chain entry to this DNA's target arcs.
    source_chain
        .flush(
            cell.holochain_p2p_dna()
                .target_arcs()
                .await
                .map_err(ConductorApiError::other)?,
        )
        .await?;
    self.cell_by_id(&cell_id)
        .await?
        .notify_authored_ops_moved_to_limbo();
    Ok(action_hash)
}
/// Revoke a previously committed capability grant by writing a `Delete`
/// action that points at it.
///
/// Errors when no cap grant committed under `action_hash` exists on the
/// cell's source chain. Returns the hash of the delete action.
pub async fn revoke_zome_call_capability(
    &self,
    cell_id: CellId,
    action_hash: ActionHash,
) -> ConductorApiResult<ActionHash> {
    let cell = self.cell_by_id(&cell_id).await?;
    cell.check_or_run_zome_init().await?;
    let source_chain = SourceChain::new(
        self.get_or_create_authored_db(
            cell_id.dna_hash(),
            cell.id().agent_pubkey().clone(),
        )?,
        self.get_or_create_dht_db(cell_id.dna_hash())?,
        self.keystore.clone(),
        cell_id.agent_pubkey().clone(),
    )
    .await?;
    // Find the grant entry that was committed under `action_hash`.
    let grant_query = ChainQueryFilter::new()
        .include_entries(true)
        .entry_type(EntryType::CapGrant);
    let cap_grant_entry = source_chain
        .query(grant_query.clone())
        .await?
        .into_iter()
        .find_map(|record| {
            if record.action_hash() == &action_hash {
                match record.entry {
                    RecordEntry::Present(entry) => Some(entry),
                    _ => None,
                }
            } else {
                None
            }
        })
        .ok_or_else(|| ConductorApiError::other("No cap grant found for action hash"))?;
    let entry_hash = EntryHash::with_data_sync(&cap_grant_entry);
    let action_builder = builder::Delete {
        deletes_address: action_hash,
        deletes_entry_address: entry_hash,
    };
    let action_hash = source_chain
        .put_weightless(action_builder, None, ChainTopOrdering::default())
        .await?;
    // Publish the delete to this DNA's target arcs.
    source_chain
        .flush(
            cell.holochain_p2p_dna()
                .target_arcs()
                .await
                .map_err(ConductorApiError::other)?,
        )
        .await?;
    self.cell_by_id(&cell_id)
        .await?
        .notify_authored_ops_moved_to_limbo();
    Ok(action_hash)
}
/// Collect capability-grant info for each cell in `cell_set`.
///
/// For every cell, its source chain is queried for `CapGrant` entries
/// and for `Delete` actions revoking them. When `include_revoked` is
/// `false`, revoked grants are filtered out; otherwise they are included
/// together with their revocation timestamp. Cells without any grants
/// are omitted from the result.
pub async fn capability_grant_info(
    &self,
    cell_set: &HashSet<CellId>,
    include_revoked: bool,
) -> ConductorApiResult<AppCapGrantInfo> {
    let mut grant_info: Vec<(CellId, Vec<CapGrantInfo>)> = Vec::new();
    let grant_query = ChainQueryFilter::new()
        .include_entries(true)
        .entry_type(EntryType::CapGrant);
    let delete_query: ChainQueryFilter = ChainQueryFilter::new()
        .include_entries(true)
        .action_type(ActionType::Delete);
    for cell_id in cell_set.iter() {
        let chain = SourceChainRead::new(
            self.get_or_create_authored_db(
                cell_id.dna_hash(),
                cell_id.agent_pubkey().clone(),
            )?
            .into(),
            self.get_or_create_dht_db(cell_id.dna_hash())?.into(),
            self.keystore().clone(),
            cell_id.agent_pubkey().clone(),
        )
        .await?;
        let grant_list = chain.query(grant_query.clone()).await?;
        if grant_list.is_empty() {
            continue;
        }
        // Map: revoked grant's action hash -> revocation time.
        let delete_action_hash_map: HashMap<ActionHash, Timestamp> = chain
            .query(delete_query.clone())
            .await?
            .iter()
            .filter_map(|record| {
                if let Action::Delete(delete) = record.action() {
                    Some((delete.deletes_address.clone(), delete.timestamp))
                } else {
                    None
                }
            })
            .collect();
        tracing::info!("cap grant revocation list: {:?}", delete_action_hash_map);
        let mut cap_grants: Vec<CapGrantInfo> = vec![];
        for grant_record in grant_list {
            let cap_action_hash = grant_record.action_hash().clone();
            // A grant is revoked when a Delete action points at it.
            let revoke_time = delete_action_hash_map.get(&cap_action_hash).cloned();
            // Bug fix: the previous revision did `if !include_revoked
            // { continue; }`, skipping *every* grant (the method always
            // returned empty grant lists unless include_revoked was
            // true). Only revoked grants must be skipped.
            if !include_revoked && revoke_time.is_some() {
                continue;
            }
            // Entries that are not cap grants (e.g. withheld entries)
            // are skipped.
            let zome_cap_grant = match grant_record.entry.to_grant_option() {
                Some(zome_cap_grant) => {
                    DesensitizedZomeCallCapGrant::from(zome_cap_grant.clone())
                }
                None => continue,
            };
            cap_grants.push(CapGrantInfo {
                cap_grant: zome_cap_grant,
                action_hash: cap_action_hash,
                created_at: grant_record.action().timestamp(),
                revoked_at: revoke_time,
            });
        }
        grant_info.push((cell_id.clone(), cap_grants));
    }
    Ok(AppCapGrantInfo(grant_info))
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Serialize a JSON dump of one cell's state: peer store, source chain,
/// and integration data. The returned string is a JSON array pairing the
/// structured dump with its human-readable summary.
pub async fn dump_cell_state(&self, cell_id: &CellId) -> ConductorApiResult<String> {
    let cell = self.cell_by_id(cell_id).await?;
    let authored_db = cell.get_or_create_authored_db()?;
    let dht_db = cell.dht_db();
    let agent_pub_key = cell_id.agent_pubkey().clone();
    let peer_dump = peer_store_dump(self, cell_id).await?;
    let source_chain_dump =
        source_chain::dump_state(authored_db.clone().into(), agent_pub_key).await?;
    let out = JsonDump {
        peer_dump,
        source_chain_dump,
        integration_dump: integration_dump(dht_db).await?,
    };
    // Pair the structured dump with its Display rendering.
    let summary = out.to_string();
    let out = (out, summary);
    Ok(serde_json::to_string(&out)?)
}
/// Serialize the conductor's runtime status (running cells, shutdown
/// flag, admin ports, app interfaces) together with the persisted
/// `ConductorState` as a JSON string.
pub async fn dump_conductor_state(&self) -> ConductorApiResult<String> {
    // Serialization-only mirrors of the conductor's runtime fields.
    #[derive(Serialize, Debug)]
    pub struct ConductorSerialized {
        running_cells: Vec<(DnaHashB64, AgentPubKeyB64)>,
        shutting_down: bool,
        admin_websocket_ports: Vec<u16>,
        app_interfaces: Vec<AppInterfaceId>,
    }
    #[derive(Serialize, Debug)]
    struct ConductorDump {
        conductor: ConductorSerialized,
        state: ConductorState,
    }
    let conductor_state = self.get_state().await?;
    let conductor = ConductorSerialized {
        // Cell ids rendered as (DNA, agent) base64 pairs.
        running_cells: self.running_cells.share_ref(|c| {
            c.clone()
                .into_keys()
                .map(|id| {
                    let (dna, agent) = id.into_dna_and_agent();
                    (dna.into(), agent.into())
                })
                .collect()
        }),
        shutting_down: self.shutting_down.load(Ordering::SeqCst),
        admin_websocket_ports: self.admin_websocket_ports.share_ref(|p| p.clone()),
        app_interfaces: conductor_state.app_interfaces.keys().cloned().collect(),
    };
    let dump = ConductorDump {
        conductor,
        state: conductor_state,
    };
    let out = serde_json::to_string(&dump)?;
    Ok(out)
}
/// Build a `FullStateDump` for a cell: peer store, source chain, and the
/// full integration dump, optionally resuming from `dht_ops_cursor`.
pub async fn dump_full_cell_state(
    &self,
    cell_id: &CellId,
    dht_ops_cursor: Option<u64>,
) -> ConductorApiResult<FullStateDump> {
    let authored_db =
        self.get_or_create_authored_db(cell_id.dna_hash(), cell_id.agent_pubkey().clone())?;
    let dht_db = self.get_or_create_dht_db(cell_id.dna_hash())?;
    let source_chain_dump =
        source_chain::dump_state(authored_db.into(), cell_id.agent_pubkey().clone())
            .await?;
    let peer_dump = peer_store_dump(self, cell_id).await?;
    let out = FullStateDump {
        peer_dump,
        source_chain_dump,
        integration_dump: full_integration_dump(&dht_db, dht_ops_cursor).await?,
    };
    Ok(out)
}
/// Fetch Kitsune2 network metrics directly from the networking layer.
pub async fn dump_network_metrics(
    &self,
    request: Kitsune2NetworkMetricsRequest,
) -> ConductorApiResult<HashMap<DnaHash, Kitsune2NetworkMetrics>> {
    let metrics = self.holochain_p2p.dump_network_metrics(request).await?;
    Ok(metrics)
}
/// Network metrics restricted to the DNAs used by one app.
///
/// If the request names a DNA hash it must belong to the app; otherwise
/// metrics are fetched per app DNA, with individual failures logged and
/// skipped rather than failing the whole call.
pub async fn dump_network_metrics_for_app(
    &self,
    installed_app_id: &InstalledAppId,
    request: Kitsune2NetworkMetricsRequest,
) -> ConductorApiResult<HashMap<DnaHash, Kitsune2NetworkMetrics>> {
    // Every DNA hash the app touches: provisioned base DNAs, their
    // clones, and dependency cells.
    let all_dna_hashes = {
        let state = self.get_state().await?;
        let installed_app = state.get_app(installed_app_id)?;
        installed_app
            .role_assignments
            .values()
            .flat_map(|r| match r {
                AppRoleAssignment::Primary(p) if p.is_provisioned => {
                    let mut hashes = vec![p.base_dna_hash.clone()];
                    hashes.extend(p.clones.values().cloned().collect::<Vec<_>>());
                    hashes
                }
                // Unprovisioned primary role: only the clones exist.
                AppRoleAssignment::Primary(p) => {
                    p.clones.values().cloned().collect::<Vec<_>>()
                }
                AppRoleAssignment::Dependency(d) => vec![d.cell_id.dna_hash().clone()],
            })
            .collect::<Vec<_>>()
    };
    Ok(if let Some(ref dna_hash) = request.dna_hash {
        if !all_dna_hashes.contains(dna_hash) {
            return Err(ConductorApiError::Other("DNA hash not found in app".into()));
        }
        self.holochain_p2p.dump_network_metrics(request).await?
    } else {
        // No specific DNA requested: query each of the app's DNAs.
        let mut out = HashMap::new();
        for dna_hash in all_dna_hashes {
            match self
                .holochain_p2p
                .dump_network_metrics(Kitsune2NetworkMetricsRequest {
                    dna_hash: Some(dna_hash.clone()),
                    ..request.clone()
                })
                .await
            {
                Ok(metrics) => {
                    out.extend(metrics);
                }
                Err(e) => {
                    // Best effort: log and continue with the other DNAs.
                    tracing::error!(
                        "Failed to get network metrics for dna_hash: {:?}, error: {:?}",
                        dna_hash,
                        e
                    );
                }
            }
        }
        out
    })
}
/// Network transport statistics, with Kitsune space ids translated back
/// into DNA hashes in the blocked-message counts.
pub async fn dump_network_stats(&self) -> ConductorApiResult<HolochainTransportStats> {
    let raw = self.holochain_p2p.dump_network_stats().await?;
    let blocked_message_counts = raw
        .blocked_message_counts
        .into_iter()
        .map(|(url, per_space)| {
            // Re-key each peer's counts from SpaceId to DnaHash.
            let per_dna = per_space
                .into_iter()
                .map(|(space, count)| (DnaHash::from_k2_space(&space), count))
                .collect();
            (url, per_dna)
        })
        .collect();
    Ok(HolochainTransportStats {
        transport_stats: raw.transport_stats,
        blocked_message_counts,
    })
}
/// Network transport stats filtered down to one app.
///
/// Connections are kept only for peers present in the peer store of one
/// of the app's DNAs; blocked-message counts are additionally filtered
/// to the app's DNA spaces.
pub async fn dump_network_stats_for_app(
    &self,
    installed_app_id: &InstalledAppId,
) -> ConductorApiResult<HolochainTransportStats> {
    // Every DNA hash the app touches (same derivation as in
    // dump_network_metrics_for_app).
    let all_dna_hashes = {
        let state = self.get_state().await?;
        let installed_app = state.get_app(installed_app_id)?;
        installed_app
            .role_assignments
            .values()
            .flat_map(|r| match r {
                AppRoleAssignment::Primary(p) if p.is_provisioned => {
                    let mut hashes = vec![p.base_dna_hash.clone()];
                    hashes.extend(p.clones.values().cloned().collect::<Vec<_>>());
                    hashes
                }
                AppRoleAssignment::Primary(p) => {
                    p.clones.values().cloned().collect::<Vec<_>>()
                }
                AppRoleAssignment::Dependency(d) => vec![d.cell_id.dna_hash().clone()],
            })
            .collect::<Vec<_>>()
    };
    // Peer ids seen in any of the app's DNA peer stores; peers without a
    // URL or peer id are skipped.
    let mut keep_peer_ids = HashSet::new();
    for dna_hash in &all_dna_hashes {
        let peer_store = self.holochain_p2p.peer_store(dna_hash.clone()).await?;
        keep_peer_ids.extend(peer_store.get_all().await?.into_iter().filter_map(|p| {
            p.url
                .as_ref()
                .and_then(|u| u.peer_id())
                .map(|id| id.to_string())
        }));
    }
    let stats = self.holochain_p2p.dump_network_stats().await?;
    Ok(HolochainTransportStats {
        transport_stats: TransportStats {
            backend: stats.transport_stats.backend,
            peer_urls: stats.transport_stats.peer_urls,
            // Keep only connections to the app's peers.
            connections: stats
                .transport_stats
                .connections
                .into_iter()
                .filter(|s| keep_peer_ids.contains(&s.pub_key))
                .collect(),
        },
        blocked_message_counts: stats
            .blocked_message_counts
            .into_iter()
            .filter_map(|(url, space_counts)| {
                if url.peer_id().is_some_and(|id| keep_peer_ids.contains(id)) {
                    // Translate spaces to DNA hashes and drop those not
                    // belonging to this app.
                    let filtered: HashMap<DnaHash, kitsune2_api::MessageBlockCount> =
                        space_counts
                            .into_iter()
                            .filter_map(|(space, count)| {
                                let hash = DnaHash::from_k2_space(&space);
                                if all_dna_hashes.contains(&hash) {
                                    Some((hash, count))
                                } else {
                                    None
                                }
                            })
                            .collect();
                    if !filtered.is_empty() {
                        Some((url, filtered))
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .collect(),
    })
}
/// Decode signed agent infos from their string encoding and insert them
/// into the peer store of their respective DNA space.
///
/// Signatures are checked with the Ed25519 verifier during decoding; any
/// invalid entry fails the whole call before anything is inserted.
pub async fn add_agent_infos(&self, agent_infos: Vec<String>) -> ConductorApiResult<()> {
    // Group decoded infos by space so each peer store is written once.
    let mut parsed_by_space: HashMap<SpaceId, Vec<Arc<AgentInfoSigned>>> = HashMap::new();
    for info in agent_infos {
        let parsed_info = kitsune2_api::AgentInfoSigned::decode(
            &kitsune2_core::Ed25519Verifier,
            info.as_bytes(),
        )?;
        let space_id = parsed_info.space.clone();
        parsed_by_space
            .entry(space_id)
            .or_default()
            .push(parsed_info);
    }
    for (space_id, agent_infos) in parsed_by_space {
        self.holochain_p2p
            .peer_store(DnaHash::from_k2_space(&space_id))
            .await
            .map_err(|err| ConductorApiError::CellError(err.into()))?
            .insert(agent_infos)
            .await?;
    }
    Ok(())
}
/// Hot-swap the coordinator zomes of a cell's DNA.
///
/// Updates the ribosome's DNA file with the new coordinators, persists
/// the new wasm code, then re-registers the ribosome so subsequent zome
/// calls use the replacement.
pub async fn update_coordinators(
    &self,
    cell_id: CellId,
    coordinator_zomes: CoordinatorZomes,
    wasms: Vec<wasm::DnaWasm>,
) -> ConductorResult<()> {
    let mut ribosome =
        self.ribosome_store()
            .share_ref(|d| match d.get_ribosome(&cell_id) {
                Some(dna) => Ok(dna),
                None => Err(ConductorError::CellMissing(cell_id.clone())),
            })?;
    let _old_wasms = ribosome
        .dna_file
        .update_coordinators(coordinator_zomes.clone(), wasms.clone())
        .await?;
    // Persist the new wasm; no new entry defs accompany a coordinator
    // swap, hence the empty Vec.
    self.put_code_and_defs_in_databases(
        cell_id.clone(),
        ribosome.dna_def_hashed().clone(),
        wasms.into_iter(),
        Vec::with_capacity(0),
    )
    .await?;
    // Replace the registered ribosome with the updated one.
    self.ribosome_store()
        .share_mut(|d| d.add_ribosome(cell_id, ribosome));
    Ok(())
}
}
}
/// Small accessors over the conductor's internal stores and services.
mod accessor_impls {
    use super::*;
    use tokio::sync::broadcast;

    impl Conductor {
        /// Shared store of ribosomes.
        pub(crate) fn ribosome_store(&self) -> &RwShare<RibosomeStore> {
            &self.ribosome_store
        }

        /// Clone of the queue-consumer workflow map for all spaces.
        pub(crate) fn get_queue_consumer_workflows(&self) -> QueueConsumerMap {
            self.spaces.queue_consumer_map.clone()
        }

        /// Obtain a signal broadcast sender scoped to the app containing
        /// the given cell. Errors with `CellMissing` when no installed
        /// app contains the cell.
        pub async fn get_signal_tx(
            &self,
            cell_id: &CellId,
        ) -> ConductorResult<broadcast::Sender<Signal>> {
            let app = self
                .find_app_containing_cell(cell_id)
                .await?
                .ok_or_else(|| ConductorError::CellMissing(cell_id.clone()))?;
            Ok(self.app_broadcast.create_send_handle(app.id().clone()))
        }

        /// Ribosome for a specific cell, or `CellMissing`.
        pub(crate) fn get_ribosome(&self, cell_id: &CellId) -> ConductorResult<RealRibosome> {
            self.ribosome_store
                .share_ref(|d| match d.get_ribosome(cell_id) {
                    Some(r) => Ok(r),
                    None => Err(ConductorError::CellMissing(cell_id.to_owned())),
                })
        }

        /// Any ribosome registered for the given DNA hash, regardless of
        /// which cell it belongs to; `DnaMissing` when none exists.
        pub(crate) fn get_any_ribosome_for_dna_hash(
            &self,
            dna_hash: &DnaHash,
        ) -> ConductorResult<RealRibosome> {
            self.ribosome_store
                .share_ref(|d| match d.get_any_ribosome_for_dna_hash(dna_hash) {
                    Some(r) => Ok(r),
                    None => Err(DnaError::DnaMissing(dna_hash.to_owned()).into()),
                })
        }

        /// Get or lazily create the `Space` for a DNA.
        pub(crate) fn get_or_create_space(&self, dna_hash: &DnaHash) -> DatabaseResult<Space> {
            self.spaces.get_or_create_space(dna_hash)
        }

        /// Get or lazily create the authored database for (DNA, author).
        pub(crate) fn get_or_create_authored_db(
            &self,
            dna_hash: &DnaHash,
            author: AgentPubKey,
        ) -> DatabaseResult<DbWrite<DbKindAuthored>> {
            self.spaces.get_or_create_authored_db(dna_hash, author)
        }

        /// The authored database for (DNA, author), if it already exists;
        /// never creates one.
        pub(crate) fn get_authored_db_if_present(
            &self,
            dna_hash: &DnaHash,
            author: &AgentPubKey,
        ) -> DatabaseResult<Option<DbWrite<DbKindAuthored>>> {
            match self.spaces.get_authored_db_if_present(dna_hash, author)? {
                Some(db) => Ok(Some(db.clone())),
                None => Ok(None),
            }
        }

        /// Get or lazily create the DHT database for a DNA.
        pub(crate) fn get_or_create_dht_db(
            &self,
            dna_hash: &DnaHash,
        ) -> DatabaseResult<DbWrite<DbKindDht>> {
            self.spaces.dht_db(dna_hash)
        }

        /// Reserve a send permit on the post-commit channel; awaits until
        /// channel capacity is available.
        pub async fn post_commit_permit(
            &self,
        ) -> Result<tokio::sync::mpsc::OwnedPermit<PostCommitArgs>, SendError<()>> {
            self.post_commit.clone().reserve_owned().await
        }

        /// The conductor's configuration.
        pub fn get_config(&self) -> &ConductorConfig {
            &self.config
        }

        /// Handle to the conductor's task manager.
        pub fn task_manager(&self) -> TaskManagerClient {
            self.task_manager.clone()
        }

        /// Find the installed app (if any) that contains the given cell.
        #[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
        pub async fn find_app_containing_cell(
            &self,
            cell_id: &CellId,
        ) -> ConductorResult<Option<InstalledApp>> {
            Ok(self
                .get_state()
                .await?
                .find_app_containing_cell(cell_id)
                .cloned())
        }
    }
}
/// Issuing, revoking and checking app-interface authentication tokens.
mod authenticate_token_impls {
    use super::*;
    use holochain_conductor_api::{
        AppAuthenticationToken, AppAuthenticationTokenIssued, IssueAppAuthenticationTokenPayload,
    };

    impl Conductor {
        /// Issue a connection-authentication token for an app interface.
        pub fn issue_app_authentication_token(
            &self,
            payload: IssueAppAuthenticationTokenPayload,
        ) -> ConductorResult<AppAuthenticationTokenIssued> {
            let (token, expires_at) = self.app_auth_token_store.share_mut(|app_connection_auth| {
                app_connection_auth.issue_token(
                    payload.installed_app_id,
                    payload.expiry_seconds,
                    payload.single_use,
                )
            });
            Ok(AppAuthenticationTokenIssued {
                token,
                // SystemTime -> Timestamp; a time before the Unix epoch
                // collapses to None.
                expires_at: expires_at
                    .and_then(|i| i.duration_since(std::time::UNIX_EPOCH).ok())
                    .map(|d| Timestamp::saturating_from_dur(&d)),
            })
        }

        /// Invalidate a previously issued app authentication token.
        pub fn revoke_app_authentication_token(
            &self,
            token: AppAuthenticationToken,
        ) -> ConductorResult<()> {
            self.app_auth_token_store
                .share_mut(|app_connection_auth| app_connection_auth.revoke_token(token));
            Ok(())
        }

        /// Validate a token (optionally pinned to a specific app id) and
        /// return the app id it grants access to.
        pub fn authenticate_app_token(
            &self,
            token: Vec<u8>,
            app_id: Option<InstalledAppId>,
        ) -> ConductorResult<InstalledAppId> {
            self.app_auth_token_store.share_mut(|app_connection_auth| {
                app_connection_auth.authenticate_token(token, app_id)
            })
        }
    }
}
#[cfg(feature = "unstable-countersigning")]
mod countersigning_impls {
use super::*;
use crate::core::workflow::{self, countersigning_workflow::CountersigningWorkspace};
impl Conductor {
/// Accept a countersigning preflight request on behalf of `cell_id`.
/// Fetches the cell's countersigning trigger first, then delegates to the
/// countersigning workflow with the cell's space, the conductor keystore
/// and the cell's agent key.
pub(crate) async fn accept_countersigning_session(
&self,
cell_id: CellId,
request: PreflightRequest,
) -> ConductorResult<PreflightRequestAcceptance> {
let countersigning_trigger = self.cell_by_id(&cell_id).await?.countersigning_trigger();
Ok(
workflow::countersigning_workflow::accept_countersigning_request(
self.spaces.get_or_create_space(cell_id.dna_hash())?,
self.keystore.clone(),
cell_id.agent_pubkey().clone(),
request,
countersigning_trigger,
)
.await?,
)
}
/// Get the current countersigning session state for `cell_id`, or `None`
/// when the workspace exists but holds no session.
///
/// # Errors
/// `CountersigningError::WorkspaceDoesNotExist` when no countersigning
/// workspace has been created for this cell.
pub async fn get_countersigning_session_state(
&self,
cell_id: &CellId,
) -> ConductorResult<Option<CountersigningSessionState>> {
let space = self.get_or_create_space(cell_id.dna_hash())?;
// Clone the workspace handle out of the lock so the guard is dropped
// before any further work.
let maybe_countersigning_workspace =
space.countersigning_workspaces.lock().get(cell_id).cloned();
match maybe_countersigning_workspace {
None => Err(ConductorError::CountersigningError(
CountersigningError::WorkspaceDoesNotExist(cell_id.clone()),
)),
Some(workspace) => Ok(workspace.get_countersigning_session_state()),
}
}
/// Force-abandon an unresolved countersigning session for `cell_id`.
/// Only sessions in the `Unknown` state with at least one failed
/// resolution attempt qualify (see `get_workspace_of_unresolved_session`).
/// The session is marked for abandonment first, then the cell's
/// countersigning workflow is triggered to act on the mark.
pub async fn abandon_countersigning_session(
&self,
cell_id: &CellId,
) -> ConductorResult<()> {
let space = self.get_or_create_space(cell_id.dna_hash())?;
let countersigning_workspace = self
.get_workspace_of_unresolved_session(&space, cell_id)
.await?;
let cell = self.cell_by_id(cell_id).await?;
countersigning_workspace.mark_countersigning_session_for_force_abandon(cell_id)?;
cell.countersigning_trigger()
.trigger(&"force_abandon_session");
Ok(())
}
/// Force-publish an unresolved countersigning session for `cell_id`.
/// Mirrors `abandon_countersigning_session`: mark the session for forced
/// publication, then trigger the cell's countersigning workflow.
pub async fn publish_countersigning_session(
&self,
cell_id: &CellId,
) -> ConductorResult<()> {
let space = self.get_or_create_space(cell_id.dna_hash())?;
let countersigning_workspace = self
.get_workspace_of_unresolved_session(&space, cell_id)
.await?;
let cell = self.cell_by_id(cell_id).await?;
countersigning_workspace.mark_countersigning_session_for_force_publish(cell_id)?;
cell.countersigning_trigger()
.trigger(&"force_publish_session");
Ok(())
}
/// Resolve the countersigning workspace for `cell_id`, but only when it
/// holds a session in the `Unknown` state whose automatic resolution has
/// already been attempted at least once. All other situations map to a
/// specific `CountersigningError`:
/// - no workspace -> `WorkspaceDoesNotExist`
/// - workspace without a session -> `SessionNotFound`
/// - session not in `Unknown` state, or no resolution attempt yet
///   -> `SessionNotUnresolved`
async fn get_workspace_of_unresolved_session(
&self,
space: &Space,
cell_id: &CellId,
) -> ConductorResult<Arc<CountersigningWorkspace>> {
let maybe_countersigning_workspace =
space.countersigning_workspaces.lock().get(cell_id).cloned();
match maybe_countersigning_workspace {
None => Err(ConductorError::CountersigningError(
CountersigningError::WorkspaceDoesNotExist(cell_id.clone()),
)),
Some(countersigning_workspace) => {
match countersigning_workspace.get_countersigning_session_state() {
None => Err(ConductorError::CountersigningError(
CountersigningError::SessionNotFound(cell_id.clone()),
)),
Some(CountersigningSessionState::Unknown { resolution, .. }) => {
if resolution.attempts >= 1 {
Ok(countersigning_workspace)
} else {
Err(ConductorError::CountersigningError(
CountersigningError::SessionNotUnresolved(cell_id.clone()),
))
}
}
_ => Err(ConductorError::CountersigningError(
CountersigningError::SessionNotUnresolved(cell_id.clone()),
)),
}
}
}
}
}
}
impl Conductor {
/// Record a newly bound admin websocket port.
fn add_admin_port(&self, port: u16) {
self.admin_websocket_ports.share_mut(|p| p.push(port));
}
/// Remove a database's primary file from disk, falling back to purging its
/// rows when the file cannot be removed (e.g. still held open). Afterwards
/// the `-shm`/`-wal` support files are removed best-effort.
async fn delete_or_purge_database<Kind: DbKindT + Send + Sync + 'static>(
&self,
db: DbWrite<Kind>,
) -> ConductorResult<()> {
let mut path = db.path().clone();
if let Err(err) = ffs::remove_file(&path).await {
tracing::warn!(?err, "Could not remove primary DB file, probably because it is still in use. Purging all data instead.");
db.write_async(|txn| purge_data(txn)).await?;
} else {
tracing::info!("Deleted primary DB file {}", path.display());
}
// Build the sidecar file names as `<stem>-shm` / `<stem>-wal`.
// NOTE(review): `set_extension("")` drops any file extension first, while
// SQLite names sidecars by appending to the full filename — this is only
// equivalent if the primary DB file has no extension; confirm.
path.set_extension("");
let stem = path.to_string_lossy();
for ext in ["shm", "wal"] {
let path = PathBuf::from(format!("{stem}-{ext}"));
if let Err(err) = ffs::remove_file(&path).await {
let err = err.remove_backtrace();
tracing::warn!(?err, "Failed to remove DB support file");
} else {
tracing::info!("Deleted file {}", path.display());
}
}
Ok(())
}
/// Delete (or purge) the databases backing the given cells of an app:
/// always the per-agent authored DBs, and additionally the shared DHT and
/// cache DBs of any DNA no longer used by any *other* installed app.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
async fn delete_cell_databases(
&self,
app_id: &InstalledAppId,
cell_ids: Vec<CellId>,
) -> ConductorResult<()> {
// Authored DBs are per (dna, agent), so they can always go.
for cell_id in cell_ids.clone() {
let authored_db = self
.spaces
.get_or_create_authored_db(cell_id.dna_hash(), cell_id.agent_pubkey().clone())?;
self.delete_or_purge_database(authored_db).await?;
}
// DNAs still referenced by any app other than the one being removed.
let remaining_dnas = self
.get_state()
.await?
.installed_apps()
.iter()
.filter(|(id, _)| *id != app_id)
.flat_map(|(_, app)| app.all_cells().map(|cell_id| cell_id.dna_hash().clone()))
.collect::<Vec<_>>();
let dnas_to_purge = cell_ids
.iter()
.map(|cell_id| cell_id.dna_hash())
.filter(|dna| !remaining_dnas.contains(dna))
.collect::<Vec<_>>();
if !dnas_to_purge.is_empty() {
tracing::info!(?dnas_to_purge, "Purging DNAs");
}
for dna_hash in dnas_to_purge {
let dht_db = self.spaces.dht_db(dna_hash)?;
let cache_db = self.spaces.cache(dna_hash)?;
// Purge rows first (concurrently for both DBs), then try to delete
// the files themselves.
futures::future::join_all(
[
dht_db.write_async(|txn| purge_data(txn)).boxed(),
cache_db.write_async(|txn| purge_data(txn)).boxed(),
]
.into_iter(),
)
.await
.into_iter()
.collect::<Result<Vec<()>, _>>()?;
self.delete_or_purge_database(dht_db).await?;
self.delete_or_purge_database(cache_db).await?;
}
Ok(())
}
/// Remove an app from the persisted conductor state, returning the removed
/// app record.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
async fn remove_app_from_db(&self, app_id: &InstalledAppId) -> ConductorResult<InstalledApp> {
let (_state, app) = self
.update_state_prime({
let app_id = app_id.clone();
move |mut state| {
let app = state.remove_app(&app_id)?;
Ok((state, app))
}
})
.await?;
Ok(app)
}
/// Create a clone cell for `role_name` of `app_id` under `agent_key`, with
/// the given DNA modifiers (and optional name), register it in state and
/// in the ribosome store, and return the resulting [`ClonedCell`].
///
/// This runs in three phases:
/// 1. a state update that only validates the clone limit and reads the
///    base DNA hash (state is returned unchanged),
/// 2. building the clone DNA file from the ribosome store,
/// 3. a second state update that rejects duplicate cell ids and records
///    the clone on the app.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
async fn add_clone_cell_to_app(
&self,
app_id: InstalledAppId,
agent_key: AgentPubKey,
role_name: RoleName,
dna_modifiers: DnaModifiersOpt,
name: Option<String>,
) -> ConductorResult<ClonedCell> {
let ribosome_store = &self.ribosome_store;
// Phase 1: enforce the role's clone limit and fetch its base DNA hash.
let (_, base_cell_dna_hash) = self
.update_state_prime({
let app_id = app_id.clone();
let role_name = role_name.clone();
move |mut state| {
let app = state.get_app_mut(&app_id)?;
let app_role = app.primary_role(&role_name)?;
if app_role.is_clone_limit_reached() {
return Err(ConductorError::AppError(AppError::CloneLimitExceeded(
app_role.clone_limit(),
Box::new(app_role.clone()),
)));
}
let original_dna_hash = app_role.dna_hash().clone();
Ok((state, original_dna_hash))
}
})
.await?;
let original_dna_hash = base_cell_dna_hash.clone();
// Phase 2: derive the clone's DNA file by applying the modifiers (and
// optional name) to the base cell's DNA.
let clone_dna = ribosome_store.share_ref(|rs| {
let base_cell_id = CellId::new(base_cell_dna_hash, agent_key.clone());
let mut dna_file = rs
.get_dna_file(&base_cell_id)
.ok_or(ConductorError::CellMissing(base_cell_id))?
.update_modifiers(dna_modifiers);
if let Some(name) = name {
dna_file = dna_file.set_name(name);
}
Ok::<_, ConductorError>(dna_file)
})?;
let name = clone_dna.dna_def().name.clone();
let dna_modifiers = clone_dna.dna_def().modifiers.clone();
let clone_dna_hash = clone_dna.dna_hash().to_owned();
let clone_cell_id = CellId::new(clone_dna_hash, agent_key);
let clone_cell_id_move = clone_cell_id.clone();
// Phase 3: record the clone on the app, rejecting a cell id that any
// installed app already uses.
let (_, installed_clone_cell) = self
.update_state_prime(move |mut state| {
let state_copy = state.clone();
let app = state.get_app_mut(&app_id)?;
if state_copy
.installed_apps()
.iter()
.flat_map(|(_, app)| app.all_cells())
.any(|cell_id| cell_id == clone_cell_id_move)
{
return Err(ConductorError::AppError(AppError::DuplicateCellId(
clone_cell_id_move,
)));
}
let clone_id = app.add_clone(&role_name, clone_cell_id_move.dna_hash())?;
let installed_clone_cell = ClonedCell {
cell_id: clone_cell_id_move,
clone_id,
original_dna_hash,
dna_modifiers,
name,
enabled: true,
};
Ok((state, installed_clone_cell))
})
.await?;
// Make the clone's DNA available to the rest of the conductor.
self.register_dna_file(clone_cell_id, clone_dna).await?;
Ok(installed_clone_cell)
}
/// Print the bound admin ports to stdout in the machine-readable
/// `###ADMIN_PORT:…###` format, wrapped in `###HOLOCHAIN_SETUP###` markers.
fn print_setup(&self) {
use std::fmt::Write;
let mut out = String::new();
self.admin_websocket_ports
.share_ref(|admin_websocket_ports| {
for port in admin_websocket_ports {
writeln!(&mut out, "###ADMIN_PORT:{port}###")
.expect("Can't write setup to std out");
}
});
println!("\n###HOLOCHAIN_SETUP###\n{out}###HOLOCHAIN_SETUP_END###");
}
}
#[cfg(any(test, feature = "test_utils"))]
#[allow(missing_docs)]
mod test_utils_impls {
    use super::*;
    use tokio::sync::broadcast;

    impl Conductor {
        /// Read the current conductor state (test helper).
        pub async fn get_state_from_handle(&self) -> ConductorResult<ConductorState> {
            self.get_state().await
        }

        /// Subscribe to the signal broadcast channel of one installed app.
        pub fn subscribe_to_app_signals(
            &self,
            installed_app_id: InstalledAppId,
        ) -> broadcast::Receiver<Signal> {
            self.app_broadcast.subscribe(installed_app_id)
        }

        /// Fetch (or lazily create) the DHT database for a DNA hash.
        pub fn get_dht_db(&self, dna_hash: &DnaHash) -> ConductorApiResult<DbWrite<DbKindDht>> {
            let db = self.get_or_create_dht_db(dna_hash)?;
            Ok(db)
        }

        /// Fetch the cache database of the cell with the given id.
        pub async fn get_cache_db(
            &self,
            cell_id: &CellId,
        ) -> ConductorApiResult<DbWrite<DbKindCache>> {
            let cache = self.cell_by_id(cell_id).await?.cache().clone();
            Ok(cache)
        }

        /// Clone out the conductor's `Spaces` handle.
        pub fn get_spaces(&self) -> Spaces {
            self.spaces.clone()
        }

        /// Fetch the queue triggers of the cell with the given id.
        pub async fn get_cell_triggers(
            &self,
            cell_id: &CellId,
        ) -> ConductorApiResult<QueueTriggers> {
            let triggers = self.cell_by_id(cell_id).await?.triggers().clone();
            Ok(triggers)
        }
    }
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
/// Delete every row from all content tables of a conductor database.
fn purge_data(txn: &mut Transaction) -> DatabaseResult<()> {
    // Same statements as before, executed in the same order.
    const PURGE_STATEMENTS: [&str; 6] = [
        "DELETE FROM DhtOp",
        "DELETE FROM Action",
        "DELETE FROM Entry",
        "DELETE FROM ValidationReceipt",
        "DELETE FROM ChainLock",
        "DELETE FROM ScheduledFunctions",
    ];
    for sql in PURGE_STATEMENTS {
        txn.execute(sql, ())?;
    }
    Ok(())
}
/// Run genesis for each of the given cells concurrently (one spawned task per
/// cell), using the provided membrane proofs. Succeeds only if every cell's
/// genesis succeeds; otherwise all per-cell errors are collected into
/// `ConductorError::GenesisFailed`.
pub(crate) async fn genesis_cells(
conductor: ConductorHandle,
cell_ids_with_proofs: Vec<(CellId, Option<MembraneProof>)>,
) -> ConductorResult<()> {
let cells_tasks = cell_ids_with_proofs.into_iter().map(|(cell_id, proof)| {
let conductor = conductor.clone();
let cell_id_inner = cell_id.clone();
tokio::spawn(async move {
let space = conductor
.get_or_create_space(cell_id_inner.dna_hash())
.map_err(|e| CellError::FailedToCreateDnaSpace(ConductorError::from(e).into()))?;
let authored_db =
space.get_or_create_authored_db(cell_id_inner.agent_pubkey().clone())?;
let dht_db = space.dht_db;
let ribosome = conductor.get_ribosome(&cell_id_inner).map_err(Box::new)?;
Cell::genesis(
cell_id_inner.clone(),
conductor,
authored_db,
dht_db,
ribosome,
proof,
)
.await
})
// Convert a task join failure into a CellError, then flatten it with
// the task's own Result so each entry is (cell_id, Result<(), CellError>).
.map_err(CellError::from)
.map(|genesis_result| (cell_id, genesis_result.and_then(|r| r)))
});
// Split outcomes into successes and (cell_id, error) pairs.
let (_success, errors): (Vec<CellId>, Vec<(CellId, CellError)>) =
futures::future::join_all(cells_tasks)
.await
.into_iter()
.partition_map(|(cell_id, r)| match r {
Ok(()) => either::Either::Left(cell_id),
Err(err) => either::Either::Right((cell_id, err)),
});
if !errors.is_empty() {
Err(ConductorError::GenesisFailed { errors })
} else {
Ok(())
}
}
/// Build a generated [`AppManifest`] containing one role per provided DNA,
/// with the given clone limit, deferred-memproof setting and optional
/// network seed applied to every role.
pub fn app_manifest_from_dnas(
    dnas_with_roles: &[impl DnaWithRole],
    clone_limit: u32,
    memproofs_deferred: bool,
    network_seed: Option<String>,
) -> AppManifest {
    let mut roles = Vec::with_capacity(dnas_with_roles.len());
    for dna_with_role in dnas_with_roles {
        let dna = dna_with_role.dna();
        // Only the network seed is overridden; all other modifiers stay unset.
        let mut modifiers = DnaModifiersOpt::none();
        modifiers.network_seed.clone_from(&network_seed);
        let dna_manifest = AppRoleDnaManifest {
            path: Some(format!("{}", dna.dna_hash())),
            modifiers,
            installed_hash: Some(dna_with_role.dna().dna_hash().clone().into()),
            clone_limit,
        };
        roles.push(AppRoleManifest {
            name: dna_with_role.role(),
            dna: dna_manifest,
            provisioning: Some(CellProvisioning::Create { deferred: false }),
        });
    }
    AppManifestCurrentBuilder::default()
        .name("[generated]".into())
        .description(None)
        .roles(roles)
        .allow_deferred_memproofs(memproofs_deferred)
        .build()
        .unwrap()
        .into()
}
/// Count DhtOps in a DHT database by integration stage:
/// - `integrated`: ops with `when_integrated` set,
/// - `integration_limbo`: validated (stage 3) but not yet integrated,
/// - `validation_limbo`: not yet fully validated (stage NULL or < 3).
pub async fn integration_dump<Db: ReadAccess<DbKindDht>>(
vault: &Db,
) -> ConductorApiResult<IntegrationStateDump> {
vault
.read_async(move |txn| {
let integrated = txn.query_row(
"SELECT count(hash) FROM DhtOp WHERE when_integrated IS NOT NULL",
[],
|row| row.get(0),
)?;
let integration_limbo = txn.query_row(
"SELECT count(hash) FROM DhtOp WHERE when_integrated IS NULL AND validation_stage = 3",
[],
|row| row.get(0),
)?;
let validation_limbo = txn.query_row(
"
SELECT count(hash) FROM DhtOp
WHERE when_integrated IS NULL
AND
(validation_stage IS NULL OR validation_stage < 3)
",
[],
|row| row.get(0),
)?;
ConductorApiResult::Ok(IntegrationStateDump {
validation_limbo,
integration_limbo,
integrated,
})
})
.await
}
/// Dump the full DhtOp contents of a DHT database, grouped by integration
/// stage, optionally restricted to ops after `dht_ops_cursor`. Also returns
/// the latest rowid as the next cursor; that lookup is best-effort and falls
/// back to 0 on error (e.g. an empty table).
pub async fn full_integration_dump(
vault: &DbRead<DbKindDht>,
dht_ops_cursor: Option<u64>,
) -> ConductorApiResult<FullIntegrationStateDump> {
vault
.read_async(move |txn| {
let integrated =
query_dht_ops_from_statement(txn, state_dump::DHT_OPS_INTEGRATED, dht_ops_cursor)?;
let validation_limbo = query_dht_ops_from_statement(
txn,
state_dump::DHT_OPS_IN_VALIDATION_LIMBO,
dht_ops_cursor,
)?;
let integration_limbo = query_dht_ops_from_statement(
txn,
state_dump::DHT_OPS_IN_INTEGRATION_LIMBO,
dht_ops_cursor,
)?;
// Best-effort: a failed cursor lookup yields 0 rather than an error.
let dht_ops_cursor = txn
.query_row(state_dump::DHT_OPS_ROW_ID, [], |row| row.get(0))
.unwrap_or(0);
ConductorApiResult::Ok(FullIntegrationStateDump {
validation_limbo,
integration_limbo,
integrated,
dht_ops_cursor,
})
})
.await
}
/// Run one of the state-dump DhtOp queries, optionally restricting results to
/// rows after `dht_ops_cursor`.
fn query_dht_ops_from_statement(
    txn: &Transaction,
    stmt_str: &str,
    dht_ops_cursor: Option<u64>,
) -> ConductorApiResult<Vec<DhtOp>> {
    // Append a rowid filter when a cursor was supplied. The cursor is a plain
    // integer, so this string interpolation cannot inject SQL.
    let sql = match dht_ops_cursor {
        Some(cursor) => format!("{stmt_str} AND DhtOp.rowid > {cursor}"),
        None => stmt_str.to_string(),
    };
    let mut statement = txn.prepare(sql.as_str())?;
    let ops = statement
        .query_and_then([], |row| {
            holochain_state::query::map_sql_dht_op(false, "dht_type", row)
        })?
        .collect::<StateQueryResult<Vec<_>>>()?;
    Ok(ops)
}
/// Collect the per-role DNA modifiers out of optional role settings; roles
/// without explicit modifiers (and `UseExisting` roles) are omitted.
fn get_modifiers_map_from_role_settings(roles_settings: &Option<RoleSettingsMap>) -> ModifiersMap {
    roles_settings
        .as_ref()
        .map(|settings| {
            settings
                .iter()
                .filter_map(|(role_name, role_settings)| match role_settings {
                    RoleSettings::Provisioned { modifiers, .. } => {
                        modifiers.as_ref().map(|m| (role_name.clone(), m.clone()))
                    }
                    // `UseExisting` roles carry no modifiers.
                    _ => None,
                })
                .collect()
        })
        .unwrap_or_default()
}
/// Collect the per-role membrane proofs out of optional role settings; roles
/// without a membrane proof (and `UseExisting` roles) are omitted.
fn get_memproof_map_from_role_settings(role_settings: &Option<RoleSettingsMap>) -> MemproofMap {
    role_settings
        .as_ref()
        .map(|settings| {
            settings
                .iter()
                .filter_map(|(role_name, role_settings)| match role_settings {
                    RoleSettings::Provisioned { membrane_proof, .. } => membrane_proof
                        .as_ref()
                        .map(|m| (role_name.clone(), m.clone())),
                    // `UseExisting` roles carry no membrane proof.
                    _ => None,
                })
                .collect()
        })
        .unwrap_or_default()
}
/// Collect the roles that reference an already-existing cell out of optional
/// role settings, mapping role name to the existing cell id.
fn get_existing_cells_map_from_role_settings(
    roles_settings: &Option<RoleSettingsMap>,
) -> ExistingCellsMap {
    let mut existing = HashMap::new();
    if let Some(settings) = roles_settings {
        for (role_name, role_settings) in settings {
            // `UseExisting` is deprecated but still honored here.
            #[allow(deprecated)]
            if let RoleSettings::UseExisting { cell_id } = role_settings {
                existing.insert(role_name.clone(), cell_id.clone());
            }
        }
    }
    existing
}