use super::{conductor::RwShare, error::ConductorResult};
use crate::conductor::{error::ConductorError, state::ConductorState};
use crate::core::workflow::countersigning_workflow::CountersigningWorkspace;
use crate::core::{
queue_consumer::QueueConsumerMap,
workflow::{
incoming_dht_ops_workflow::{
incoming_dht_ops_workflow, IncomingOpHashes, IncomingOpsBatch,
},
witnessing_workflow::{receive_incoming_countersigning_ops, WitnessingWorkspace},
},
};
use holo_hash::{AgentPubKey, DhtOpHash, DnaHash};
use holochain_conductor_api::conductor::paths::DatabasesRootPath;
use holochain_conductor_api::conductor::ConductorConfig;
use holochain_keystore::MetaLairClient;
use holochain_p2p::actor::DynHcP2p;
use holochain_sqlite::prelude::{
    DatabaseResult, DbKey, DbKindAuthored, DbKindCache, DbKindConductor, DbKindDht,
    DbKindPeerMetaStore, DbRead, DbSyncLevel, DbSyncStrategy, DbWrite, PoolConfig, ReadAccess,
};
use holochain_state::{host_fn_workspace::SourceChainWorkspace, mutations, prelude::*};
use holochain_util::timed;
use lair_keystore_api::prelude::SharedLockedArray;
use rusqlite::OptionalExtension;
use std::convert::TryInto;
use std::path::PathBuf;
use std::{
cell::Cell,
collections::{hash_map, HashMap},
sync::Arc,
};
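
/// The set of all [`Space`]s in the conductor, keyed by `DnaHash`, together
/// with the conductor-wide database, config, and shared definition stores.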
#[derive(Clone)]
pub struct Spaces {
map: RwShare<HashMap<DnaHash, Space>>,
pub(crate) db_dir: Arc<DatabasesRootPath>,
pub(crate) config: Arc<ConductorConfig>,
pub(crate) queue_consumer_map: QueueConsumerMap,
pub(crate) conductor_db: DbWrite<DbKindConductor>,
pub(crate) wasm_store: holochain_state::wasm::WasmStore,
pub(crate) dna_def_store: holochain_state::dna_def::DnaDefStore,
pub(crate) entry_def_store: holochain_state::entry_def::EntryDefStore,
db_key: DbKey,
}
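
/// The databases and workspaces shared by all cells running the same DNA,
/// i.e. belonging to the same `DnaHash`.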
#[derive(Clone)]
pub struct Space {
pub dna_hash: Arc<DnaHash>,
pub cache_db: DbWrite<DbKindCache>,
pub conductor_db: DbWrite<DbKindConductor>,
pub authored_dbs: Arc<parking_lot::Mutex<HashMap<AgentPubKey, DbWrite<DbKindAuthored>>>>,
pub dht_db: DbWrite<DbKindDht>,
pub peer_meta_store_db: DbWrite<DbKindPeerMetaStore>,
pub countersigning_workspaces:
Arc<parking_lot::Mutex<HashMap<CellId, Arc<CountersigningWorkspace>>>>,
pub witnessing_workspace: WitnessingWorkspace,
pub incoming_op_hashes: IncomingOpHashes,
pub incoming_ops_batch: IncomingOpsBatch,
root_db_dir: Arc<PathBuf>,
db_key: DbKey,
db_max_readers: u16,
}
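
/// Test helper bundling a [`Spaces`] with the per-DNA [`TestSpace`]s that
/// were used to populate it.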
#[cfg(test)]
pub struct TestSpaces {
pub spaces: Spaces,
pub test_spaces: HashMap<DnaHash, TestSpace>,
pub queue_consumer_map: QueueConsumerMap,
}
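
/// Test helper bundling a [`Space`] with the temp directory backing its
/// databases, kept alive for the duration of the test.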
#[cfg(test)]
pub struct TestSpace {
pub space: Space,
_temp_dir: tempfile::TempDir,
}
thread_local!(static DANGER_PRINT_DB_SECRETS: Cell<bool> = const { Cell::new(false) });
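
/// Make the next call to [`Spaces::new`] on this thread print the unlocked
/// database key to stderr. The flag is cleared once it has been read.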
pub fn set_danger_print_db_secrets(v: bool) {
DANGER_PRINT_DB_SECRETS.set(v);
}
impl Spaces {
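    /// Create a new set of [`Space`]s, loading the database key from the
    /// `db.key` file under the data root path (generating and persisting one
    /// on first run) and opening the conductor-wide database and stores.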
pub async fn new(
config: Arc<ConductorConfig>,
passphrase: SharedLockedArray,
) -> ConductorResult<Self> {
let danger_print_db_secrets = DANGER_PRINT_DB_SECRETS.get();
DANGER_PRINT_DB_SECRETS.set(false);
let root_db_dir: DatabasesRootPath = config
.data_root_path
.clone()
.ok_or(ConductorError::NoDataRootPath)?
.try_into()?;
let db_key_path = root_db_dir.join("db.key");
let db_key = match tokio::fs::read_to_string(db_key_path.clone()).await {
Ok(locked) => DbKey::load(locked, passphrase.clone()).await?,
Err(_) => {
let db_key = DbKey::generate(passphrase.clone()).await?;
tokio::fs::write(db_key_path, db_key.locked.clone()).await?;
db_key
}
};
if danger_print_db_secrets {
eprintln!(
"--beg-db-secrets--{}--end-db-secrets--",
&String::from_utf8_lossy(&*db_key.unlocked.lock().unwrap().lock())
);
}
let db_sync_strategy = config.db_sync_strategy;
let db_sync_level = match db_sync_strategy {
DbSyncStrategy::Fast => DbSyncLevel::Off,
DbSyncStrategy::Resilient => DbSyncLevel::Normal,
};
let conductor_db = tokio::task::block_in_place(|| {
let conductor_db = DbWrite::open_with_pool_config(
root_db_dir.as_ref(),
DbKindConductor,
PoolConfig {
synchronous_level: db_sync_level,
key: db_key.clone(),
max_readers: config.db_max_readers,
},
)?;
ConductorResult::Ok(conductor_db)
})?;
let db_sync = match db_sync_level {
DbSyncLevel::Off => holochain_data::DbSyncLevel::Off,
DbSyncLevel::Normal => holochain_data::DbSyncLevel::Normal,
DbSyncLevel::Full => holochain_data::DbSyncLevel::Full,
};
let data_db_key = holochain_data::DbKey::load(db_key.locked.clone(), passphrase.clone())
.await
.map_err(ConductorError::other)?;
let wasm_db = holochain_data::open_db(
root_db_dir.as_ref(),
holochain_data::kind::Wasm,
holochain_data::HolochainDataConfig {
key: Some(data_db_key),
sync_level: db_sync,
max_readers: config.db_max_readers,
},
)
.await
.map_err(|e| std::io::Error::other(e.to_string()))?;
let wasm_store = holochain_state::wasm::WasmStore::new(wasm_db.clone());
let dna_def_store = holochain_state::dna_def::DnaDefStore::new(wasm_db.clone());
let entry_def_store = holochain_state::entry_def::EntryDefStore::new(wasm_db);
Ok(Spaces {
map: RwShare::new(HashMap::new()),
db_dir: Arc::new(root_db_dir),
config,
queue_consumer_map: QueueConsumerMap::new(),
conductor_db,
wasm_store,
dna_def_store,
entry_def_store,
db_key,
})
}
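
    /// Remove a block on the given target.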
pub async fn unblock(&self, input: Block) -> DatabaseResult<()> {
holochain_state::block::unblock(&self.conductor_db, input).await
}
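
    /// Check whether the given target is blocked at `timestamp`, either
    /// directly or via every cell id the target implies. Only `Cell` targets
    /// imply cell ids; `Ip` targets always report unblocked here.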
pub async fn is_blocked(
&self,
target_id: BlockTargetId,
timestamp: Timestamp,
_holochain_p2p: DynHcP2p,
) -> ConductorResult<bool> {
let cell_ids = match &target_id {
BlockTargetId::Cell(cell_id) => vec![cell_id.to_owned()],
BlockTargetId::Ip(_) => {
vec![]
}
};
if cell_ids.is_empty() {
return Ok(false);
}
self.conductor_db
.read_async(move |txn| {
Ok(
holochain_state::block::query_is_blocked(txn, target_id, timestamp)?
|| {
let mut all_blocked_cell_ids = true;
for cell_id in cell_ids {
                                if !holochain_state::block::query_is_blocked(
                                    txn,
                                    BlockTargetId::Cell(cell_id),
                                    timestamp,
                                )? {
all_blocked_cell_ids = false;
break;
}
}
all_blocked_cell_ids
},
)
})
.await
}
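
    /// Get the conductor state from the database, persisting and returning
    /// the default state if none has been stored yet.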
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn get_state(&self) -> ConductorResult<ConductorState> {
timed!([1, 10, 1000], "get_state", {
match query_conductor_state(&self.conductor_db).await? {
Some(state) => Ok(state),
None => self.update_state(Ok).await,
}
})
}
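
    /// Update the conductor state with a pure function mapping the old state
    /// to the new state, persisting the result.
    ///
    /// A minimal usage sketch (hypothetical `spaces` in scope):
    ///
    /// ```ignore
    /// let state = spaces.update_state(|state| Ok(state)).await?;
    /// ```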
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn update_state<F>(&self, f: F) -> ConductorResult<ConductorState>
where
F: Send + FnOnce(ConductorState) -> ConductorResult<ConductorState> + 'static,
{
let (state, _) = self.update_state_prime(|s| Ok((f(s)?, ()))).await?;
Ok(state)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn update_state_prime<F, O>(&self, f: F) -> ConductorResult<(ConductorState, O)>
where
F: FnOnce(ConductorState) -> ConductorResult<(ConductorState, O)> + Send + 'static,
O: Send + 'static,
{
timed!([1, 10, 1000], "update_state_prime", {
self.conductor_db
.write_async(move |txn| {
let state = txn
.query_row("SELECT blob FROM ConductorState WHERE id = 1", [], |row| {
row.get("blob")
})
.optional()?;
let state = match state {
Some(state) => from_blob(state)?,
None => ConductorState::default(),
};
let (new_state, output) = f(state)?;
mutations::insert_conductor_state(txn, (&new_state).try_into()?)?;
Result::<_, ConductorError>::Ok((new_state, output))
})
.await
})
}
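
    /// Map a function over every currently instantiated [`Space`], collecting
    /// the results.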
pub fn get_from_spaces<R, F: FnMut(&Space) -> R>(&self, f: F) -> Vec<R> {
self.map
.share_ref(|spaces| spaces.values().map(f).collect())
}
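
    /// Get the [`Space`] for this `DnaHash`, creating it (and opening its
    /// databases) if it does not exist yet.
    ///
    /// A minimal usage sketch (hypothetical `spaces` and `dna_hash` in scope):
    ///
    /// ```ignore
    /// let space = spaces.get_or_create_space(&dna_hash)?;
    /// let dht_db = space.dht_db.clone();
    /// ```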
pub fn get_or_create_space(&self, dna_hash: &DnaHash) -> DatabaseResult<Space> {
self.get_or_create_space_ref(dna_hash, |s| s.clone())
}
fn get_or_create_space_ref<F, R>(&self, dna_hash: &DnaHash, f: F) -> DatabaseResult<R>
where
F: Fn(&Space) -> R,
{
match self.map.share_ref(|spaces| spaces.get(dna_hash).map(&f)) {
Some(r) => Ok(r),
None => self
.map
.share_mut(|spaces| match spaces.entry(dna_hash.clone()) {
hash_map::Entry::Occupied(entry) => Ok(f(entry.get())),
hash_map::Entry::Vacant(entry) => {
let space = Space::new(
Arc::new(dna_hash.clone()),
self.db_dir.to_path_buf(),
self.config.db_sync_strategy,
self.db_key.clone(),
self.config.db_max_readers,
)?;
let r = f(&space);
entry.insert(space);
Ok(r)
}
}),
}
}
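
    /// Get the cache database for this `DnaHash`, creating the space if needed.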
pub fn cache(&self, dna_hash: &DnaHash) -> DatabaseResult<DbWrite<DbKindCache>> {
self.get_or_create_space_ref(dna_hash, |space| space.cache_db.clone())
}
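
    /// Get or create the authored database for the given author in the space
    /// for this `DnaHash`.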
pub fn get_or_create_authored_db(
&self,
dna_hash: &DnaHash,
author: AgentPubKey,
) -> DatabaseResult<DbWrite<DbKindAuthored>> {
self.get_or_create_space_ref(dna_hash, |space| {
space.get_or_create_authored_db(author.clone())
})?
}
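
    /// Get all authored databases currently open in the space for this `DnaHash`.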
pub fn get_all_authored_dbs(
&self,
dna_hash: &DnaHash,
) -> DatabaseResult<Vec<DbWrite<DbKindAuthored>>> {
self.get_or_create_space_ref(dna_hash, |space| space.get_all_authored_dbs())
}
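
    /// Get the authored database for the given author only if both the space
    /// and the database already exist; this never creates either.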
pub fn get_authored_db_if_present(
&self,
dna_hash: &DnaHash,
author: &AgentPubKey,
) -> DatabaseResult<Option<DbWrite<DbKindAuthored>>> {
match self.map.share_ref(|spaces| spaces.get(dna_hash).cloned()) {
Some(space) => space.get_authored_db_if_present(author),
None => Ok(None),
}
}
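
    /// Get the DHT database for this `DnaHash`, creating the space if needed.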
pub fn dht_db(&self, dna_hash: &DnaHash) -> DatabaseResult<DbWrite<DbKindDht>> {
self.get_or_create_space_ref(dna_hash, |space| space.dht_db.clone())
}
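
    /// Get the peer meta store database for this `DnaHash`, creating the space
    /// if needed.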
pub fn peer_meta_store_db(
&self,
dna_hash: &DnaHash,
) -> DatabaseResult<DbWrite<DbKindPeerMetaStore>> {
self.get_or_create_space_ref(dna_hash, |space| space.peer_meta_store_db.clone())
}
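
    /// Handle ops published to us by feeding them into the incoming DHT ops
    /// workflow. Ops are dropped with a warning if the sys validation trigger
    /// for this space has not been registered yet.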
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ops)))]
pub async fn handle_publish(&self, dna_hash: &DnaHash, ops: Vec<DhtOp>) -> ConductorResult<()> {
let space = self.get_or_create_space(dna_hash)?;
let trigger = match self
.queue_consumer_map
.sys_validation_trigger(space.dna_hash.clone())
{
Some(t) => t,
None => {
tracing::warn!("No sys validation trigger yet for space: {}", dna_hash);
return Ok(());
}
};
incoming_dht_ops_workflow(space, trigger, ops).await?;
Ok(())
}
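
    /// Handle a countersigning op published to us as a witness by hashing it
    /// and passing it to the witnessing workflow. The op is dropped with a
    /// warning if the witnessing trigger has not been registered yet.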
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ops)))]
pub async fn handle_publish_countersign(
&self,
dna_hash: &DnaHash,
op: ChainOp,
) -> ConductorResult<()> {
let hash = DhtOpHash::with_data_sync(&op);
let (workspace, trigger) = self.get_or_create_space_ref(dna_hash, |space| {
(
space.witnessing_workspace.clone(),
self.queue_consumer_map
.witnessing_trigger(space.dna_hash.clone()),
)
})?;
let trigger = match trigger {
Some(t) => t,
None => {
tracing::warn!("No witnessing trigger yet for space: {}", dna_hash);
return Ok(());
}
};
receive_incoming_countersigning_ops(vec![(hash, op)], &workspace, trigger)?;
Ok(())
}
}
impl Space {
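    /// Open all the databases for a new space under `root_db_dir`, sharing one
    /// database key and pool configuration across them.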
fn new(
dna_hash: Arc<DnaHash>,
root_db_dir: PathBuf,
db_sync_strategy: DbSyncStrategy,
db_key: DbKey,
db_max_readers: u16,
) -> DatabaseResult<Self> {
let db_sync_level = match db_sync_strategy {
DbSyncStrategy::Fast => DbSyncLevel::Off,
DbSyncStrategy::Resilient => DbSyncLevel::Normal,
};
let (cache, dht_db, peer_meta_store_db, conductor_db) =
tokio::task::block_in_place(|| {
let cache = DbWrite::open_with_pool_config(
root_db_dir.as_ref(),
DbKindCache(dna_hash.clone()),
PoolConfig {
synchronous_level: db_sync_level,
key: db_key.clone(),
max_readers: db_max_readers,
},
)?;
let dht_db = DbWrite::open_with_pool_config(
root_db_dir.as_ref(),
DbKindDht(dna_hash.clone()),
PoolConfig {
synchronous_level: db_sync_level,
key: db_key.clone(),
max_readers: db_max_readers,
},
)?;
let peer_meta_store_db = DbWrite::open_with_pool_config(
root_db_dir.as_ref(),
DbKindPeerMetaStore(dna_hash.clone()),
PoolConfig {
synchronous_level: db_sync_level,
key: db_key.clone(),
max_readers: db_max_readers,
},
)?;
let conductor_db: DbWrite<DbKindConductor> = DbWrite::open_with_pool_config(
root_db_dir.as_ref(),
DbKindConductor,
PoolConfig {
synchronous_level: db_sync_level,
key: db_key.clone(),
max_readers: db_max_readers,
},
)?;
DatabaseResult::Ok((cache, dht_db, peer_meta_store_db, conductor_db))
})?;
let witnessing_workspace = WitnessingWorkspace::default();
let incoming_op_hashes = IncomingOpHashes::default();
let incoming_ops_batch = IncomingOpsBatch::default();
let r = Self {
dna_hash,
cache_db: cache,
authored_dbs: Arc::new(parking_lot::Mutex::new(HashMap::new())),
dht_db,
peer_meta_store_db,
countersigning_workspaces: Default::default(),
witnessing_workspace,
incoming_op_hashes,
incoming_ops_batch,
conductor_db,
root_db_dir: Arc::new(root_db_dir),
db_key,
db_max_readers,
};
Ok(r)
}
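
    /// Construct a [`SourceChain`] for the given author, backed by this
    /// space's authored and DHT databases.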
pub async fn source_chain(
&self,
keystore: MetaLairClient,
author: AgentPubKey,
) -> SourceChainResult<SourceChain> {
SourceChain::raw_empty(
self.get_or_create_authored_db(author.clone())?,
self.dht_db.clone(),
keystore,
author,
)
.await
}
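
    /// Construct a [`SourceChainWorkspace`] for the given author, backed by
    /// this space's authored, DHT, and cache databases.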
pub async fn source_chain_workspace(
&self,
keystore: MetaLairClient,
author: AgentPubKey,
) -> ConductorResult<SourceChainWorkspace> {
Ok(SourceChainWorkspace::new(
self.get_or_create_authored_db(author.clone())?.clone(),
self.dht_db.clone(),
self.cache_db.clone(),
keystore,
author,
)
.await?)
}
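
    /// Get the authored database for the given author, opening and caching it
    /// on first use.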
pub fn get_or_create_authored_db(
&self,
author: AgentPubKey,
) -> DatabaseResult<DbWrite<DbKindAuthored>> {
match self.authored_dbs.lock().entry(author.clone()) {
hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
hash_map::Entry::Vacant(entry) => {
let db = tokio::task::block_in_place(|| {
DbWrite::open_with_pool_config(
self.root_db_dir.as_ref(),
DbKindAuthored(Arc::new(CellId::new((*self.dna_hash).clone(), author))),
PoolConfig {
synchronous_level: DbSyncLevel::Normal,
key: self.db_key.clone(),
max_readers: self.db_max_readers,
},
)
})?;
entry.insert(db.clone());
Ok(db)
}
}
}
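
    /// Get the authored database for the given author only if it is already open.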
pub fn get_authored_db_if_present(
&self,
author: &AgentPubKey,
) -> DatabaseResult<Option<DbWrite<DbKindAuthored>>> {
Ok(self.authored_dbs.lock().get(author).cloned())
}
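
    /// Get all authored databases currently open in this space.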
pub fn get_all_authored_dbs(&self) -> Vec<DbWrite<DbKindAuthored>> {
self.authored_dbs.lock().values().cloned().collect()
}
}
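
/// Query the persisted conductor state, returning `None` if no state has been
/// stored yet.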
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
pub async fn query_conductor_state(
db: &DbRead<DbKindConductor>,
) -> ConductorResult<Option<ConductorState>> {
db.read_async(|txn| {
let state = txn
.query_row("SELECT blob FROM ConductorState WHERE id = 1", [], |row| {
row.get("blob")
})
.optional()?;
match state {
Some(state) => ConductorResult::Ok(Some(from_blob(state)?)),
None => ConductorResult::Ok(None),
}
})
.await
}
#[cfg(test)]
impl TestSpaces {
pub async fn new(dna_hashes: impl IntoIterator<Item = DnaHash>) -> Self {
let queue_consumer_map = QueueConsumerMap::new();
Self::with_queue_consumer(dna_hashes, queue_consumer_map).await
}
pub async fn with_queue_consumer(
dna_hashes: impl IntoIterator<Item = DnaHash>,
queue_consumer_map: QueueConsumerMap,
) -> Self {
let mut test_spaces: HashMap<DnaHash, _> = HashMap::new();
for hash in dna_hashes.into_iter() {
test_spaces.insert(hash.clone(), TestSpace::new(hash));
}
let temp_dir = tempfile::Builder::new()
.prefix("holochain-test-environments")
.tempdir()
.unwrap();
let spaces = Spaces::new(
ConductorConfig {
data_root_path: Some(temp_dir.path().to_path_buf().into()),
..Default::default()
}
.into(),
Arc::new(std::sync::Mutex::new(sodoken::LockedArray::from(
b"passphrase".to_vec(),
))),
)
.await
.unwrap();
spaces.map.share_mut(|map| {
map.extend(
test_spaces
.iter()
.map(|(k, v)| (k.clone(), v.space.clone())),
);
});
Self {
queue_consumer_map,
spaces,
test_spaces,
}
}
}
#[cfg(test)]
impl TestSpace {
pub fn new(dna_hash: DnaHash) -> Self {
let temp_dir = tempfile::Builder::new()
.prefix("holochain-test-environments")
.tempdir()
.unwrap();
Self {
space: Space::new(
Arc::new(dna_hash),
temp_dir.path().to_path_buf(),
Default::default(),
Default::default(),
ConductorConfig::default().db_max_readers,
)
.unwrap(),
_temp_dir: temp_dir,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use holochain_conductor_api::conductor::ConductorConfig;
use holochain_types::prelude::DnaHash;
#[tokio::test(flavor = "multi_thread")]
async fn db_max_readers_applied_to_pools() {
let custom_max_readers = 24;
let temp_dir = tempfile::Builder::new().tempdir().unwrap();
let config_with_path = ConductorConfig {
data_root_path: Some(temp_dir.path().to_path_buf().into()),
db_max_readers: custom_max_readers,
..Default::default()
};
let spaces = Spaces::new(
Arc::new(config_with_path),
Arc::new(std::sync::Mutex::new(sodoken::LockedArray::from(
b"passphrase".to_vec(),
))),
)
.await
.unwrap();
let dna_hash = DnaHash::from_raw_36(vec![0; 36]);
let space = spaces.get_or_create_space(&dna_hash).unwrap();
space
.get_or_create_authored_db(AgentPubKey::from_raw_32(vec![0; 32]))
.unwrap();
assert_eq!(space.db_max_readers, custom_max_readers);
assert_eq!(
space.cache_db.connection_pool_max_size(),
custom_max_readers as u32 + 1
);
assert_eq!(
space.dht_db.connection_pool_max_size(),
custom_max_readers as u32 + 1
);
assert_eq!(
space
.get_all_authored_dbs()
.first()
.unwrap()
.connection_pool_max_size(),
custom_max_readers as u32 + 1
);
}
}