use crate::{
content_store::{AddContent, GetContent},
dht::{
actions::remove_queued_holding_workflow::HoldingWorkflowQueueing,
pending_validations::{PendingValidationWithTimeout, ValidationTimeout},
},
instance::RETRY_VALIDATION_DURATION_MIN,
};
use holochain_core_types::{
chain_header::ChainHeader,
crud_status::CrudStatus,
eav::{Attribute, EaviQuery, EntityAttributeValueIndex},
entry::Entry,
error::{HcResult, HolochainError},
network::{
entry_aspect::EntryAspect,
query::{GetLinksQueryConfiguration, Pagination, SortOrder},
},
};
use holochain_json_api::{error::JsonError, json::JsonString};
use holochain_locksmith::RwLock;
use holochain_net::aspect_map::{AspectMap, AspectMapBare};
use holochain_persistence_api::{
cas::{
content::{Address, AddressableContent, Content},
storage::ContentAddressableStorage,
},
eav::{EavFilter, EntityAttributeValueStorage, IndexFilter},
};
use crate::{dht::pending_validations::PendingValidation, state::StateWrapper};
use chrono::{offset::FixedOffset, DateTime};
use holochain_json_api::error::JsonResult;
use holochain_persistence_api::error::PersistenceResult;
use snowflake::ProcessUniqueId;
use std::{
collections::{BTreeSet, HashMap, VecDeque},
convert::TryFrom,
sync::Arc,
time::{Duration, SystemTime},
};
/// Identifier of a single hold-aspect attempt: a pair of process-unique ids.
/// Used as the key under which `DhtStore` records the outcome of that attempt.
pub type HoldAspectAttemptId = (ProcessUniqueId, ProcessUniqueId);
/// Holds the shared content and meta (EAV) storages together with the aspect
/// holding map, recorded hold-attempt results and the holding workflow queues.
///
/// Cloning copies the `Arc` handles, so clones share the same underlying storages.
#[derive(Clone, Debug)]
pub struct DhtStore {
/// Shared content addressable storage behind a read/write lock.
content_storage: Arc<RwLock<dyn ContentAddressableStorage>>,
/// Shared entity-attribute-value storage (over `Attribute`) behind a read/write lock.
meta_storage: Arc<RwLock<dyn EntityAttributeValueStorage<Attribute>>>,
/// Aspects that were marked as held via `mark_aspect_as_held`.
holding_map: AspectMap,
/// Outcome recorded per hold-aspect attempt by `mark_hold_aspect_complete`.
holding_attempt_results: HashMap<HoldAspectAttemptId, Result<(), HolochainError>>,
/// Holding workflows waiting to be picked up, each with an optional timeout.
pub(crate) queued_holding_workflows: VecDeque<PendingValidationWithTimeout>,
/// Holding workflows that `update_queued_holding_workflow` moved out of the queue
/// when they were reported as `Processing`.
pub(crate) in_process_holding_workflows: VecDeque<PendingValidationWithTimeout>,
}
impl PartialEq for DhtStore {
    /// Two stores are equal when their holding maps are equal, their content
    /// storages report the same id and their meta storages compare equal.
    ///
    /// The hold-attempt results and the workflow queues are not part of the comparison.
    ///
    /// # Panics
    /// Panics if any of the storage read locks cannot be acquired.
    fn eq(&self, other: &DhtStore) -> bool {
        // Cheapest check first; no lock is taken when the holding maps differ.
        if !(self.holding_map == other.holding_map) {
            return false;
        }

        let same_content_storage = {
            let own_content = self.content_storage.read().unwrap();
            let other_content = other.content_storage.read().unwrap();
            (*own_content).get_id() == (*other_content).get_id()
        };
        if !same_content_storage {
            return false;
        }

        let own_meta = self.meta_storage.read().unwrap();
        let other_meta = other.meta_storage.read().unwrap();
        *own_meta == *other_meta
    }
}
/// Serializable snapshot of a `DhtStore`'s holding map and holding workflow queues.
#[derive(Clone, Debug, Deserialize, Serialize, DefaultJson)]
pub struct DhtStoreSnapshot {
/// Bare form of the store's holding map.
pub holding_map: AspectMapBare,
/// Copy of the store's queued holding workflows.
queued_holding_workflows: VecDeque<PendingValidationWithTimeout>,
/// Copy of the store's in-process holding workflows.
/// `serde(default)`: a serialized snapshot lacking this field deserializes with an empty queue.
#[serde(default)]
in_process_holding_workflows: VecDeque<PendingValidationWithTimeout>,
}
impl From<&StateWrapper> for DhtStoreSnapshot {
fn from(state: &StateWrapper) -> Self {
DhtStoreSnapshot {
holding_map: state.dht().get_holding_map().bare().clone(),
queued_holding_workflows: state.dht().queued_holding_workflows.clone(),
in_process_holding_workflows: state.dht().in_process_holding_workflows.clone(),
}
}
}
/// Constant address reported by `DhtStoreSnapshot::address`, independent of the
/// snapshot's content.
pub static DHT_STORE_SNAPSHOT_ADDRESS: &str = "DhtStore";
impl AddressableContent for DhtStoreSnapshot {
    /// Converts a clone of this snapshot into its `Content` representation.
    fn content(&self) -> Content {
        let snapshot: DhtStoreSnapshot = self.clone();
        snapshot.into()
    }

