#![warn(missing_docs)]
use crate::error::CascadeError;
use crate::get_options_ext::GetOptionsExt;
use error::CascadeResult;
use holo_hash::ActionHash;
use holo_hash::AgentPubKey;
use holo_hash::AnyDhtHash;
use holo_hash::EntryHash;
use holochain_p2p::actor::GetLinksRequestOptions;
use holochain_p2p::actor::{GetActivityOptions, NetworkRequestOptions};
use holochain_p2p::{DynHolochainP2pDna, HolochainP2pError};
use holochain_state::dht_store::DhtStore;
use holochain_state::host_fn_workspace::HostFnStores;
use holochain_state::host_fn_workspace::HostFnWorkspace;
use holochain_state::mutations::insert_action;
use holochain_state::mutations::insert_entry;
use holochain_state::mutations::insert_op_lite;
use holochain_state::mutations::set_validation_status;
use holochain_state::prelude::*;
use holochain_state::query::link::GetLinksFilter;
use holochain_state::scratch::SyncScratch;
use holochain_zome_types::prelude::{FunctionName, ZomeName};
use metrics::{cascade_duration_metric, cascade_fetch_error_metric};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use tracing::*;
use verify::{rejected_without_warrant, verify_activity_signatures, verify_rendered_ops_batch};
/// Unwraps an `Option`, or returns early from the enclosing function.
///
/// * `some_or_return!(opt)` evaluates to the inner value, or executes
///   `return Ok(())` when `opt` is `None`.
/// * `some_or_return!(opt, ret)` does the same but returns `Ok(ret)`.
///
/// Only usable inside functions whose return type accepts `Ok(..)`.
macro_rules! some_or_return {
    ($n:expr) => {
        some_or_return!($n, ())
    };
    ($n:expr, $ret:expr) => {
        if let Some(value) = $n {
            value
        } else {
            return Ok($ret);
        }
    };
}
pub mod authority;
pub mod error;
mod agent_activity;
mod fetch;
pub mod get_options_ext;
mod metrics;
#[cfg(feature = "test_utils")]
mod mock;
mod verify;
/// Where a value returned by the cascade was found.
#[derive(Debug, Clone)]
pub enum CascadeSource {
/// The value was available from the local store/scratch without fetching.
Local,
/// The value only became available after a network fetch.
Network,
}
/// Options for the `*_details` cascade lookups.
#[derive(Debug, Clone, Default)]
pub struct CascadeOptions {
/// Options forwarded to the network fetch, if one is performed.
pub network_request_options: NetworkRequestOptions,
/// Get options; their strategy decides whether a network fetch is attempted.
pub get_options: GetOptions,
}
/// Retrieves DHT data by reading the local [`DhtStore`] (overlaid with a
/// scratch) and, where requested and possible, fetching from the network first.
#[derive(Clone)]
pub struct CascadeImpl {
// Cache database handle, exposed through `cache()`.
cache: Option<DbWrite<DbKindCache>>,
// Optional scratch overlaid on store reads and consulted by `am_i_authoring`.
// When absent, reads use a fresh empty scratch.
scratch: Option<SyncScratch>,
// Optional network handle. Without it `am_i_an_authority` reports `false`.
network: Option<DynHolochainP2pDna>,
// Agent key passed as the `author` argument to store reads.
// NOTE(review): presumably grants access to that agent's private data — confirm in `DhtStore`.
private_data: Option<Arc<AgentPubKey>>,
// Store used for all local reads.
dht_store: DhtStore,
// Zome/function that triggered this cascade; labels the duration metric and
// is forwarded to network `count_links` requests.
zome_call_origin: Option<(ZomeName, FunctionName)>,
}
/// Guard that records how long it was alive to the cascade duration metric
/// when dropped (only if a zome call origin is set).
struct CascadeDurationGuard {
// Instant from which the elapsed time is measured.
start: Instant,
// Zome and function names used as metric attributes; `None` disables recording.
zome_call_origin: Option<(ZomeName, FunctionName)>,
}
impl Drop for CascadeDurationGuard {
    /// Records the seconds elapsed since `start`, labelled with the `zome`
    /// and `fn` attributes. Does nothing when no zome call origin is set.
    fn drop(&mut self) {
        if let Some((zome, fn_name)) = self.zome_call_origin.as_ref() {
            let zome_label = opentelemetry::KeyValue::new("zome", zome.to_string());
            let fn_label = opentelemetry::KeyValue::new("fn", fn_name.to_string());
            cascade_duration_metric().record(
                self.start.elapsed().as_secs_f64(),
                &[zome_label, fn_label],
            );
        }
    }
}
impl CascadeImpl {
/// Returns this cascade tagged with the zome and function that originated
/// the call. The tag labels the duration metric and is forwarded to
/// network `count_links` requests.
pub fn with_zome_call_origin(mut self, zome_name: &ZomeName, fn_name: &FunctionName) -> Self {
    self.zome_call_origin = Some((zome_name.clone(), fn_name.clone()));
    self
}
/// Returns this cascade with `author` set as the agent passed to store
/// reads as the `author` argument.
pub fn with_private_data(mut self, author: Arc<AgentPubKey>) -> Self {
    self.private_data = Some(author);
    self
}
/// Returns this cascade with the given cache database attached.
pub fn with_cache(mut self, cache: DbWrite<DbKindCache>) -> Self {
    self.cache = Some(cache);
    self
}
/// Returns this cascade with the given scratch attached. The scratch is
/// overlaid on store reads and consulted when checking whether this
/// cascade is authoring a hash.
pub fn with_scratch(mut self, scratch: SyncScratch) -> Self {
    self.scratch = Some(scratch);
    self
}
pub fn with_network(
self,
network: DynHolochainP2pDna,
cache_db: DbWrite<DbKindCache>,
) -> CascadeImpl {
CascadeImpl {
scratch: self.scratch,
private_data: self.private_data,
cache: Some(cache_db),
network: Some(network),
dht_store: self.dht_store,
zome_call_origin: self.zome_call_origin,
}
}
/// Creates a cascade that only has a store: no network, cache, scratch,
/// private-data author or zome call origin.
pub fn empty(dht_store: DhtStore) -> Self {
    Self {
        dht_store,
        cache: None,
        scratch: None,
        network: None,
        private_data: None,
        zome_call_origin: None,
    }
}
/// Builds a cascade from a host function workspace plus a network handle.
///
/// The cache, scratch and store come from `workspace.stores()` and the
/// private-data author from `workspace.author()`. No zome call origin is set.
///
/// # Panics
///
/// Panics if the workspace stores carry no `dht_store`.
pub fn from_workspace_and_network<AuthorDb, DhtDb>(
    workspace: &HostFnWorkspace<AuthorDb, DhtDb>,
    network: DynHolochainP2pDna,
) -> CascadeImpl
where
    AuthorDb: ReadAccess<DbKindAuthored>,
    DhtDb: ReadAccess<DbKindDht>,
{
    // Destructure exhaustively so a new `HostFnStores` field is a compile error here.
    let HostFnStores {
        authored: _,
        dht: _,
        cache,
        scratch,
        dht_store,
    } = workspace.stores();

    CascadeImpl {
        network: Some(network),
        cache: Some(cache),
        scratch,
        dht_store: dht_store
            .expect("HostFnWorkspace always populates dht_store; this is a bug"),
        private_data: workspace.author(),
        zome_call_origin: None,
    }
}
/// Builds a network-less cascade from already extracted workspace stores.
///
/// `author` becomes the private-data author. No zome call origin is set.
///
/// # Panics
///
/// Panics if `stores` carries no `dht_store`.
pub fn from_workspace_stores(stores: HostFnStores, author: Option<Arc<AgentPubKey>>) -> Self {
    // Destructure exhaustively so a new `HostFnStores` field is a compile error here.
    let HostFnStores {
        authored: _,
        dht: _,
        cache,
        scratch,
        dht_store,
    } = stores;

    Self {
        network: None,
        cache: Some(cache),
        scratch,
        dht_store: dht_store
            .expect("HostFnWorkspace always populates dht_store; this is a bug"),
        private_data: author,
        zome_call_origin: None,
    }
}
/// Returns the cache database handle, if one is configured.
pub fn cache(&self) -> Option<&DbWrite<DbKindCache>> {
self.cache.as_ref()
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn get_entry_details(
&self,
entry_hash: EntryHash,
options: CascadeOptions,
) -> CascadeResult<Option<EntryDetails>> {
let _guard = self.time_cascade();
let author = self.private_data.as_ref().map(|a| a.as_ref());
let scratch = self.local_scratch();
let read = self.dht_store.as_read();
if options.get_options.strategy() == GetStrategy::Network {
let authoring = self.am_i_authoring(&entry_hash.clone().into())?;
let authority = self.am_i_an_authority(entry_hash.clone().into()).await?;
if !(authoring || authority) {
match self
.fetch_record(entry_hash.clone().into(), options.network_request_options)
.await
{
Ok(_) => (),
Err(CascadeError::NetworkError(
e @ HolochainP2pError::NoPeersForLocation(_, _),
)) => {
tracing::debug!(?e, "No peers to fetch record from");
}
Err(e) => return Err(e),
}
}
}
Ok(read
.get_entry_details_with_scratch(&entry_hash, author, &scratch)
.await?)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn get_record_details(
&self,
action_hash: ActionHash,
options: CascadeOptions,
) -> CascadeResult<Option<RecordDetails>> {
let _guard = self.time_cascade();
let author = self.private_data.as_ref().map(|a| a.as_ref());
let scratch = self.local_scratch();
let read = self.dht_store.as_read();
if options.get_options.strategy() == GetStrategy::Network {
let authoring = self.am_i_authoring(&action_hash.clone().into())?;
let authority = self.am_i_an_authority(action_hash.clone().into()).await?;
if !(authoring || authority) {
match self
.fetch_record(action_hash.clone().into(), options.network_request_options)
.await
{
Ok(_) => (),
Err(CascadeError::NetworkError(
e @ HolochainP2pError::NoPeersForLocation(_, _),
)) => {
tracing::debug!(?e, "No peers to fetch record from");
}
Err(e) => return Err(e),
}
}
}
Ok(read
.get_record_details_with_scratch(&action_hash, author, &scratch)
.await?)
}
/// Returns the scratch to overlay on store reads: a clone of the
/// configured scratch handle, or a fresh empty one when none is configured.
fn local_scratch(&self) -> SyncScratch {
    match &self.scratch {
        Some(scratch) => scratch.clone(),
        None => Scratch::new().into_sync(),
    }
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn dht_get_action(
&self,
action_hash: ActionHash,
options: GetOptions,
) -> CascadeResult<Option<Record>> {
let _guard = self.time_cascade();
let author = self.private_data.as_ref().map(|a| a.as_ref());
let scratch = self.local_scratch();
let read = self.dht_store.as_read();
if let Some(record) = read
.get_live_record_with_scratch(&action_hash, author, &scratch)
.await?
{
return Ok(Some(record));
}
if options.strategy() == GetStrategy::Network {
let authoring = self.am_i_authoring(&action_hash.clone().into())?;
let authority = self.am_i_an_authority(action_hash.clone().into()).await?;
if !(authoring || authority) {
match self
.fetch_record(action_hash.clone().into(), options.to_network_options())
.await
{
Ok(_) => (),
Err(CascadeError::NetworkError(
e @ HolochainP2pError::NoPeersForLocation(_, _),
)) => {
tracing::debug!(?e, "No peers to fetch record from");
}
Err(e) => return Err(e),
}
}
return Ok(read
.get_live_record_with_scratch(&action_hash, author, &scratch)
.await?);
}
Ok(None)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn dht_get_entry(
&self,
entry_hash: EntryHash,
options: GetOptions,
) -> CascadeResult<Option<Record>> {
let _guard = self.time_cascade();
let author = self.private_data.as_ref().map(|a| a.as_ref());
let scratch = self.local_scratch();
let read = self.dht_store.as_read();
if let Some(record) = read
.get_live_entry_with_scratch(&entry_hash, author, &scratch)
.await?
{
return Ok(Some(record));
}
if options.strategy() == GetStrategy::Network {
let authoring = self.am_i_authoring(&entry_hash.clone().into())?;
let authority = self.am_i_an_authority(entry_hash.clone().into()).await?;
if !(authoring || authority) {
match self
.fetch_record(entry_hash.clone().into(), options.to_network_options())
.await
{
Ok(_) => (),
Err(CascadeError::NetworkError(
e @ HolochainP2pError::NoPeersForLocation(_, _),
)) => {
tracing::debug!(?e, "No peers to fetch record from");
}
Err(e) => return Err(e),
}
}
return Ok(read
.get_live_entry_with_scratch(&entry_hash, author, &scratch)
.await?);
}
Ok(None)
}
pub async fn get_concurrent<I: IntoIterator<Item = AnyDhtHash>>(
&self,
hashes: I,
options: GetOptions,
) -> CascadeResult<Vec<Option<Record>>> {
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
let iter = hashes.into_iter().map({
|hash| {
let options = options.clone();
let cascade = self.clone();
async move { cascade.dht_get(hash, options).await }
}
});
futures::stream::iter(iter)
.buffer_unordered(10)
.try_collect()
.await
}
/// Gets a live record for any DHT hash by dispatching on the hash kind to
/// [`CascadeImpl::dht_get_entry`] or [`CascadeImpl::dht_get_action`].
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
pub async fn dht_get(
    &self,
    hash: AnyDhtHash,
    options: GetOptions,
) -> CascadeResult<Option<Record>> {
    let primitive = hash.into_primitive();
    match primitive {
        AnyDhtHashPrimitive::Entry(entry_hash) => self.dht_get_entry(entry_hash, options).await,
        AnyDhtHashPrimitive::Action(action_hash) => {
            self.dht_get_action(action_hash, options).await
        }
    }
}
/// Gets the details for any DHT hash.
///
/// Entry hashes are answered by [`CascadeImpl::get_entry_details`] and
/// wrapped in `Details::Entry`; action hashes by
/// [`CascadeImpl::get_record_details`] and wrapped in `Details::Record`.
/// The network request options are derived from `options`.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
pub async fn get_details(
    &self,
    hash: AnyDhtHash,
    options: GetOptions,
) -> CascadeResult<Option<Details>> {
    let cascade_options = CascadeOptions {
        network_request_options: options.to_network_options(),
        get_options: options,
    };

    let details = match hash.into_primitive() {
        AnyDhtHashPrimitive::Entry(entry_hash) => self
            .get_entry_details(entry_hash, cascade_options)
            .await?
            .map(Details::Entry),
        AnyDhtHashPrimitive::Action(action_hash) => self
            .get_record_details(action_hash, cascade_options)
            .await?
            .map(Details::Record),
    };
    Ok(details)
}
/// Gets the links matching `key`.
///
/// With the `Network` strategy, and when this cascade is not an authority
/// for the link base, links are fetched from the network first. A "no
/// peers for location" error from that fetch is logged at debug level and
/// ignored; other errors are returned. The result is always read from the
/// local store overlaid with the scratch, filtered by the key's `after`,
/// `before` and `author`.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn dht_get_links(
    &self,
    key: WireLinkKey,
    options: GetLinksRequestOptions,
) -> CascadeResult<Vec<Link>> {
    // Held until return; `time_cascade` is defined outside this file.
    let _guard = self.time_cascade();

    if options.get_options.strategy() == GetStrategy::Network
        && !self.am_i_an_authority(key.base.clone()).await?
    {
        if let Err(err) = self.fetch_links(key.clone(), options).await {
            match err {
                // Having nobody to ask is not an error: fall back to local data.
                CascadeError::NetworkError(e @ HolochainP2pError::NoPeersForLocation(..)) => {
                    tracing::debug!(?e, "No peers to fetch links from");
                }
                other => return Err(other),
            }
        }
    }

    let filter = GetLinksFilter {
        after: key.after,
        before: key.before,
        author: key.author,
    };
    let scratch = self.local_scratch();
    let links = self
        .dht_store
        .as_read()
        .get_links_with_scratch(
            &key.base,
            &key.type_query,
            key.tag.as_ref(),
            &filter,
            &scratch,
        )
        .await?;
    Ok(links)
}
/// Gets link details for `key`: pairs of a signed action and a list of
/// signed actions, as returned by the store's
/// `get_link_details_with_scratch`.
///
/// With the `Network` strategy, and when this cascade is not an authority
/// for the link base, links are fetched from the network first. A "no
/// peers for location" error from that fetch is logged at debug level and
/// ignored; other errors are returned. The result is always read from the
/// local store overlaid with the scratch.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, key, options)))]
pub async fn get_links_details(
    &self,
    key: WireLinkKey,
    options: GetLinksRequestOptions,
) -> CascadeResult<Vec<(SignedActionHashed, Vec<SignedActionHashed>)>> {
    // Held until return; `time_cascade` is defined outside this file.
    let _guard = self.time_cascade();

    if options.get_options.strategy() == GetStrategy::Network
        && !self.am_i_an_authority(key.base.clone()).await?
    {
        if let Err(err) = self.fetch_links(key.clone(), options).await {
            match err {
                // Having nobody to ask is not an error: fall back to local data.
                CascadeError::NetworkError(e @ HolochainP2pError::NoPeersForLocation(..)) => {
                    tracing::debug!(?e, "No peers to fetch link details from");
                }
                other => return Err(other),
            }
        }
    }

    let scratch = self.local_scratch();
    let details = self
        .dht_store
        .as_read()
        .get_link_details_with_scratch(&key.base, &key.type_query, key.tag.as_ref(), &scratch)
        .await?;
    Ok(details)
}
/// Counts the distinct links matching `query`.
///
/// When a network is configured and this cascade is not an authority for
/// the link base, the create-link action hashes reported by the network
/// are collected (using default network request options). They are merged
/// with the create-link hashes of locally known matching links, and the
/// size of the de-duplicated set is returned.
///
/// A "no peers for location" network error is logged at debug level and
/// ignored; other network errors are returned.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, query)))]
pub async fn dht_count_links(&self, query: WireLinkQuery) -> CascadeResult<usize> {
    // Held until return; `time_cascade` is defined outside this file.
    let _guard = self.time_cascade();
    // A set, so links seen both remotely and locally count once.
    let mut link_hashes: HashSet<ActionHash> = HashSet::new();

    let is_authority = self.am_i_an_authority(query.base.clone()).await?;
    if let (false, Some(network)) = (is_authority, self.network.as_ref()) {
        let remote = network
            .count_links(
                query.clone(),
                NetworkRequestOptions::default(),
                self.zome_call_origin.clone(),
            )
            .await;
        match remote {
            Ok(actions) => link_hashes.extend(actions.create_link_actions()),
            Err(e @ HolochainP2pError::NoPeersForLocation(..)) => {
                tracing::debug!(?e, "No peers to fetch link count from");
            }
            Err(e) => return Err(e.into()),
        }
    }

    let filter = GetLinksFilter::from(query.clone());
    let scratch = self.local_scratch();
    let local_links = self
        .dht_store
        .as_read()
        .get_links_with_scratch(
            &query.base,
            &query.link_type,
            query.tag_prefix.as_ref(),
            &filter,
            &scratch,
        )
        .await?;
    link_hashes.extend(local_links.into_iter().map(|link| link.create_link_hash));

    Ok(link_hashes.len())
}
/// Resolves a `must_get_agent_activity` request for `author` and `filter`.
///
/// The local store (overlaid with the scratch) is consulted first. Its
/// answer is returned as-is when it is an `Activity` response, when no
/// network is configured, or when this cascade is an authority for the
/// author. Otherwise the activity is fetched from the network and the
/// local read is repeated.
///
/// # Errors
///
/// Returns `CascadeError::InvalidInput` when the filter's take is `0`.
/// A "no peers for location" fetch error is logged at debug level and the
/// original local answer is returned; other fetch errors are returned.
pub async fn must_get_agent_activity(
    &self,
    author: AgentPubKey,
    filter: ChainFilter,
    options: NetworkRequestOptions,
) -> CascadeResult<MustGetAgentActivityResponse> {
    // Held until return; `time_cascade` is defined outside this file.
    let _guard = self.time_cascade();

    if matches!(filter.get_take(), Some(0)) {
        return Err(CascadeError::InvalidInput(
            "ChainFilter take must be greater than 0".to_string(),
        ));
    }

    let local_scratch = self.local_scratch();
    let local_result = self
        .dht_store
        .as_read()
        .must_get_agent_activity_with_scratch(&author, &filter, &local_scratch)
        .await?;

    let found_locally = matches!(local_result, MustGetAgentActivityResponse::Activity { .. });
    if found_locally
        || self.network.is_none()
        || self.am_i_an_authority(author.clone().into()).await?
    {
        return Ok(local_result);
    }

    let fetched = self
        .fetch_must_get_agent_activity(author.clone(), filter.clone(), options)
        .await;
    match fetched {
        Ok(_) => {
            // Read again: the fetch may have made the activity available.
            let refreshed = self
                .dht_store
                .as_read()
                .must_get_agent_activity_with_scratch(&author, &filter, &local_scratch)
                .await?;
            Ok(refreshed)
        }
        Err(CascadeError::NetworkError(e @ HolochainP2pError::NoPeersForLocation(..))) => {
            tracing::debug!(?e, "No peers to fetch must_get_agent_activity from");
            Ok(local_result)
        }
        Err(other) => Err(other),
    }
}
/// Gets the activity of `agent` matching `query`.
///
/// With the `Local` strategy the response is read from the local store
/// overlaid with the scratch. Otherwise activity is fetched from the
/// network and merged; if this cascade is not an authority for the agent
/// and a scratch is configured, any returned warrants are added to the
/// scratch (a failure to do so is only logged as a warning).
///
/// An `Empty` chain status is returned via
/// `AgentActivityResponse::from_empty`. If neither valid nor rejected
/// activity was requested the result is
/// `AgentActivityResponse::status_only`. When full records with entries
/// were requested, records whose entry is not stored are filled in where
/// the entry can be retrieved.
#[cfg_attr(
    feature = "instrument",
    tracing::instrument(skip(self, agent, query, options))
)]
pub async fn get_agent_activity(
    &self,
    agent: AgentPubKey,
    query: ChainQueryFilter,
    options: GetActivityOptions,
) -> CascadeResult<AgentActivityResponse> {
    // Held until return; `time_cascade` is defined outside this file.
    let _guard = self.time_cascade();
    let status_only = !options.include_valid_activity && !options.include_rejected_activity;
    let authority = self.am_i_an_authority(agent.clone().into()).await?;

    let merged_response = if options.get_options.strategy() == GetStrategy::Local {
        let scratch = self.local_scratch();
        let dht_options = holochain_state::dht_store::GetAgentActivityOptions {
            include_valid_activity: options.include_valid_activity,
            include_rejected_activity: options.include_rejected_activity,
            include_warrants: options.include_warrants,
            include_full_records: options.include_full_records,
        };
        self.dht_store
            .as_read()
            .get_agent_activity_with_scratch(&agent, &query, &dht_options, &scratch)
            .await?
    } else {
        let results = self
            .fetch_agent_activity(agent.clone(), query.clone(), options.clone())
            .await?;
        let merged: AgentActivityResponse =
            agent_activity::merge_activities(agent.clone(), &options, results)?;

        // Only non-authorities with a scratch record warrants learned from the network.
        match &self.scratch {
            Some(scratch) if !authority && !merged.warrants.is_empty() => {
                let added = scratch.apply(|scratch| {
                    for warrant in &merged.warrants {
                        scratch.add_warrant(warrant.clone());
                    }
                });
                if let Err(err) = added {
                    tracing::warn!(
                        ?err,
                        "Failed to add warrants from network response to scratch"
                    );
                }
            }
            _ => {}
        }
        merged
    };

    if matches!(merged_response.status, ChainStatus::Empty) {
        return Ok(AgentActivityResponse::from_empty(merged_response));
    }
    if status_only {
        return Ok(AgentActivityResponse::status_only(merged_response));
    }

    let AgentActivityResponse {
        agent,
        mut valid_activity,
        mut rejected_activity,
        status,
        highest_observed,
        warrants,
    } = merged_response;

    let wants_entries = options.include_full_records && query.include_entries;
    if wants_entries {
        tracing::debug!("Trying to fill missing entries for agent activity");
        valid_activity = self
            .fill_missing_chain_item_entries(valid_activity, options.get_options.clone())
            .await?;
        rejected_activity = self
            .fill_missing_chain_item_entries(rejected_activity, options.get_options)
            .await?;
    }

    Ok(AgentActivityResponse {
        agent,
        valid_activity,
        rejected_activity,
        status,
        highest_observed,
        warrants,
    })
}
async fn fill_missing_chain_item_entries(
&self,
mut chain_items: ChainItems,
get_options: GetOptions,
) -> CascadeResult<ChainItems> {
let missing_entry_hashes = match &chain_items {
ChainItems::Full(records) => records
.iter()
.filter_map(|r| match r.entry {
RecordEntry::NotStored => r.action().entry_hash().map(|h| h.clone().into()),
_ => None,
})
.collect(),
_ => Vec::with_capacity(0),
};
if !missing_entry_hashes.is_empty() {
trace!(
"There are {} missing entries to fetch",
missing_entry_hashes.len()
);
let maybe_provided_entry_records = self
.get_concurrent(missing_entry_hashes, get_options)
.await?;
trace!("Got {:?} entries", maybe_provided_entry_records.len());
let entry_lookup = maybe_provided_entry_records
.iter()
.filter_map(|r| match r {
Some(r) => r
.signed_action()
.action()
.entry_hash()
.map(|entry_hash| (entry_hash, &r.entry)),
None => None,
})
.collect::<HashMap<_, _>>();
match &mut chain_items {
ChainItems::Full(records) => {
for record in records.iter_mut() {
if let RecordEntry::NotStored = record.entry {
if let Some(entry_hash) = record.action().entry_hash() {
if let Some(entry) = entry_lookup.get(entry_hash) {
record.entry = (*entry).clone();
}
}
}
}
}
_ => {
unreachable!()
}
}
}
Ok(chain_items)
}
/// Reports whether the scratch contains `hash`, i.e. whether this
/// cascade is currently authoring it. Without a scratch this is `false`.
// `CascadeResult`'s error type triggers `clippy::result_large_err`.
#[allow(clippy::result_large_err)]
fn am_i_authoring(&self, hash: &AnyDhtHash) -> CascadeResult<bool> {
    let Some(scratch) = self.scratch.as_ref() else {
        return Ok(false);
    };
    Ok(scratch.apply_and_then(|scratch| scratch.contains_hash(hash))?)
}
/// Asks the network whether this node is an authority for `hash`.
/// Without a network this is `false`.
async fn am_i_an_authority(&self, hash: OpBasis) -> CascadeResult<bool> {
    match self.network.as_ref() {
        Some(network) => Ok(network.authority_for_hash(hash).await?),
        None => Ok(false),
    }
}
}
/// Retrieval interface of the cascade.
///
/// Each method returns the requested data together with a
/// [`CascadeSource`] telling whether it was found locally or only after a
/// network fetch, or `None` when it could not be obtained.
#[async_trait::async_trait]
#[cfg_attr(feature = "test_utils", mockall::automock)]
pub trait Cascade {
    /// Retrieves the entry with the given hash.
    ///
    /// `options` configures the network request, should one be needed.
    async fn retrieve_entry(
        &self,
        hash: EntryHash,
        options: NetworkRequestOptions,
    ) -> CascadeResult<Option<(EntryHashed, CascadeSource)>>;

