#![warn(missing_docs)]
use crate::error::CascadeError;
use error::CascadeResult;
use holo_hash::ActionHash;
use holo_hash::AgentPubKey;
use holo_hash::AnyDhtHash;
use holo_hash::EntryHash;
use holochain_p2p::actor::GetActivityOptions;
use holochain_p2p::actor::GetLinksOptions;
use holochain_p2p::actor::GetOptions as NetworkGetOptions;
use holochain_p2p::{DynHolochainP2pDna, HolochainP2pError};
use holochain_state::host_fn_workspace::HostFnStores;
use holochain_state::host_fn_workspace::HostFnWorkspace;
use holochain_state::mutations::insert_action;
use holochain_state::mutations::insert_entry;
use holochain_state::mutations::insert_op_lite;
use holochain_state::mutations::set_validation_status;
use holochain_state::prelude::*;
use holochain_state::query::entry_details::GetEntryDetailsQuery;
use holochain_state::query::link::{GetLinksFilter, GetLinksQuery};
use holochain_state::query::link_details::GetLinkDetailsQuery;
use holochain_state::query::live_entry::GetLiveEntryQuery;
use holochain_state::query::live_record::GetLiveRecordQuery;
use holochain_state::query::record_details::GetRecordDetailsQuery;
use holochain_state::query::DbScratch;
use holochain_state::query::PrivateDataQuery;
use holochain_state::scratch::SyncScratch;
use metrics::create_cascade_duration_metric;
use metrics::CascadeDurationMetric;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use tracing::*;
pub mod authority;
pub mod error;
mod agent_activity;
mod metrics;
#[cfg(feature = "test_utils")]
pub mod test_utils;
/// Unwraps an `Option`, or returns early from the enclosing function on `None`.
///
/// * `some_or_return!(opt)` returns `Ok(())` from the enclosing function.
/// * `some_or_return!(opt, value)` returns `Ok(value)` from the enclosing function.
macro_rules! some_or_return {
    // The one-argument form is the two-argument form with a unit return value.
    ($n:expr) => {
        some_or_return!($n, ())
    };
    ($n:expr, $ret:expr) => {
        match $n {
            Some(inner) => inner,
            None => return Ok($ret),
        }
    };
}
/// Where a value returned by one of the [`Cascade`] `retrieve*` methods was found.
#[derive(Debug, Clone)]
pub enum CascadeSource {
/// The value was found in a local store before any network fetch was attempted.
Local,
/// The value was only found after fetching from the network.
Network,
}
/// A view over several optional data sources that are queried together.
///
/// Every source is optional; methods skip the sources that are not set.
#[derive(Clone)]
pub struct CascadeImpl {
/// Read handle to the authored database, if any.
authored: Option<DbRead<DbKindAuthored>>,
/// Read handle to the DHT database, if any.
dht: Option<DbRead<DbKindDht>>,
/// Write handle to the cache database, if any. Data fetched from the network is written here.
cache: Option<DbWrite<DbKindCache>>,
/// In-memory scratch space that is included in queries, if any.
scratch: Option<SyncScratch>,
/// Network handle used to fetch data and to ask whether this node is an authority, if any.
network: Option<DynHolochainP2pDna>,
/// Agent whose private data may be read; when `None`, queries are built without private data access.
private_data: Option<Arc<AgentPubKey>>,
/// Metric that records how long each cascading query took.
duration_metric: &'static CascadeDurationMetric,
}
impl CascadeImpl {
/// Returns this cascade with the authored database set to `authored`.
pub fn with_authored(mut self, authored: DbRead<DbKindAuthored>) -> Self {
    self.authored = Some(authored);
    self
}
/// Returns this cascade with private data access enabled for `author`.
pub fn with_private_data(mut self, author: Arc<AgentPubKey>) -> Self {
    self.private_data = Some(author);
    self
}
/// Returns this cascade with the DHT database set to `dht`.
pub fn with_dht(mut self, dht: DbRead<DbKindDht>) -> Self {
    self.dht = Some(dht);
    self
}
/// Returns this cascade with the cache database set to `cache`.
pub fn with_cache(mut self, cache: DbWrite<DbKindCache>) -> Self {
    self.cache = Some(cache);
    self
}
/// Returns this cascade with the scratch space set to `scratch`.
pub fn with_scratch(mut self, scratch: SyncScratch) -> Self {
    self.scratch = Some(scratch);
    self
}
/// Returns this cascade with a network handle and the cache database to use with it.
///
/// The authored, DHT, scratch and private data settings are carried over.
/// The duration metric is obtained afresh from `create_cascade_duration_metric`.
pub fn with_network(
    self,
    network: DynHolochainP2pDna,
    cache_db: DbWrite<DbKindCache>,
) -> CascadeImpl {
    CascadeImpl {
        cache: Some(cache_db),
        network: Some(network),
        duration_metric: create_cascade_duration_metric(),
        ..self
    }
}
/// Creates a cascade with no databases, no scratch, no network and no private data access.
pub fn empty() -> Self {
    let duration_metric = create_cascade_duration_metric();
    Self {
        authored: None,
        dht: None,
        cache: None,
        scratch: None,
        network: None,
        private_data: None,
        duration_metric,
    }
}
/// Builds a cascade from the stores and author of `workspace`, with `network` attached.
pub fn from_workspace_and_network<AuthorDb, DhtDb>(
    workspace: &HostFnWorkspace<AuthorDb, DhtDb>,
    network: DynHolochainP2pDna,
) -> CascadeImpl
where
    AuthorDb: ReadAccess<DbKindAuthored>,
    DhtDb: ReadAccess<DbKindDht>,
{
    let stores = workspace.stores();
    let author = workspace.author();
    CascadeImpl {
        authored: Some(stores.authored),
        dht: Some(stores.dht),
        cache: Some(stores.cache),
        scratch: stores.scratch,
        network: Some(network),
        private_data: author,
        duration_metric: create_cascade_duration_metric(),
    }
}
/// Builds a cascade without a network from workspace `stores`.
///
/// `author`, when given, is the agent whose private data may be read.
pub fn from_workspace_stores(stores: HostFnStores, author: Option<Arc<AgentPubKey>>) -> Self {
    Self {
        authored: Some(stores.authored),
        dht: Some(stores.dht),
        cache: Some(stores.cache),
        scratch: stores.scratch,
        network: None,
        private_data: author,
        duration_metric: create_cascade_duration_metric(),
    }
}
/// Returns the cache database handle, or `None` if this cascade has no cache.
pub fn cache(&self) -> Option<&DbWrite<DbKindCache>> {
self.cache.as_ref()
}
/// Writes a single rendered op into the cache database.
///
/// Inserts the op's action and its lite op row, applies the validation status
/// when the op carries one, and marks the op as integrated at the current time.
///
/// # Errors
///
/// Returns the first error produced by any of the database writes.
#[allow(clippy::result_large_err)]
fn insert_rendered_op(txn: &mut Txn<DbKindCache>, op: &RenderedOp) -> CascadeResult<()> {
    let RenderedOp {
        op_light,
        op_hash,
        action,
        validation_status,
    } = op;
    let op_order = OpOrder::new(op_light.get_type(), action.action().timestamp());
    let timestamp = action.action().timestamp();
    insert_action(txn, action)?;
    insert_op_lite(
        txn,
        op_light,
        op_hash,
        &op_order,
        // Was the garbled token `×tamp`; the local `timestamp` is passed by reference.
        &timestamp,
        0,
        todo_no_cache_transfer_data(),
    )?;
    if let Some(status) = validation_status {
        set_validation_status(txn, op_hash, *status)?;
    }
    set_when_integrated(txn, op_hash, Timestamp::now())?;
    Ok(())
}
#[allow(clippy::result_large_err)] fn insert_rendered_ops(txn: &mut Txn<DbKindCache>, ops: &RenderedOps) -> CascadeResult<()> {
let RenderedOps {
ops,
entry,
warrant,
} = ops;
if let Some(warrant) = warrant {
let op = DhtOpHashed::from_content_sync(warrant.clone());
insert_op_cache(txn, &op)?;
}
if let Some(entry) = entry {
insert_entry(txn, entry.as_hash(), entry.as_content())?;
}
for op in ops {
Self::insert_rendered_op(txn, op)?;
}
Ok(())
}
/// Writes agent activity into the cache database.
///
/// Each item is turned into a `RegisterAgentActivity` chain op, inserted into the cache
/// and marked as integrated at the current time. Stops at the first error.
#[allow(clippy::result_large_err)]
fn insert_activity(
    txn: &mut Txn<DbKindCache>,
    ops: Vec<RegisterAgentActivity>,
) -> CascadeResult<()> {
    for activity in ops {
        let SignedHashed {
            hashed: HoloHashed { content, .. },
            signature,
        } = activity.action;
        let chain_op = ChainOp::RegisterAgentActivity(signature, content);
        let hashed_op = DhtOpHashed::from_content_sync(chain_op);
        insert_op_cache(txn, &hashed_op)?;
        set_when_integrated(txn, hashed_op.as_hash(), Timestamp::now())?;
    }
    Ok(())
}
/// Renders each wire response and writes the resulting ops into the cache.
///
/// Does nothing when this cascade has no cache. All responses are written
/// inside a single `write_async` call.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
async fn merge_ops_into_cache(&self, responses: Vec<WireOps>) -> CascadeResult<()> {
    let cache = match self.cache.as_ref() {
        Some(cache) => cache,
        None => return Ok(()),
    };
    cache
        .write_async(move |txn| {
            responses.into_iter().try_for_each(|response| {
                let rendered = response.render()?;
                Self::insert_rendered_ops(txn, &rendered)
            })
        })
        .await?;
    Ok(())
}
/// Renders each wire link response against `key` and writes the resulting ops into the cache.
///
/// Does nothing when this cascade has no cache. All responses are written
/// inside a single `write_async` call.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
async fn merge_link_ops_into_cache(
    &self,
    responses: Vec<WireLinkOps>,
    key: WireLinkKey,
) -> CascadeResult<()> {
    let cache = match self.cache.as_ref() {
        Some(cache) => cache,
        None => return Ok(()),
    };
    cache
        .write_async(move |txn| {
            responses.into_iter().try_for_each(|response| {
                let rendered = response.render(&key)?;
                Self::insert_rendered_ops(txn, &rendered)
            })
        })
        .await?;
    Ok(())
}
/// Picks one `must_get_agent_activity` response out of `responses` and caches its contents.
///
/// If every adjacent pair of responses is equal (trivially true for zero or one
/// response), the first response is chosen. Otherwise the first
/// `MustGetAgentActivityResponse::Activity` response is chosen, if there is one.
///
/// When there is no cache, the chosen response is returned without being stored.
/// When the chosen response is `Activity`, its activity and warrants are written to
/// the cache before it is returned. When no response was chosen, `IncompleteChain`
/// is returned.
#[cfg_attr(feature = "instrument", tracing::instrument(skip_all))]
async fn add_activity_into_cache(
&self,
responses: Vec<MustGetAgentActivityResponse>,
) -> CascadeResult<MustGetAgentActivityResponse> {
// Compare each response with its successor to see whether all responses agree.
let response = if responses
.iter()
.zip(responses.iter().skip(1))
.all(|(a, b)| a == b)
{
responses.into_iter().next()
} else {
tracing::info!(
"Got different must_get_agent_activity responses from different authorities"
);
// On disagreement, take the first response that actually carries activity.
responses
.iter()
.find(|a| matches!(a, MustGetAgentActivityResponse::Activity { .. }))
.cloned()
};
// Without a cache there is nothing to store; return the chosen response directly.
let cache = some_or_return!(
self.cache.as_ref(),
response.unwrap_or(MustGetAgentActivityResponse::IncompleteChain)
);
match response {
Some(MustGetAgentActivityResponse::Activity { activity, warrants }) => {
cache
.write_async({
// Clones go into the write closure; the originals are returned below.
let activity = activity.clone();
let warrants = warrants.clone();
move |txn| {
Self::insert_activity(txn, activity)?;
for warrant in warrants {
let op = DhtOpHashed::from_content_sync(warrant);
insert_op_cache(txn, &op)?;
}
CascadeResult::Ok(())
}
})
.await?;
Ok(MustGetAgentActivityResponse::Activity { activity, warrants })
}
Some(response) => Ok(response),
None => Ok(MustGetAgentActivityResponse::IncompleteChain),
}
}
/// Fetches the ops for `hash` from the network and merges them into the cache.
///
/// Does nothing when this cascade has no network. A `NoPeersForLocation` error is
/// logged and treated as an empty response; any other network error is returned.
///
/// `_options` is currently not used by this method.
// The skipped name must match the parameter name exactly: `skip` with a
// non-existent parameter fails to compile when the `instrument` feature is on.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, _options)))]
pub async fn fetch_record(
    &self,
    hash: AnyDhtHash,
    _options: NetworkGetOptions,
) -> CascadeResult<()> {
    let network = some_or_return!(self.network.as_ref());
    let results = match network
        .get(hash)
        .instrument(debug_span!("fetch_record::network_get"))
        .await
    {
        Ok(ops) => ops,
        Err(e @ HolochainP2pError::NoPeersForLocation(_, _)) => {
            tracing::info!(?e, "No peers to fetch record from");
            vec![]
        }
        Err(e) => return Err(e.into()),
    };
    self.merge_ops_into_cache(results).await?;
    Ok(())
}
/// Fetches link ops for `link_key` from the network and merges them into the cache.
///
/// Does nothing when this cascade has no network. A `NoPeersForLocation` error is
/// logged at debug level and treated as an empty response; any other network error
/// is returned.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
async fn fetch_links(
    &self,
    link_key: WireLinkKey,
    options: GetLinksOptions,
) -> CascadeResult<()> {
    let network = some_or_return!(self.network.as_ref());
    let results = match network.get_links(link_key.clone(), options).await {
        Ok(link_ops) => link_ops,
        Err(e @ HolochainP2pError::NoPeersForLocation(_, _)) => {
            tracing::debug!(?e, "No peers to fetch links from");
            vec![]
        }
        Err(e) => return Err(e.into()),
    };
    // This is the last use of `link_key`, so it is moved instead of cloned again.
    self.merge_link_ops_into_cache(results, link_key).await?;
    Ok(())
}
/// Asks the network for agent activity responses for `agent` matching `query`.
///
/// Returns an empty list when this cascade has no network, or when the network
/// reports `NoPeersForLocation` (logged at debug level). Other network errors are returned.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
async fn fetch_agent_activity(
    &self,
    agent: AgentPubKey,
    query: ChainQueryFilter,
    options: GetActivityOptions,
) -> CascadeResult<Vec<AgentActivityResponse>> {
    let network = match self.network.as_ref() {
        Some(network) => network,
        None => return Ok(Vec::with_capacity(0)),
    };
    match network.get_agent_activity(agent, query, options).await {
        Ok(responses) => Ok(responses),
        Err(e @ HolochainP2pError::NoPeersForLocation(_, _)) => {
            tracing::debug!(?e, "No peers to fetch agent activity from");
            Ok(vec![])
        }
        Err(e) => Err(e.into()),
    }
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
async fn fetch_must_get_agent_activity(
&self,
author: AgentPubKey,
filter: holochain_zome_types::chain::ChainFilter,
) -> CascadeResult<MustGetAgentActivityResponse> {
let network = some_or_return!(
self.network.as_ref(),
MustGetAgentActivityResponse::IncompleteChain
);
let results = match network.must_get_agent_activity(author, filter).await {
Ok(response) => response,
Err(e @ HolochainP2pError::NoPeersForLocation(_, _)) => {
tracing::debug!(?e, "No peers to fetch agent activity from");
vec![]
}
Err(e) => return Err(e.into()),
};
self.add_activity_into_cache(results).await
}
/// Acquires a read transaction guard for each configured database.
///
/// The guards are returned in the order cache, DHT, authored; databases that
/// are not set are skipped.
async fn get_txn_guards(&self) -> CascadeResult<Vec<PTxnGuard>> {
    let mut guards = Vec::with_capacity(3);
    if let Some(db) = self.cache.as_ref() {
        guards.push(db.get_read_txn().await?);
    }
    if let Some(db) = self.dht.as_ref() {
        guards.push(db.get_read_txn().await?);
    }
    if let Some(db) = self.authored.as_ref() {
        guards.push(db.get_read_txn().await?);
    }
    Ok(guards)
}
/// Runs `query` across all configured databases, plus the scratch when present.
///
/// The query itself is executed on a blocking thread via `spawn_blocking`.
/// The elapsed time is recorded in the duration metric only when the query succeeds.
async fn cascading<Q>(&self, query: Q) -> CascadeResult<Q::Output>
where
Q: Query<Item = Judged<SignedActionHashed>> + Send + 'static,
<Q as Query>::Output: Send + 'static,
{
let start = Instant::now();
let mut txn_guards = self.get_txn_guards().await?;
let scratch = self.scratch.clone();
let results = tokio::task::spawn_blocking(move || {
// Open one transaction per guard; the query sees them all at once.
let mut txns = Vec::with_capacity(txn_guards.len());
for conn in &mut txn_guards {
let txn = conn.transaction()?;
txns.push(txn);
}
let txns_ref: Vec<_> = txns.iter().collect();
let results = match scratch {
Some(scratch) => scratch
.apply_and_then(|scratch| query.run(DbScratch::new(&txns_ref, scratch)))?,
None => query.run(Txns::from(&txns_ref[..]))?,
};
CascadeResult::Ok(results)
})
// First `?` is for the join error of the blocking task, second for the query result.
.await??;
self.duration_metric
.record(start.elapsed().as_secs_f64(), &[]);
Ok(results)
}
/// Applies `f` to each configured store in turn and returns the first `Some` result.
///
/// Stores are tried in the order cache, DHT, authored, scratch. Stores that are not
/// set are skipped. Returns `Ok(None)` when no store produced a value, and returns
/// early with the error if `f` or a database read fails.
async fn find_map<F, T>(&self, mut f: F) -> CascadeResult<Option<T>>
where
T: Send + 'static,
F: FnMut(&dyn Store) -> CascadeResult<Option<T>> + Send + Clone + 'static,
{
if let Some(cache) = self.cache.clone() {
let r = cache
.read_async({
// Each database read gets its own clone of `f`, moved into the read closure.
let mut f = f.clone();
move |raw_txn| f(&CascadeTxnWrapper::from(raw_txn))
})
.await?;
if r.is_some() {
return Ok(r);
}
}
if let Some(dht) = self.dht.clone() {
let r = dht
.read_async({
let mut f = f.clone();
move |raw_txn| f(&CascadeTxnWrapper::from(raw_txn))
})
.await?;
if r.is_some() {
return Ok(r);
}
}
if let Some(authored) = self.authored.clone() {
let r = authored
.read_async({
let mut f = f.clone();
move |raw_txn| f(&CascadeTxnWrapper::from(raw_txn))
})
.await?;
if r.is_some() {
return Ok(r);
}
}
// The scratch is consulted last, directly through `apply_and_then`.
if let Some(scratch) = &self.scratch {
let r = scratch.apply_and_then(|scratch| f(scratch))?;
if r.is_some() {
return Ok(r);
}
}
Ok(None)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn get_entry_details(
&self,
entry_hash: EntryHash,
options: GetOptions,
) -> CascadeResult<Option<EntryDetails>> {
let query: GetEntryDetailsQuery = self.construct_query_with_data_access(entry_hash.clone());
self.get_local_first_with_query(query, entry_hash.into(), options)
.await
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn get_record_details(
&self,
action_hash: ActionHash,
options: GetOptions,
) -> CascadeResult<Option<RecordDetails>> {
let query: GetRecordDetailsQuery =
self.construct_query_with_data_access(action_hash.clone());
self.get_local_first_with_query(query, action_hash.into(), options)
.await
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn dht_get_action(
&self,
action_hash: ActionHash,
options: GetOptions,
) -> CascadeResult<Option<Record>> {
let query: GetLiveRecordQuery = self.construct_query_with_data_access(action_hash.clone());
self.get_local_first_with_query(query, action_hash.into(), options)
.await
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn dht_get_entry(
&self,
entry_hash: EntryHash,
options: GetOptions,
) -> CascadeResult<Option<Record>> {
let query: GetLiveEntryQuery = self.construct_query_with_data_access(entry_hash.clone());
self.get_local_first_with_query(query, entry_hash.into(), options)
.await
}
/// Runs `query` locally and falls back to the network only when nothing is found.
///
/// 1. The query is run against the local stores; a `Some` result is returned immediately.
/// 2. If `options.strategy` is not `GetStrategy::Network`, `Ok(None)` is returned.
/// 3. Otherwise, unless this node is authoring `get_target` or is an authority for it,
///    the record is fetched from the network into the cache.
/// 4. The query is run again and its result returned.
async fn get_local_first_with_query<Q, O>(
&self,
query: Q,
get_target: AnyDhtHash,
options: GetOptions,
) -> CascadeResult<Q::Output>
where
Q: Query<Item = Judged<SignedActionHashed>, Output = Option<O>> + Send + 'static,
O: Send + 'static,
{
if let Some(record) = self.cascading(query.clone()).await? {
return Ok(Some(record));
}
if options.strategy == GetStrategy::Network {
let authoring = self.am_i_authoring(&get_target)?;
let authority = self.am_i_an_authority(get_target.clone().into()).await?;
if !(authoring || authority) {
match self.fetch_record(get_target, options.into()).await {
Ok(_) => (),
// Having no peers to ask is not an error here; the local query below still runs.
Err(CascadeError::NetworkError(
e @ HolochainP2pError::NoPeersForLocation(_, _),
)) => {
tracing::debug!(?e, "No peers to fetch record from");
}
Err(e) => {
return Err(e);
}
}
}
// Query again so that anything just fetched into the cache is picked up.
self.cascading(query).await
} else {
Ok(None)
}
}
/// Runs `dht_get` for every hash in `hashes`, with at most 10 lookups in flight at once.
///
/// The results are collected as the lookups complete (`buffer_unordered`), so they are
/// NOT guaranteed to be in the same order as `hashes`. The first error aborts the
/// collection and is returned.
pub async fn get_concurrent<I: IntoIterator<Item = AnyDhtHash>>(
    &self,
    hashes: I,
    options: GetOptions,
) -> CascadeResult<Vec<Option<Record>>> {
    use futures::stream::{StreamExt, TryStreamExt};
    // Each lookup owns its own clone of the cascade and of the options.
    let lookups = hashes.into_iter().map(|hash| {
        let cascade = self.clone();
        let options = options.clone();
        async move { cascade.dht_get(hash, options).await }
    });
    futures::stream::iter(lookups)
        .buffer_unordered(10)
        .try_collect()
        .await
}
/// Gets a record by entry hash or action hash.
///
/// Dispatches to `dht_get_entry` or `dht_get_action` depending on the kind of `hash`.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
pub async fn dht_get(
    &self,
    hash: AnyDhtHash,
    options: GetOptions,
) -> CascadeResult<Option<Record>> {
    let primitive = hash.into_primitive();
    match primitive {
        AnyDhtHashPrimitive::Action(action_hash) => {
            self.dht_get_action(action_hash, options).await
        }
        AnyDhtHashPrimitive::Entry(entry_hash) => {
            self.dht_get_entry(entry_hash, options).await
        }
    }
}
/// Gets the details for an entry hash or an action hash.
///
/// Entry hashes yield `Details::Entry` via `get_entry_details`; action hashes yield
/// `Details::Record` via `get_record_details`.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self)))]
pub async fn get_details(
    &self,
    hash: AnyDhtHash,
    options: GetOptions,
) -> CascadeResult<Option<Details>> {
    let details = match hash.into_primitive() {
        AnyDhtHashPrimitive::Entry(entry_hash) => self
            .get_entry_details(entry_hash, options)
            .await?
            .map(Details::Entry),
        AnyDhtHashPrimitive::Action(action_hash) => self
            .get_record_details(action_hash, options)
            .await?
            .map(Details::Record),
    };
    Ok(details)
}
/// Gets the links matching `key`.
///
/// With the `Network` strategy, and when this node is not an authority for the link
/// base, link ops are first fetched from the network into the cache. A
/// `NoPeersForLocation` error from that fetch is logged and ignored; other errors are
/// returned. The links are then queried from the local stores.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, options)))]
pub async fn dht_get_links(
    &self,
    key: WireLinkKey,
    options: GetLinksOptions,
) -> CascadeResult<Vec<Link>> {
    let use_network = options.get_options.strategy == GetStrategy::Network;
    if use_network && !self.am_i_an_authority(key.base.clone()).await? {
        if let Err(err) = self.fetch_links(key.clone(), options).await {
            match err {
                CascadeError::NetworkError(
                    e @ HolochainP2pError::NoPeersForLocation(_, _),
                ) => {
                    tracing::debug!(?e, "No peers to fetch links from");
                }
                other => return Err(other),
            }
        }
    }
    let filter = GetLinksFilter {
        after: key.after,
        before: key.before,
        author: key.author,
    };
    let query = GetLinksQuery::new(key.base, key.type_query, key.tag, filter);
    self.cascading(query).await
}
/// Gets link details for `key`: each action paired with a list of related actions.
///
/// With the `Network` strategy, and when this node is not an authority for the link
/// base, link ops are first fetched from the network into the cache. A
/// `NoPeersForLocation` error from that fetch is logged and ignored; other errors are
/// returned. The details are then queried from the local stores.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, key, options)))]
pub async fn get_links_details(
    &self,
    key: WireLinkKey,
    options: GetLinksOptions,
) -> CascadeResult<Vec<(SignedActionHashed, Vec<SignedActionHashed>)>> {
    let use_network = options.get_options.strategy == GetStrategy::Network;
    if use_network && !self.am_i_an_authority(key.base.clone()).await? {
        if let Err(err) = self.fetch_links(key.clone(), options).await {
            match err {
                CascadeError::NetworkError(
                    e @ HolochainP2pError::NoPeersForLocation(_, _),
                ) => {
                    tracing::debug!(?e, "No peers to fetch link details from");
                }
                other => return Err(other),
            }
        }
    }
    let query = GetLinkDetailsQuery::new(key.base, key.type_query, key.tag);
    self.cascading(query).await
}
/// Counts the distinct links matching `query`.
///
/// When this node is not an authority for the link base and a network is available,
/// the create-link action hashes reported by the network are collected first. The
/// create-link hashes found by a local links query are then added, and the size of
/// the combined set is returned, so a link seen both remotely and locally counts once.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, query)))]
pub async fn dht_count_links(&self, query: WireLinkQuery) -> CascadeResult<usize> {
// A set is used so that links known both locally and remotely are not double counted.
let mut links = HashSet::<ActionHash>::new();
if !self.am_i_an_authority(query.base.clone()).await? {
if let Some(network) = &self.network {
match network.count_links(query.clone()).await {
Ok(actions) => {
links.extend(actions.create_link_actions());
}
// Having no peers to ask is tolerated; the local count below still applies.
Err(e @ HolochainP2pError::NoPeersForLocation(_, _)) => {
tracing::debug!(?e, "No peers to fetch link count from");
}
Err(e) => {
return Err(e.into());
}
}
}
}
let get_links_query = GetLinksQuery::new(
query.base.clone(),
query.link_type.clone(),
query.tag_prefix.clone(),
query.into(),
);
links.extend(
self.cascading(get_links_query)
.await?
.into_iter()
.map(|l| l.create_link_hash),
);
Ok(links.len())
}
/// Gets the agent activity of `author` selected by `filter`, locally first.
///
/// Bounded activity is collected from every configured database (together with the
/// scratch when present) on a blocking thread, merged, and checked. If the result is
/// `Activity` it is returned. Otherwise, if this node is an authority for the author,
/// `IncompleteChain` is returned; if not, the activity is requested from the network.
pub async fn must_get_agent_activity(
&self,
author: AgentPubKey,
filter: ChainFilter,
) -> CascadeResult<MustGetAgentActivityResponse> {
let mut txn_guards = self.get_txn_guards().await?;
let scratch = self.scratch.clone();
let results = tokio::task::spawn_blocking({
// Clones are moved into the blocking task; the originals are still needed below.
let author = author.clone();
let filter = filter.clone();
move || {
let mut results = Vec::with_capacity(txn_guards.len() + 1);
for txn_guard in &mut txn_guards {
let txn = txn_guard.transaction()?;
let r = match &scratch {
Some(scratch) => {
scratch.apply_and_then(|scratch| {
authority::get_agent_activity_query::must_get_agent_activity::get_bounded_activity(&txn, Some(scratch), &author, filter.clone())
})?
}
None => authority::get_agent_activity_query::must_get_agent_activity::get_bounded_activity(&txn, None, &author, filter.clone())?
};
results.push(r);
}
CascadeResult::Ok(results)
}
})
// First `?` is for the join error of the blocking task, second for the query result.
.await??;
let merged_response =
holochain_types::chain::merge_bounded_agent_activity_responses(results);
let result =
authority::get_agent_activity_query::must_get_agent_activity::filter_then_check(
merged_response,
);
if matches!(result, MustGetAgentActivityResponse::Activity { .. }) {
return Ok(result);
}
// The local data was not sufficient; only non-authorities go to the network.
let i_am_authority = self.am_i_an_authority(author.clone().into()).await?;
if i_am_authority {
Ok(MustGetAgentActivityResponse::IncompleteChain)
} else {
Ok(self
.fetch_must_get_agent_activity(author.clone(), filter)
.await?)
}
}
/// Gets the activity of `agent` matching `query`.
///
/// When this node is an authority for the agent and the strategy is `Local`, the
/// activity is read from the DHT database (or an empty merge is produced if there is
/// no DHT database). Otherwise the activity is fetched from the network and merged.
///
/// An empty chain status or a status-only request (neither valid nor rejected
/// activity requested) returns early. When full records with entries were requested,
/// missing entries of the valid and rejected activity are filled in afterwards.
// NOTE(review): a non-authority with the `Local` strategy still takes the network
// branch below — confirm that this is intended.
#[cfg_attr(
feature = "instrument",
tracing::instrument(skip(self, agent, query, options))
)]
pub async fn get_agent_activity(
&self,
agent: AgentPubKey,
query: ChainQueryFilter,
options: GetActivityOptions,
) -> CascadeResult<AgentActivityResponse> {
let status_only = !(options.include_valid_activity || options.include_rejected_activity);
let authority = self.am_i_an_authority(agent.clone().into()).await?;
let merged_response = if authority && options.get_options.strategy == GetStrategy::Local {
match self.dht.clone() {
Some(vault) => {
authority::handle_get_agent_activity(
vault,
agent.clone(),
query.clone(),
(&options).into(),
)
.await?
}
None => {
info!("Unable to get agent activity because this cascade does not have DHT access");
// Merging an empty list of responses yields the response for "nothing known".
agent_activity::merge_activities(
agent.clone(),
&options,
Vec::with_capacity(0),
)?
}
}
} else {
let results = self
.fetch_agent_activity(agent.clone(), query.clone(), options.clone())
.await?;
let merged_response: AgentActivityResponse =
agent_activity::merge_activities(agent.clone(), &options, results)?;
merged_response
};
if let ChainStatus::Empty = &merged_response.status {
return Ok(AgentActivityResponse::from_empty(merged_response));
}
if status_only {
return Ok(AgentActivityResponse::status_only(merged_response));
}
let AgentActivityResponse {
agent,
mut valid_activity,
mut rejected_activity,
status,
highest_observed,
warrants,
} = merged_response;
if options.include_full_records && query.include_entries {
tracing::debug!("Trying to fill missing entries for agent activity");
valid_activity = self
.fill_missing_chain_item_entries(valid_activity, options.get_options.clone())
.await?;
rejected_activity = self
.fill_missing_chain_item_entries(rejected_activity, options.get_options)
.await?;
}
let r = AgentActivityResponse {
agent,
valid_activity,
rejected_activity,
status,
highest_observed,
warrants,
};
Ok(r)
}
/// Fills in entries that are `RecordEntry::NotStored` in a `ChainItems::Full` list.
///
/// The entry hashes of such records are looked up with `get_concurrent`, and each
/// record whose entry hash was found gets the entry of the retrieved record. Records
/// that could not be resolved are left unchanged. Any other `ChainItems` variant is
/// returned as is.
async fn fill_missing_chain_item_entries(
&self,
mut chain_items: ChainItems,
get_options: GetOptions,
) -> CascadeResult<ChainItems> {
// Collect the entry hashes of all records whose entry is not stored.
let missing_entry_hashes = match &chain_items {
ChainItems::Full(records) => records
.iter()
.filter_map(|r| match r.entry {
RecordEntry::NotStored => r.action().entry_hash().map(|h| h.clone().into()),
_ => None,
})
.collect(),
_ => Vec::with_capacity(0),
};
if !missing_entry_hashes.is_empty() {
trace!(
"There are {} missing entries to fetch",
missing_entry_hashes.len()
);
let maybe_provided_entry_records = self
.get_concurrent(missing_entry_hashes, get_options)
.await?;
trace!("Got {:?} entries", maybe_provided_entry_records.len());
// Index the retrieved records by entry hash; lookups below do not depend on result order.
let entry_lookup = maybe_provided_entry_records
.iter()
.filter_map(|r| match r {
Some(r) => r
.signed_action()
.action()
.entry_hash()
.map(|entry_hash| (entry_hash, &r.entry)),
None => None,
})
.collect::<HashMap<_, _>>();
match &mut chain_items {
ChainItems::Full(records) => {
for record in records.iter_mut() {
if let RecordEntry::NotStored = record.entry {
if let Some(entry_hash) = record.action().entry_hash() {
if let Some(entry) = entry_lookup.get(entry_hash) {
record.entry = (*entry).clone();
}
}
}
}
}
_ => {
// Missing hashes are only ever collected from `ChainItems::Full` above.
unreachable!()
}
}
}
Ok(chain_items)
}
/// Returns whether the scratch contains `hash`; `false` when there is no scratch.
#[allow(clippy::result_large_err)]
fn am_i_authoring(&self, hash: &AnyDhtHash) -> CascadeResult<bool> {
    match self.scratch.as_ref() {
        Some(scratch) => Ok(scratch.apply_and_then(|s| s.contains_hash(hash))?),
        None => Ok(false),
    }
}
/// Asks the network whether this node is an authority for `hash`; `false` when there is no network.
async fn am_i_an_authority(&self, hash: OpBasis) -> CascadeResult<bool> {
    match self.network.as_ref() {
        Some(network) => Ok(network.authority_for_hash(hash).await?),
        None => Ok(false),
    }
}
/// Builds a query for `hash`, with private data access when this cascade has an author set.
fn construct_query_with_data_access<H, Q: PrivateDataQuery<Hash = H>>(&self, hash: H) -> Q {
    if let Some(author) = self.private_data.clone() {
        Q::with_private_data_access(hash, author)
    } else {
        Q::without_private_data_access(hash)
    }
}
}
/// Retrieval of DHT data that also reports where each value was found.
///
/// Every method returns `Ok(None)` when the value could not be found, and otherwise
/// the value together with its [`CascadeSource`].
///
/// With the `test_utils` feature enabled, a mock implementation is generated by `mockall`.
#[async_trait::async_trait]
#[cfg_attr(feature = "test_utils", mockall::automock)]
pub trait Cascade {
    /// Retrieves the entry with the given `hash`.
    async fn retrieve_entry(
        &self,
        hash: EntryHash,
        options: NetworkGetOptions,
    ) -> CascadeResult<Option<(EntryHashed, CascadeSource)>>;
    /// Retrieves the signed action with the given `hash`.
    async fn retrieve_action(
        &self,
        hash: ActionHash,
        options: NetworkGetOptions,
    ) -> CascadeResult<Option<(SignedActionHashed, CascadeSource)>>;
    /// Retrieves the record for the given entry or action `hash`.
    async fn retrieve(
        &self,
        hash: AnyDhtHash,
        options: NetworkGetOptions,
    ) -> CascadeResult<Option<(Record, CascadeSource)>>;
}
#[async_trait::async_trait]
impl Cascade for CascadeImpl {
/// Looks the entry up in the local stores, then fetches from the network and looks again.
///
/// A hit before the fetch is reported as `CascadeSource::Local`, a hit after it as
/// `CascadeSource::Network`.
async fn retrieve_entry(
    &self,
    hash: EntryHash,
    options: NetworkGetOptions,
) -> CascadeResult<Option<(EntryHashed, CascadeSource)>> {
    let author = self.private_data.clone();
    let lookup_hash = hash.clone();
    let local = self
        .find_map(move |store| {
            Ok(store.get_public_or_authored_entry(
                &lookup_hash,
                author.as_ref().map(|a| a.as_ref()),
            )?)
        })
        .await?;
    if let Some(entry) = local {
        return Ok(Some((
            EntryHashed::from_content_sync(entry),
            CascadeSource::Local,
        )));
    }
    // Nothing local: pull from the network into the cache and look again.
    self.fetch_record(hash.clone().into(), options).await?;
    let author = self.private_data.clone();
    let lookup_hash = hash.clone();
    let fetched = self
        .find_map(move |store| {
            Ok(store.get_public_or_authored_entry(
                &lookup_hash,
                author.as_ref().map(|a| a.as_ref()),
            )?)
        })
        .await?;
    Ok(fetched.map(|entry| {
        (
            EntryHashed::from_content_sync(entry),
            CascadeSource::Network,
        )
    }))
}
/// Looks the action up in the local stores, then fetches from the network and looks again.
///
/// A hit before the fetch is reported as `CascadeSource::Local`, a hit after it as
/// `CascadeSource::Network`.
async fn retrieve_action(
    &self,
    hash: ActionHash,
    options: NetworkGetOptions,
) -> CascadeResult<Option<(SignedActionHashed, CascadeSource)>> {
    let lookup_hash = hash.clone();
    let local = self
        .find_map(move |store| Ok(store.get_action(&lookup_hash)?))
        .await?;
    if let Some(action) = local {
        return Ok(Some((action, CascadeSource::Local)));
    }
    // Nothing local: pull from the network into the cache and look again.
    self.fetch_record(hash.clone().into(), options).await?;
    let fetched = self
        .find_map(move |store| {
            let found = store.get_action(&hash)?;
            Ok(found.map(|action| (action, CascadeSource::Network)))
        })
        .await?;
    Ok(fetched)
}
/// Looks the record up in the local stores, then fetches from the network and looks again.
///
/// A hit before the fetch is reported as `CascadeSource::Local`, a hit after it as
/// `CascadeSource::Network`.
async fn retrieve(
    &self,
    hash: AnyDhtHash,
    options: NetworkGetOptions,
) -> CascadeResult<Option<(Record, CascadeSource)>> {
    let author = self.private_data.clone();
    let lookup_hash = hash.clone();
    let local = self
        .find_map(move |store| {
            Ok(store.get_public_or_authored_record(
                &lookup_hash,
                author.as_ref().map(|a| a.as_ref()),
            )?)
        })
        .await?;
    if let Some(record) = local {
        return Ok(Some((record, CascadeSource::Local)));
    }
    // Nothing local: pull from the network into the cache and look again.
    self.fetch_record(hash.clone(), options).await?;
    let author = self.private_data.clone();
    let fetched = self
        .find_map(move |store| {
            Ok(store.get_public_or_authored_record(
                &hash,
                author.as_ref().map(|a| a.as_ref()),
            )?)
        })
        .await?;
    Ok(fetched.map(|record| (record, CascadeSource::Network)))
}
}
#[cfg(feature = "test_utils")]
impl MockCascade {
/// Creates a mock cascade that serves the given `records` from memory.
///
/// Each record is indexed by its action address and, when its action has an entry
/// hash, by that entry hash too. All three `retrieve*` methods answer from this index
/// and always report `CascadeSource::Local`.
///
/// # Panics
///
/// `retrieve_entry` panics if the matching record's entry is not available
/// (`as_option()` returns `None`).
pub fn with_records(records: Vec<Record>) -> Self {
let mut cascade = Self::default();
// Index every record under its action address and, if present, its entry hash.
let map: HashMap<AnyDhtHash, Record> = records
.into_iter()
.flat_map(|r| {
let mut items = vec![(r.action_address().clone().into(), r.clone())];
if let Some(eh) = r.action().entry_hash() {
items.push((eh.clone().into(), r))
}
items
})
.collect();
// The index is shared between the three expectation closures.
let map0 = Arc::new(parking_lot::Mutex::new(map));
let map = map0.clone();
cascade.expect_retrieve().returning(move |hash, _| {
let m = map.lock();
let result = m.get(&hash).map(|r| (r.clone(), CascadeSource::Local));
Box::pin(async move { Ok(result) })
});
let map = map0.clone();
cascade.expect_retrieve_action().returning(move |hash, _| {
let m = map.lock();
let result = m
.get(&hash.into())
.map(|r| (r.signed_action().clone(), CascadeSource::Local));
Box::pin(async move { Ok(result) })
});
let map = map0;
cascade.expect_retrieve_entry().returning(move |hash, _| {
let m = map.lock();
let result = m.get(&hash.into()).map(|r| {
(
EntryHashed::from_content_sync(r.entry().as_option().unwrap().clone()),
CascadeSource::Local,
)
});
Box::pin(async move { Ok(result) })
});
cascade
}
}
// Checks that a `MockCascade` built with `with_records` returns every record when
// it is looked up by its action address.
//
// `MockCascade` only exists when the `test_utils` feature is enabled, so the test is
// gated on the same feature instead of failing to compile without it.
#[cfg(feature = "test_utils")]
#[tokio::test]
async fn test_mock_cascade_with_records() {
    use ::fixt::fixt;
    let records = vec![fixt!(Record), fixt!(Record), fixt!(Record)];
    let cascade = MockCascade::with_records(records.clone());
    let opts = NetworkGetOptions::default();
    let mut retrieved = Vec::with_capacity(records.len());
    for record in &records {
        let (found, _) = cascade
            .retrieve(record.action_address().clone().into(), opts.clone())
            .await
            .unwrap()
            .unwrap();
        retrieved.push(found);
    }
    assert_eq!(records, retrieved);
}