    /// Attempts to rebuild a snapshot from a copy of `content`.
    fn try_from_content(content: &Content) -> JsonResult<Self> {
        let owned_content: Content = content.clone();
        Self::try_from(owned_content)
    }

    /// Always returns the address derived from `DHT_STORE_SNAPSHOT_ADDRESS`,
    /// regardless of the snapshot's content.
    fn address(&self) -> Address {
        let address: Address = DHT_STORE_SNAPSHOT_ADDRESS.into();
        address
    }
}
/// Builds the EAVI query used to look up the link attributes stored for `address`.
///
/// The attribute filter accepts `LinkTag` and `RemovedLink` attributes only.
/// `link_type` and `tag` each restrict the match to attributes carrying exactly
/// that value (case-sensitive); `None` accepts any value. The query uses
/// `IndexFilter::LatestByAttribute` and passes a second predicate matching
/// `RemovedLink` attributes as the last argument of `EaviQuery::new`.
///
/// # Errors
/// The current implementation always returns `Ok`; the `Result` is kept for
/// interface compatibility.
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub fn create_get_links_eavi_query<'a>(
    address: Address,
    link_type: Option<String>,
    tag: Option<String>,
) -> Result<EaviQuery<'a>, HolochainError> {
    Ok(EaviQuery::new(
        Some(address).into(),
        EavFilter::predicate(move |attr: Attribute| match attr {
            Attribute::LinkTag(query_link_type, query_tag)
            | Attribute::RemovedLink(_, query_link_type, query_tag) => {
                // Both constraints are exact matches; an absent constraint matches anything.
                // The captured options are only borrowed, so nothing is cloned per attribute.
                let type_matches = link_type
                    .as_ref()
                    .map_or(true, |wanted_type| *wanted_type == query_link_type);
                let tag_matches = tag
                    .as_ref()
                    .map_or(true, |wanted_tag| *wanted_tag == query_tag);
                type_matches && tag_matches
            }
            _ => false,
        }),
        None.into(),
        IndexFilter::LatestByAttribute,
        Some(EavFilter::predicate(move |attr: Attribute| match attr {
            Attribute::RemovedLink(_, _, _) => true,
            _ => false,
        })),
    ))
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
impl DhtStore {
pub fn new(
content_storage: Arc<RwLock<dyn ContentAddressableStorage>>,
meta_storage: Arc<RwLock<dyn EntityAttributeValueStorage<Attribute>>>,
) -> Self {
DhtStore {
content_storage,
meta_storage,
holding_map: AspectMap::new(),
queued_holding_workflows: VecDeque::new(),
in_process_holding_workflows: VecDeque::new(),
holding_attempt_results: HashMap::new(),
}
}
/// Rebuilds a store from `snapshot` on top of the given storages.
///
/// Workflows that were in process when the snapshot was taken are put back at
/// the front of the queue, followed by the snapshot's queued workflows. Every
/// restored workflow gets a fresh timeout starting now with a delay of
/// `RETRY_VALIDATION_DURATION_MIN`. The in-process queue of the new store is empty.
pub fn new_from_snapshot(
    content_storage: Arc<RwLock<dyn ContentAddressableStorage>>,
    meta_storage: Arc<RwLock<dyn EntityAttributeValueStorage<Attribute>>>,
    mut snapshot: DhtStoreSnapshot,
) -> Self {
    // Formerly in-process workflows first, then the ones that were still waiting.
    let mut restored_queue = snapshot.in_process_holding_workflows;
    restored_queue.append(&mut snapshot.queued_holding_workflows);
    restored_queue.iter_mut().for_each(|workflow| {
        workflow.timeout = Some(ValidationTimeout::new(
            SystemTime::now(),
            RETRY_VALIDATION_DURATION_MIN,
        ));
    });

    let mut restored_store = Self::new(content_storage, meta_storage);
    restored_store.holding_map = snapshot.holding_map.into();
    restored_store.queued_holding_workflows = restored_queue;
    restored_store
}
/// Fetches the link metadata stored for `address`, optionally restricted by
/// `link_type` and `tag` (see `create_get_links_eavi_query`).
///
/// Processing order:
/// 1. the matching EAVIs are read from the meta storage,
/// 2. they are iterated in the storage's order or reversed, per `configuration.sort_order`,
/// 3. `configuration.pagination` is applied:
///    - `Time`: leading entries whose index is `<=` `from_time` (in nanoseconds)
///      are skipped, then at most `limit` entries are taken,
///    - `Size`: `page_size * page_number` entries are skipped, then `page_size` taken,
/// 4. each entry is tagged `CrudStatus::Live` if its attribute is a `LinkTag`,
///    otherwise `CrudStatus::Deleted`,
/// 5. entries whose status differs from `crud_filter` (when given) are dropped.
///
/// Because pagination happens before the CRUD filter, a page can hold fewer
/// entries than requested when `crud_filter` is set.
///
/// # Errors
/// Returns an error if the query cannot be built, the meta storage lock cannot
/// be read, or the storage fetch fails.
pub fn get_links(
    &self,
    address: Address,
    link_type: Option<String>,
    tag: Option<String>,
    crud_filter: Option<CrudStatus>,
    configuration: GetLinksQueryConfiguration,
) -> Result<Vec<(EntityAttributeValueIndex, CrudStatus)>, HolochainError> {
    let get_links_query = create_get_links_eavi_query(address, link_type, tag)?;
    let filtered = self.meta_storage.read()?.fetch_eavi(&get_links_query)?;

    let sorted: Box<dyn Iterator<Item = EntityAttributeValueIndex>> =
        match configuration.sort_order.unwrap_or_default() {
            SortOrder::Ascending => Box::new(filtered.into_iter()),
            SortOrder::Descending => Box::new(filtered.into_iter().rev()),
        };

    let paginated: Box<dyn Iterator<Item = EntityAttributeValueIndex>> =
        match configuration.pagination {
            Some(Pagination::Time(time_pagination)) => {
                // Convert the lower bound once instead of once per visited entry.
                let from_time: DateTime<FixedOffset> = time_pagination.from_time.into();
                Box::new(
                    sorted
                        .skip_while(move |eavi| from_time.timestamp_nanos() >= eavi.index())
                        .take(time_pagination.limit),
                )
            }
            Some(Pagination::Size(size_pagination)) => {
                // Saturate instead of overflowing: an offset past every entry simply
                // yields an empty page rather than a panic or a wrapped-around page.
                let offset = size_pagination
                    .page_size
                    .saturating_mul(size_pagination.page_number);
                Box::new(sorted.skip(offset).take(size_pagination.page_size))
            }
            None => sorted,
        };

    Ok(paginated
        .map(|eavi| {
            let status = match eavi.attribute() {
                Attribute::LinkTag(_, _) => CrudStatus::Live,
                _ => CrudStatus::Deleted,
            };
            (eavi, status)
        })
        .filter(|(_, status)| crud_filter.map_or(true, |wanted| wanted == *status))
        .collect())
}
/// Fetches every link and CRUD related EAVI stored for `address`.
///
/// Matches the attributes `LinkTag`, `RemovedLink`, `CrudLink` and `CrudStatus`
/// over the index range `0..=i64::MAX`.
///
/// # Errors
/// Returns an error if the meta storage lock cannot be read or the fetch fails.
pub fn get_all_metas(
    &self,
    address: &Address,
) -> Result<BTreeSet<EntityAttributeValueIndex>, HolochainError> {
    let query = EaviQuery::new(
        Some(address.clone()).into(),
        EavFilter::predicate(|attr: Attribute| match attr {
            Attribute::LinkTag(..)
            | Attribute::RemovedLink(..)
            | Attribute::CrudLink
            | Attribute::CrudStatus => true,
            _ => false,
        }),
        None.into(),
        IndexFilter::Range(Some(0), Some(i64::max_value())),
        None,
    );
    let storage = self.meta_storage.read()?;
    let metas = (*storage).fetch_eavi(&query)?;
    Ok(metas)
}
/// Returns the chain headers recorded for the entry at `entry_address`.
///
/// Looks up the `EntryHeader` EAVIs of the entry, resolves each referenced
/// address through the content storage and returns the resolved headers.
/// Addresses for which the content storage holds nothing are skipped.
///
/// # Errors
/// Returns an error if the meta storage lock cannot be read, the EAVI fetch
/// fails, any content lookup fails, or a resolved entry is not a
/// `Entry::ChainHeader`. Lookup errors take precedence over the type check.
pub fn get_headers(&self, entry_address: Address) -> Result<Vec<ChainHeader>, HolochainError> {
    let header_query = EaviQuery::new(
        Some(entry_address).into(),
        Some(Attribute::EntryHeader).into(),
        None.into(),
        IndexFilter::LatestByAttribute,
        None,
    );
    // Propagate a lock failure like the sibling queries do instead of panicking;
    // the read guard is released at the end of this statement.
    let header_eavis = self.meta_storage.read()?.fetch_eavi(&header_query)?;

    // Resolve every referenced address first so that a failing lookup is
    // reported before any entry type mismatch.
    let stored_entries = header_eavis
        .into_iter()
        .map(|eavi| self.get(&eavi.value()))
        .collect::<Result<Vec<Option<_>>, _>>()?;

    stored_entries
        .into_iter()
        .flatten()
        .map(|entry| match entry {
            Entry::ChainHeader(chain_header) => Ok(chain_header),
            _ => Err(HolochainError::ErrorGeneric(
                "Unexpected non-chain_header entry".to_string(),
            )),
        })
        .collect()
}
/// Stores `header` in the content storage and records an `EntryHeader` EAVI
/// pointing from the address of `entry` to the address of `header`.
///
/// The EAVI is constructed before anything is written, so a construction
/// failure leaves both storages untouched.
///
/// # Errors
/// Returns an error if the EAVI cannot be constructed or either write fails.
pub fn add_header_for_entry(
    &mut self,
    entry: &Entry,
    header: &ChainHeader,
) -> Result<(), HolochainError> {
    let entry_address = entry.address();
    let header_address = header.address();
    let header_link =
        EntityAttributeValueIndex::new(&entry_address, &Attribute::EntryHeader, &header_address)?;

    self.add(header)?;
    self.add_eavi(&header_link)?;
    Ok(())
}
/// Adds `aspect` to the holding map.
pub fn mark_aspect_as_held(&mut self, aspect: &EntryAspect) {
self.holding_map.add(aspect);
}
/// Records `result` as the outcome of the hold-aspect attempt `id`,
/// replacing any outcome previously recorded for the same id.
pub fn mark_hold_aspect_complete(
&mut self,
id: HoldAspectAttemptId,
result: Result<(), HolochainError>,
) {
self.holding_attempt_results.insert(id, result);
}
/// Returns the outcome recorded for the hold-aspect attempt `id`, or `None`
/// if no outcome has been recorded for it.
pub fn hold_aspec_request_complete(
&self,
id: &HoldAspectAttemptId,
) -> Option<&Result<(), HolochainError>> {
self.holding_attempt_results.get(id)
}
/// Returns a reference to the holding map.
pub fn get_holding_map(&self) -> &AspectMap {
&self.holding_map
}
/// Runs `query` against the meta storage and returns the matching EAVIs.
///
/// # Panics
/// Panics if the meta storage read lock cannot be acquired.
pub(crate) fn fetch_eavi(
    &self,
    query: &EaviQuery,
) -> PersistenceResult<BTreeSet<EntityAttributeValueIndex>> {
    let storage = self.meta_storage.read().unwrap();
    (*storage).fetch_eavi(query)
}
/// Writes `eavi` to the meta storage and returns whatever the storage reports
/// for the insertion.
///
/// # Panics
/// Panics if the meta storage write lock cannot be acquired.
pub(crate) fn add_eavi(
    &mut self,
    eavi: &EntityAttributeValueIndex,
) -> PersistenceResult<Option<EntityAttributeValueIndex>> {
    let mut storage = self.meta_storage.write().unwrap();
    (*storage).add_eavi(eavi)
}
/// Picks the next queued holding workflow that is ready to run.
///
/// Walking the queue front to back, returns the first workflow that
/// - is free, i.e. none of its dependencies is the entry address of a queued
///   workflow (see `free_pending_filter`), and
/// - is not waiting on its timeout: it has no timeout, its delay has already
///   elapsed, or the elapsed time cannot be determined.
///
/// The result is the workflow's pending validation together with the delay of
/// its timeout, if it has one. Returns `None` when no queued workflow qualifies.
pub(crate) fn next_queued_holding_workflow(
    &self,
) -> Option<(PendingValidation, Option<Duration>)> {
    let is_free = free_pending_filter(&self.queued_holding_workflows);
    // Iterate by reference and clone only the selected item instead of cloning
    // the whole queue up front.
    self.queued_holding_workflows
        .iter()
        .filter(|queued| is_free(*queued))
        .find(|queued| {
            let still_waiting = queued.timeout.as_ref().map_or(false, |timeout| {
                timeout
                    .time_of_dispatch
                    .elapsed()
                    .map(|elapsed| elapsed < timeout.delay)
                    // A clock error is treated as "not waiting".
                    .unwrap_or(false)
            });
            !still_waiting
        })
        .map(|queued| {
            (
                queued.pending.clone(),
                queued.timeout.as_ref().map(|timeout| timeout.delay),
            )
        })
}
pub(crate) fn has_exact_queued_holding_workflow(&self, pending: &PendingValidation) -> bool {
self.queued_holding_workflows.iter().any(
|PendingValidationWithTimeout {
pending: current, ..
}| current == pending,
)
}
pub(crate) fn has_exact_in_process_holding_workflow(
&self,
pending: &PendingValidation,
) -> bool {
self.in_process_holding_workflows.iter().any(
|PendingValidationWithTimeout {
pending: current, ..
}| current == pending,
)
}
pub(crate) fn has_same_queued_holding_worfkow(&self, pending: &PendingValidation) -> bool {
self.queued_holding_workflows.iter().any(
|PendingValidationWithTimeout {
pending: current, ..
}| {
current.entry_with_header.header == pending.entry_with_header.header
&& current.workflow == pending.workflow
},
)
}
pub(crate) fn has_same_in_process_holding_worfkow(&self, pending: &PendingValidation) -> bool {
self.in_process_holding_workflows.iter().any(
|PendingValidationWithTimeout {
pending: current, ..
}| {
current.entry_with_header.header == pending.entry_with_header.header
&& current.workflow == pending.workflow
},
)
}
/// Returns a reference to the queue of waiting holding workflows.
pub(crate) fn queued_holding_workflows(&self) -> &VecDeque<PendingValidationWithTimeout> {
&self.queued_holding_workflows
}
/// Returns a reference to the queue of in-process holding workflows.
pub(crate) fn in_process_holding_workflows(&self) -> &VecDeque<PendingValidationWithTimeout> {
&self.in_process_holding_workflows
}
pub(crate) fn update_queued_holding_workflow(
&mut self,
state: &HoldingWorkflowQueueing,
item: &PendingValidation,
) {
match state {
HoldingWorkflowQueueing::Waiting(delay) => {
match self
.in_process_holding_workflows
.iter()
.position(|PendingValidationWithTimeout { pending, .. }| pending == item)
.and_then(|index| self.in_process_holding_workflows.remove(index))
{
None => {
error!(
"update_queued_holding_workflow {:?} not found in process!",
item
);
}
Some(pending) => {
let mut pending = pending;
pending.timeout = Some(ValidationTimeout::new(SystemTime::now(), *delay));
self.queued_holding_workflows.push_back(pending);
}
}
}
HoldingWorkflowQueueing::Processing => {
match self
.queued_holding_workflows()
.iter()
.position(|PendingValidationWithTimeout { pending, .. }| pending == item)
.and_then(|index| self.queued_holding_workflows.remove(index))
{
None => {
error!(
"update_queued_holding_workflow {:?} not found waiting!",
item
);
}
Some(pending) => {
self.in_process_holding_workflows.push_back(pending);
}
}
}
HoldingWorkflowQueueing::Done => {
if self
.in_process_holding_workflows
.iter()
.position(|PendingValidationWithTimeout { pending, .. }| pending == item)
.and_then(|index| self.in_process_holding_workflows.remove(index))
.is_none()
{
error!(
"update_queued_holding_workflow {:?} not found in process!",
item
);
}
}
}
}
}
use im::HashSet;
/// Builds a predicate that tells whether a pending validation is "free".
///
/// The entry addresses of all items in `pending` are collected once. The
/// returned closure yields `true` for an item when none of its dependencies is
/// among those addresses; an item without dependencies is always free.
fn free_pending_filter<I>(pending: &I) -> Box<dyn Fn(&PendingValidationWithTimeout) -> bool>
where
    I: IntoIterator<Item = PendingValidationWithTimeout> + Clone,
{
    let pending_entry_addresses: HashSet<Address> = pending
        .clone()
        .into_iter()
        .map(|queued| queued.pending.entry_with_header.entry.address())
        .collect();

    Box::new(move |candidate| {
        let blocked = candidate
            .pending
            .dependencies
            .iter()
            .any(|dependency| pending_entry_addresses.contains(dependency));
        !blocked
    })
}
impl GetContent for DhtStore {
    /// Fetches the raw content stored at `address` from the content storage,
    /// or `None` if the storage holds nothing for it.
    ///
    /// # Panics
    /// Panics if the content storage read lock cannot be acquired.
    fn get_raw(&self, address: &Address) -> HcResult<Option<Content>> {
        let storage = self.content_storage.read().unwrap();
        let maybe_content = (*storage).fetch(address)?;
        Ok(maybe_content)
    }
}
impl AddContent for DhtStore {
    /// Writes `content` to the content storage, converting a storage error
    /// into a `HolochainError`.
    ///
    /// # Panics
    /// Panics if the content storage write lock cannot be acquired.
    fn add<T: AddressableContent>(&mut self, content: &T) -> HcResult<()> {
        let mut storage = self.content_storage.write().unwrap();
        (*storage).add(content)?;
        Ok(())
    }
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::{
        dht::pending_validations::{PendingValidationStruct, ValidatingWorkflow},
        network::entry_with_header::EntryWithHeader,
    };
    use holochain_core_types::{
        chain_header::test_chain_header_with_sig,
        entry::{test_entry, test_entry_a, test_entry_b, test_entry_c},
    };
    use holochain_persistence_api::{
        cas::storage::ExampleContentAddressableStorage, eav::ExampleEntityAttributeValueStorage,
    };