    /// Retrieves the signed action with the given hash.
    ///
    /// `options` configures the network request, should one be needed.
    async fn retrieve_action(
        &self,
        hash: ActionHash,
        options: NetworkRequestOptions,
    ) -> CascadeResult<Option<(SignedActionHashed, CascadeSource)>>;

    /// Retrieves the public record for the given hash.
    ///
    /// `options` configures the network request, should one be needed.
    async fn retrieve_public_record(
        &self,
        hash: AnyDhtHash,
        options: NetworkRequestOptions,
    ) -> CascadeResult<Option<(Record, CascadeSource)>>;
}
#[async_trait::async_trait]
impl Cascade for CascadeImpl {
/// Returns the entry from the local store (overlaid with the scratch)
/// when present, tagged `CascadeSource::Local`. Otherwise fetches the
/// record from the network and reads again, tagging a hit as
/// `CascadeSource::Network`.
///
/// Unlike the `dht_get_*` methods, every fetch error is returned,
/// including "no peers for location".
async fn retrieve_entry(
    &self,
    hash: EntryHash,
    options: NetworkRequestOptions,
) -> CascadeResult<Option<(EntryHashed, CascadeSource)>> {
    let author = self.private_data.as_deref();
    let scratch = self.local_scratch();
    let read = self.dht_store.as_read();

    let local = read
        .retrieve_entry_with_scratch(&hash, author, &scratch)
        .await?;
    if let Some(entry) = local {
        let hashed = EntryHashed::from_content_sync(entry);
        return Ok(Some((hashed, CascadeSource::Local)));
    }

    self.fetch_record(hash.clone().into(), options).await?;

    let fetched = read
        .retrieve_entry_with_scratch(&hash, author, &scratch)
        .await?;
    Ok(fetched.map(|entry| {
        (
            EntryHashed::from_content_sync(entry),
            CascadeSource::Network,
        )
    }))
}
/// Returns the signed action from the local store (overlaid with the
/// scratch) when present, tagged `CascadeSource::Local`. Otherwise
/// fetches the record from the network and reads again, tagging a hit as
/// `CascadeSource::Network`.
///
/// Every fetch error is returned, including "no peers for location".
async fn retrieve_action(
    &self,
    hash: ActionHash,
    options: NetworkRequestOptions,
) -> CascadeResult<Option<(SignedActionHashed, CascadeSource)>> {
    let scratch = self.local_scratch();
    let read = self.dht_store.as_read();

    let local = read.retrieve_action_with_scratch(&hash, &scratch).await?;
    if let Some(signed_action) = local {
        return Ok(Some((signed_action, CascadeSource::Local)));
    }

    self.fetch_record(hash.clone().into(), options).await?;

    let fetched = read.retrieve_action_with_scratch(&hash, &scratch).await?;
    Ok(fetched.map(|signed_action| (signed_action, CascadeSource::Network)))
}
/// For an action hash: returns the record from the local store (overlaid
/// with the scratch) when present, tagged `CascadeSource::Local`;
/// otherwise fetches it from the network and reads again, tagging a hit
/// as `CascadeSource::Network`.
///
/// For an entry hash: the record is fetched from the network but `None`
/// is always returned.
/// NOTE(review): confirm that returning `None` for entry hashes is intended.
///
/// Every fetch error is returned, including "no peers for location".
async fn retrieve_public_record(
    &self,
    hash: AnyDhtHash,
    options: NetworkRequestOptions,
) -> CascadeResult<Option<(Record, CascadeSource)>> {
    let action_hash = match hash.clone().into_primitive() {
        holo_hash::AnyDhtHashPrimitive::Action(action_hash) => action_hash,
        holo_hash::AnyDhtHashPrimitive::Entry(_) => {
            self.fetch_record(hash.clone(), options).await?;
            return Ok(None);
        }
    };

    let author = self.private_data.as_deref();
    let scratch = self.local_scratch();
    let read = self.dht_store.as_read();

    let local = read
        .retrieve_record_with_scratch(&action_hash, author, &scratch)
        .await?;
    if let Some(record) = local {
        return Ok(Some((record, CascadeSource::Local)));
    }

    self.fetch_record(hash.clone(), options).await?;

    let fetched = read
        .retrieve_record_with_scratch(&action_hash, author, &scratch)
        .await?;
    Ok(fetched.map(|record| (record, CascadeSource::Network)))
}
}
#[cfg(all(test, feature = "test_utils"))]
mod dht_store_scratch_overlay_tests;