use holochain_core_types::error::HolochainError;
use crate::{
action::{Action, ActionWrapper},
dht::{
dht_store::DhtStore,
pending_validations::{PendingValidationWithTimeout, ValidationTimeout},
},
};
use std::sync::Arc;
use super::dht_inner_reducers::{
reduce_add_remove_link_inner, reduce_remove_entry_inner, reduce_store_entry_inner,
reduce_update_entry_inner, LinkModification,
};
use holochain_core_types::{entry::Entry, network::entry_aspect::EntryAspect};
use holochain_persistence_api::cas::content::AddressableContent;
use itertools::Itertools;
use std::collections::VecDeque;
type DhtReducer = fn(&DhtStore, &ActionWrapper) -> Option<DhtStore>;
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub fn reduce(old_store: Arc<DhtStore>, action_wrapper: &ActionWrapper) -> Arc<DhtStore> {
let reducer = match resolve_reducer(action_wrapper) {
Some(reducer) => reducer,
None => {
return old_store;
}
};
match reducer(&old_store.clone(), &action_wrapper) {
None => old_store,
Some(new_store) => Arc::new(new_store),
}
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
fn resolve_reducer(action_wrapper: &ActionWrapper) -> Option<DhtReducer> {
match action_wrapper.action() {
Action::Commit(_) => Some(reduce_commit_entry),
Action::HoldAspect(_) => Some(reduce_hold_aspect),
Action::QueueHoldingWorkflow(_) => Some(reduce_queue_holding_workflow),
Action::RemoveQueuedHoldingWorkflow(_) => Some(reduce_remove_queued_holding_workflow),
Action::Prune => Some(reduce_prune),
_ => None,
}
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub(crate) fn reduce_commit_entry(
old_store: &DhtStore,
action_wrapper: &ActionWrapper,
) -> Option<DhtStore> {
let (entry, _, _) = unwrap_to!(action_wrapper.action() => Action::Commit);
let mut new_store = (*old_store).clone();
match reduce_store_entry_inner(&mut new_store, entry) {
Ok(()) => Some(new_store),
Err(e) => {
error!("{}", e);
None
}
}
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub(crate) fn reduce_hold_aspect(
old_store: &DhtStore,
action_wrapper: &ActionWrapper,
) -> Option<DhtStore> {
let (aspect, id) = unwrap_to!(action_wrapper.action() => Action::HoldAspect);
let mut new_store = (*old_store).clone();
debug!("reduce_hold_aspect: {:?}", aspect);
let mut hold_result: Result<(), HolochainError> = Ok(());
let mut maybe_store = match aspect {
EntryAspect::Content(entry, header) => {
match reduce_store_entry_inner(&mut new_store, &entry) {
Ok(()) => {
new_store.add_header_for_entry(&entry, &header).ok()?;
Some(new_store)
}
Err(e) => {
let err = format!("EntryAspect::Content hold error: {}", e);
hold_result = Err(HolochainError::ErrorGeneric(err));
None
}
}
}
EntryAspect::LinkAdd(link_data, header) => {
let entry = Entry::LinkAdd(link_data.clone());
match reduce_add_remove_link_inner(
&mut new_store,
&link_data,
&entry.address(),
LinkModification::Add,
header,
) {
Ok(_) => Some(new_store),
Err(e) => {
let err = format!("EntryAspect::LinkAdd hold error: {}", e);
hold_result = Err(HolochainError::ErrorGeneric(err));
None
}
}
}
EntryAspect::LinkRemove((link_data, links_to_remove), header) => {
let folded_result = Some(links_to_remove.iter().fold(
new_store,
|mut store, link_addresses| {
if let Err(e) = reduce_add_remove_link_inner(
&mut store,
&link_data,
link_addresses,
LinkModification::Remove,
header,
) {
let err = format!("EntryAspect::LinkRemove hold error: {}", e);
hold_result = Err(HolochainError::ErrorGeneric(err));
}
store
},
));
if hold_result.is_ok() {
folded_result
} else {
None
}
}
EntryAspect::Update(entry, header) => {
if let Some(crud_link) = header.link_update_delete() {
match reduce_update_entry_inner(&mut new_store, &crud_link, &entry.address()) {
Ok(_) => Some(new_store),
Err(e) => {
let err = format!("EntryAspect::Update hold error: {}", e);
hold_result = Err(HolochainError::ErrorGeneric(err));
None
}
}
} else {
let err = "EntryAspect::Update without crud_link in header received!";
hold_result = Err(HolochainError::ErrorGeneric(err.to_string()));
None
}
}
EntryAspect::Deletion(header) => {
if let Some(crud_link) = header.link_update_delete() {
match reduce_remove_entry_inner(&mut new_store, &crud_link, &header.entry_address())
{
Ok(_) => Some(new_store),
Err(e) => {
let err = format!("EntryAspect::Deletion hold error: {}", e);
hold_result = Err(HolochainError::ErrorGeneric(err));
None
}
}
} else {
let err = "EntryAspect::Deletion without crud_link in header received!";
hold_result = Err(HolochainError::ErrorGeneric(err.to_string()));
None
}
}
EntryAspect::Header(_) => {
let err = "Got EntryAspect::Header which is not implemented.";
hold_result = Err(HolochainError::ErrorGeneric(err.to_string()));
None
}
};
if let Some(ref mut store) = maybe_store {
debug!(
"reduce_hold_aspect: marking as held {}-{:?}",
aspect.address(),
aspect
);
store.mark_aspect_as_held(&aspect);
store.mark_hold_aspect_complete(id.clone(), hold_result);
maybe_store
} else {
debug!(
"reduce_hold_aspect: ERRORED with {:?} not marking as held {}-{:?}",
hold_result,
aspect.address(),
aspect
);
let mut store = (*old_store).clone();
store.mark_hold_aspect_complete(id.clone(), hold_result);
Some(store)
}
}
#[allow(dead_code)]
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub(crate) fn reduce_get_links(
_old_store: &DhtStore,
_action_wrapper: &ActionWrapper,
) -> Option<DhtStore> {
None
}
#[allow(unknown_lints)]
#[allow(clippy::needless_pass_by_value)]
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub fn reduce_queue_holding_workflow(
old_store: &DhtStore,
action_wrapper: &ActionWrapper,
) -> Option<DhtStore> {
let action = action_wrapper.action();
let (pending, maybe_delay) = unwrap_to!(action => Action::QueueHoldingWorkflow);
let entry_aspect = EntryAspect::from((**pending).clone());
if old_store.get_holding_map().contains(&entry_aspect) {
debug!(
"Skipping add pending validation to queue which is already held for aspect {:?}",
entry_aspect
);
None
} else if old_store.has_same_queued_holding_worfkow(pending) {
debug!(
"Skipping add pending validation to queue which is already queued for aspect {:?}",
entry_aspect
);
None
} else if old_store.has_same_in_process_holding_worfkow(pending) {
debug!(
"Skipping add pending validation to queue which is already in process for aspect {:?}",
entry_aspect
);
None
} else {
let mut new_store = (*old_store).clone();
new_store
.queued_holding_workflows
.push_back(PendingValidationWithTimeout::new(
pending.clone(),
maybe_delay.map(ValidationTimeout::from),
));
Some(new_store)
}
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub fn reduce_prune(old_store: &DhtStore, _action_wrapper: &ActionWrapper) -> Option<DhtStore> {
let pruned_queue = old_store
.queued_holding_workflows
.iter()
.unique_by(|p| {
(
p.pending.workflow.clone(),
p.pending.entry_with_header.header.entry_address(),
)
})
.cloned()
.collect::<VecDeque<_>>();
if pruned_queue.len() < old_store.queued_holding_workflows.len() {
let mut new_store = (*old_store).clone();
new_store.queued_holding_workflows = pruned_queue;
Some(new_store)
} else {
None
}
}
#[allow(unknown_lints)]
#[allow(clippy::needless_pass_by_value)]
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub fn reduce_remove_queued_holding_workflow(
old_store: &DhtStore,
action_wrapper: &ActionWrapper,
) -> Option<DhtStore> {
let action = action_wrapper.action();
let (state, pending) = unwrap_to!(action => Action::RemoveQueuedHoldingWorkflow);
let mut new_store = (*old_store).clone();
new_store.update_queued_holding_workflow(state, pending);
Some(new_store)
}
#[cfg(test)]
pub mod tests {
use crate::{
action::{Action, ActionWrapper},
content_store::{AddContent, GetContent},
dht::{
actions::remove_queued_holding_workflow::HoldingWorkflowQueueing,
dht_reducers::{
reduce, reduce_hold_aspect, reduce_queue_holding_workflow,
reduce_remove_queued_holding_workflow,
},
dht_store::{create_get_links_eavi_query, DhtStore},
pending_validations::{PendingValidation, PendingValidationStruct, ValidatingWorkflow},
},
instance::tests::test_context,
network::entry_with_header::EntryWithHeader,
state::test_store,
};
use bitflags::_core::time::Duration;
use holochain_core_types::{
agent::{test_agent_id, test_agent_id_with_name},
chain_header::{test_chain_header, test_chain_header_with_sig},
eav::Attribute,
entry::{test_entry, test_sys_entry, Entry},
link::{link_data::LinkData, Link, LinkActionKind},
network::entry_aspect::EntryAspect,
};
use holochain_persistence_api::cas::content::{Address, AddressableContent};
use snowflake::ProcessUniqueId;
use std::{sync::Arc, time::SystemTime};
#[allow(dead_code)]
fn enable_logging_for_test() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "trace");
}
let _ = env_logger::builder()
.default_format_timestamp(false)
.default_format_module_path(false)
.is_test(true)
.try_init();
}
#[test]
fn reduce_hold_aspect_test() {
let context = test_context("bob", None);
let store = test_store(context);
let sys_entry = test_sys_entry();
let new_dht_store = reduce_hold_aspect(
&store.dht(),
&ActionWrapper::new(Action::HoldAspect((
EntryAspect::Content(sys_entry.clone(), test_chain_header()),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
))),
)
.expect("there should be a new store for committing a sys entry");
assert_eq!(
Some(sys_entry.clone()),
store.dht().get(&sys_entry.address()).unwrap()
);
assert_eq!(
Some(sys_entry.clone()),
new_dht_store
.get(&sys_entry.address())
.expect("could not fetch from cas")
);
}
#[test]
fn can_add_links() {
enable_logging_for_test();
let context = test_context("bob", None);
let store = test_store(context.clone());
let entry = test_entry();
let _ = (*store.dht()).clone().add(&entry);
let test_link = String::from("test_link");
let test_tag = String::from("test-tag");
let link = Link::new(
&entry.address(),
&entry.address(),
&test_link.clone(),
&test_tag.clone(),
);
let link_data = LinkData::from_link(
&link,
LinkActionKind::ADD,
test_chain_header(),
test_agent_id(),
);
let action = ActionWrapper::new(Action::HoldAspect((
EntryAspect::LinkAdd(link_data.clone(), test_chain_header()),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
)));
let link_entry = Entry::LinkAdd(link_data.clone());
let new_dht_store = (*reduce(store.dht(), &action)).clone();
let get_links_query =
create_get_links_eavi_query(entry.address(), Some(test_link), Some(test_tag))
.expect("supposed to create link query");
let fetched = new_dht_store.fetch_eavi(&get_links_query);
assert!(fetched.is_ok());
let hash_set = fetched.unwrap();
assert_eq!(hash_set.len(), 1);
let eav = hash_set.iter().nth(0).unwrap();
assert_eq!(eav.entity(), *link.base());
assert_eq!(eav.value(), link_entry.address());
assert_eq!(
eav.attribute(),
Attribute::LinkTag(link.link_type().to_owned(), link.tag().to_owned())
);
}
#[test]
fn can_remove_links() {
let context = test_context("bob", None);
let store = test_store(context.clone());
let entry = test_entry();
let _ = (*store.dht()).clone().add(&entry);
let test_link = String::from("test_link");
let test_tag = String::from("test-tag");
let link = Link::new(
&entry.address(),
&entry.address(),
&test_link.clone(),
&test_tag.clone(),
);
let link_data = LinkData::from_link(
&link,
LinkActionKind::ADD,
test_chain_header(),
test_agent_id(),
);
let entry_link_add = Entry::LinkAdd(link_data.clone());
let action_link_add = ActionWrapper::new(Action::HoldAspect((
EntryAspect::LinkAdd(link_data.clone(), test_chain_header()),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
)));
let new_dht_store = reduce(store.dht(), &action_link_add);
let link_remove_header = test_chain_header();
let link_remove_data = LinkData::from_link(
&link.clone(),
LinkActionKind::REMOVE,
link_remove_header.clone(),
test_agent_id(),
);
let action_link_remove = ActionWrapper::new(Action::HoldAspect((
EntryAspect::LinkRemove(
(
link_remove_data.clone(),
vec![entry_link_add.clone().address()],
),
test_chain_header(),
),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
)));
let new_dht_store = reduce(new_dht_store, &action_link_remove);
let get_links_query = create_get_links_eavi_query(
entry.address(),
Some(test_link.clone()),
Some(test_tag.clone()),
)
.expect("supposed to create link query");
let fetched = new_dht_store.fetch_eavi(&get_links_query);
assert!(fetched.is_ok());
let hash_set = fetched.unwrap();
assert_eq!(hash_set.len(), 1);
let eav = hash_set.iter().nth(0).unwrap();
assert_eq!(eav.entity(), *link.base());
let link_entry = link.add_entry(test_chain_header(), test_agent_id());
assert_eq!(eav.value(), link_entry.address());
assert_eq!(
eav.attribute(),
Attribute::RemovedLink(
link_remove_header.entry_address().clone(),
link.link_type().to_string(),
link.tag().to_string()
)
);
let action_link_add = ActionWrapper::new(Action::HoldAspect((
EntryAspect::LinkAdd(link_data.clone(), test_chain_header()),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
)));
let new_dht_store = reduce(store.dht(), &action_link_add);
let get_links_query = create_get_links_eavi_query(
entry.address(),
Some(test_link.clone()),
Some(test_tag.clone()),
)
.expect("supposed to create link query");
let fetched = new_dht_store.fetch_eavi(&get_links_query);
assert!(fetched.is_ok());
let hash_set = fetched.unwrap();
assert_eq!(hash_set.len(), 1);
let eav = hash_set.iter().nth(0).unwrap();
assert_eq!(eav.entity(), *link.base());
let link_entry = link.add_entry(test_chain_header(), test_agent_id());
assert_eq!(eav.value(), link_entry.address());
assert_eq!(
eav.attribute(),
Attribute::RemovedLink(
link_remove_header.entry_address().clone(),
link.link_type().to_string(),
link.tag().to_string()
)
);
let link_data = LinkData::from_link(
&link.clone(),
LinkActionKind::ADD,
test_chain_header(),
test_agent_id_with_name("new_agent"),
);
let entry_link_add = Entry::LinkAdd(link_data.clone());
let action_link_add = ActionWrapper::new(Action::HoldAspect((
EntryAspect::LinkAdd(link_data.clone(), test_chain_header()),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
)));
let new_dht_store_2 = reduce(store.dht(), &action_link_add);
let get_links_query =
create_get_links_eavi_query(entry.address(), Some(test_link), Some(test_tag))
.expect("supposed to create link query");
let fetched = new_dht_store_2.fetch_eavi(&get_links_query);
assert!(fetched.is_ok());
let hash_set = fetched.unwrap();
assert_eq!(hash_set.len(), 2);
let eav = hash_set.iter().nth(1).unwrap();
assert_eq!(eav.entity(), *link.base());
let _link_entry = link.add_entry(test_chain_header(), test_agent_id());
assert_eq!(eav.value(), entry_link_add.address());
assert_eq!(
eav.attribute(),
Attribute::LinkTag(link.link_type().to_string(), link.tag().to_string())
);
}
#[test]
fn does_not_add_link_for_missing_base() {
let context = test_context("bob", None);
let store = test_store(context.clone());
let entry = test_entry();
let test_link = String::from("test-link-type");
let test_tag = String::from("test-tag");
let link = Link::new(
&entry.address(),
&entry.address(),
&test_link.clone(),
&test_tag.clone(),
);
let link_data = LinkData::from_link(
&link.clone(),
LinkActionKind::ADD,
test_chain_header(),
test_agent_id(),
);
let action = ActionWrapper::new(Action::HoldAspect((
EntryAspect::LinkAdd(link_data.clone(), test_chain_header()),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
)));
let new_dht_store = reduce(store.dht(), &action);
let get_links_query =
create_get_links_eavi_query(entry.address(), Some(test_link), Some(test_tag))
.expect("supposed to create link query");
let fetched = new_dht_store.fetch_eavi(&get_links_query);
assert!(fetched.is_ok());
let hash_set = fetched.unwrap();
assert_eq!(hash_set.len(), 0);
}
#[cfg(feature = "broken-tests")]
#[test]
#[cfg(feature = "broken-tests")]
pub fn reduce_hold_test() {
let context = test_context("bill", None);
let store = test_store(context.clone());
let entry = test_entry();
let action_wrapper = ActionWrapper::new(Action::HoldAspect((
EntryAspect::Content(entry.clone(), test_chain_header()),
(ProcessUniqueId::new(), ProcessUniqueId::new()),
)));
store.reduce(action_wrapper);
let cas = context.dht_storage.read().unwrap();
let maybe_json = cas.fetch(&entry.address()).unwrap();
let result_entry = match maybe_json {
Some(content) => Entry::try_from(content).unwrap(),
None => panic!("Could not find received entry in CAS"),
};
assert_eq!(&entry, &result_entry,);
}
fn create_pending_validation(
entry: Entry,
workflow: ValidatingWorkflow,
link_update_delete: Option<Address>,
) -> PendingValidation {
let entry_with_header = EntryWithHeader {
entry: entry.clone(),
header: test_chain_header_with_sig("sig", link_update_delete),
};
Arc::new(PendingValidationStruct::new(entry_with_header, workflow))
}
#[test]
pub fn test_holding_queue() {
let context = test_context("test", None);
let store = DhtStore::new(context.dht_storage.clone(), context.eav_storage.clone());
assert_eq!(store.queued_holding_workflows().len(), 0);
let test_entry = test_entry();
let hold =
create_pending_validation(test_entry.clone(), ValidatingWorkflow::HoldEntry, None);
let hold_header = hold.entry_with_header.header.clone();
let action = ActionWrapper::new(Action::QueueHoldingWorkflow((
hold.clone(),
Some((SystemTime::now(), Duration::from_secs(10000))),
)));
let store = reduce_queue_holding_workflow(&store, &action).unwrap();
assert_eq!(store.queued_holding_workflows().len(), 1);
assert!(store.has_exact_queued_holding_workflow(&hold));
let test_link = String::from("test_link");
let test_tag = String::from("test-tag");
let link = Link::new(
&test_entry.address(),
&test_entry.address(),
&test_link.clone(),
&test_tag.clone(),
);
let link_data = LinkData::from_link(
&link,
LinkActionKind::ADD,
test_chain_header(),
test_agent_id(),
);
let link_entry = Entry::LinkAdd(link_data.clone());
let hold_link = create_pending_validation(link_entry, ValidatingWorkflow::HoldLink, None);
let action = ActionWrapper::new(Action::QueueHoldingWorkflow((hold_link.clone(), None)));
let store = reduce_queue_holding_workflow(&store, &action).unwrap();
assert_eq!(store.queued_holding_workflows().len(), 2);
assert!(store.has_exact_queued_holding_workflow(&hold_link));
let action = ActionWrapper::new(Action::RemoveQueuedHoldingWorkflow((
HoldingWorkflowQueueing::Processing,
hold.clone(),
)));
let store = reduce_remove_queued_holding_workflow(&store, &action).unwrap();
let action = ActionWrapper::new(Action::RemoveQueuedHoldingWorkflow((
HoldingWorkflowQueueing::Done,
hold.clone(),
)));
let store = reduce_remove_queued_holding_workflow(&store, &action).unwrap();
let (next_pending, _) = store.next_queued_holding_workflow().unwrap();
assert_eq!(hold_link, next_pending);
let update = create_pending_validation(
test_entry.clone(),
ValidatingWorkflow::UpdateEntry,
Some(hold_header.address()),
);
let action = ActionWrapper::new(Action::QueueHoldingWorkflow((update.clone(), None)));
let store = reduce_queue_holding_workflow(&store, &action).unwrap();
assert_eq!(store.queued_holding_workflows().len(), 2);
assert!(!store.has_exact_queued_holding_workflow(&hold));
assert!(store.has_exact_queued_holding_workflow(&update));
assert!(store.has_exact_queued_holding_workflow(&hold_link));
let action = ActionWrapper::new(Action::RemoveQueuedHoldingWorkflow((
HoldingWorkflowQueueing::Processing,
hold_link.clone(),
)));
let store = reduce_remove_queued_holding_workflow(&store, &action).unwrap();
let action = ActionWrapper::new(Action::RemoveQueuedHoldingWorkflow((
HoldingWorkflowQueueing::Done,
hold_link.clone(),
)));
let store = reduce_remove_queued_holding_workflow(&store, &action).unwrap();
assert_eq!(store.queued_holding_workflows().len(), 1);
assert!(!store.has_exact_queued_holding_workflow(&hold));
assert!(!store.has_exact_queued_holding_workflow(&hold_link));
assert!(store.has_exact_queued_holding_workflow(&update));
let (next_pending, _) = store.next_queued_holding_workflow().unwrap();
assert_eq!(update, next_pending);
}
}