    /// Builds a `DhtStore` over `ExampleContentAddressableStorage` and
    /// `ExampleEntityAttributeValueStorage`.
    fn example_store() -> DhtStore {
        DhtStore::new(
            Arc::new(RwLock::new(
                ExampleContentAddressableStorage::new().unwrap(),
            )),
            Arc::new(RwLock::new(ExampleEntityAttributeValueStorage::new())),
        )
    }

    /// Builds a `HoldEntry` pending validation without timeout for `entry`,
    /// carrying the given `dependencies`.
    fn pending_validation_for_entry(
        entry: Entry,
        dependencies: Vec<Address>,
    ) -> PendingValidationWithTimeout {
        let entry_with_header = EntryWithHeader {
            entry,
            header: test_chain_header_with_sig("sig1", None),
        };
        let mut pending_struct =
            PendingValidationStruct::new(entry_with_header, ValidatingWorkflow::HoldEntry);
        pending_struct.dependencies = dependencies;
        PendingValidationWithTimeout::new(Arc::new(pending_struct), None)
    }

    /// Returns the items of `pending_list` that `free_pending_filter` lets through.
    fn free_entries(
        pending_list: Vec<PendingValidationWithTimeout>,
    ) -> Vec<PendingValidationWithTimeout> {
        let is_free = free_pending_filter(&pending_list);
        pending_list.into_iter().filter(is_free).collect()
    }

    #[test]
    fn get_headers_roundtrip() {
        let mut store = example_store();
        let entry = test_entry();
        let header1 = test_chain_header_with_sig("sig1", None);
        let header2 = test_chain_header_with_sig("sig2", None);

        for header in &[&header1, &header2] {
            store.add_header_for_entry(&entry, header).unwrap();
        }

        let headers = store.get_headers(entry.address()).unwrap();
        assert_eq!(headers, vec![header1, header2]);
    }

    #[test]
    fn test_dependency_resolution_no_dependencies() {
        // Neither item depends on anything, so both are free.
        let a = pending_validation_for_entry(test_entry_a(), Vec::new());
        let b = pending_validation_for_entry(test_entry_b(), Vec::new());

        let free = free_entries(vec![a.clone(), b.clone()]);

        assert_eq!(free, vec![a, b]);
    }

    #[test]
    fn test_dependency_resolution_chain() {
        // a -> b -> c: only the end of the chain is free.
        let a = pending_validation_for_entry(test_entry_a(), vec![test_entry_b().address()]);
        let b = pending_validation_for_entry(test_entry_b(), vec![test_entry_c().address()]);
        let c = pending_validation_for_entry(test_entry_c(), vec![]);

        let free = free_entries(vec![a.clone(), b.clone(), c.clone()]);

        assert_eq!(free, vec![c]);
    }

    #[test]
    fn test_dependency_resolution_tree() {
        // a depends on both b and c: the two leaves are free.
        let a = pending_validation_for_entry(
            test_entry_a(),
            vec![test_entry_b().address(), test_entry_c().address()],
        );
        let b = pending_validation_for_entry(test_entry_b(), vec![]);
        let c = pending_validation_for_entry(test_entry_c(), vec![]);

        let free = free_entries(vec![a.clone(), b.clone(), c.clone()]);

        assert_eq!(free, vec![b, c]);
    }
}