use super::error::CascadeResult;
use holo_hash::ActionHash;
use holo_hash::AgentPubKey;
use holo_hash::EntryHash;
use holochain_state::dht_store::{DhtStoreRead, GetAgentActivityOptions};
use holochain_state::query::link::GetLinksFilter;
use holochain_types::prelude::*;
use holochain_zome_types::warrant::{ChainIntegrityWarrant, SignedWarrant, WarrantProof};
use std::collections::HashSet;
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_entry(store: DhtStoreRead, hash: EntryHash) -> CascadeResult<WireEntryOps> {
let mut rejected = RejectedRecords::default();
let create_rows = store.get_authority_entry_creates(&hash).await?;
let delete_rows = store.get_authority_deletes_for_entry(&hash).await?;
let update_rows = store.get_authority_updates_for_entry(&hash).await?;
let entry_type = create_rows
.iter()
.chain(update_rows.iter())
.find_map(|(sah, _)| sah.action().entry_type().cloned());
let creates = judged_actions(create_rows, &mut rejected);
let deletes = judged_actions(delete_rows, &mut rejected);
let updates = judged_actions(update_rows, &mut rejected);
let entry = match store.retrieve_entry(&hash, None).await? {
Some(entry) => entry_type.map(|entry_type| EntryData { entry, entry_type }),
None => None,
};
let warrants = collect_warrants(&store, &rejected).await?;
Ok(WireEntryOps {
creates,
deletes,
updates,
entry,
warrants,
})
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_record(
store: DhtStoreRead,
hash: ActionHash,
) -> CascadeResult<WireRecordOps> {
let mut rejected = RejectedRecords::default();
let mut entry = None;
let action = match store.get_authority_store_record(&hash).await? {
Some((sah, status)) => {
if status == ValidationStatus::Rejected {
rejected.insert(&sah);
}
if let Some(entry_hash) = sah.action().entry_hash().cloned() {
entry = store.retrieve_entry(&entry_hash, None).await?;
}
Some(Judged::new(SignedAction::from(sah), status))
}
None => None,
};
let deletes = judged_actions(
store.get_authority_deletes_for_record(&hash).await?,
&mut rejected,
);
let updates = judged_actions(
store.get_authority_updates_for_record(&hash).await?,
&mut rejected,
);
let warrants = collect_warrants(&store, &rejected).await?;
Ok(WireRecordOps {
action,
deletes,
updates,
entry,
warrants,
})
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_agent_activity(
store: DhtStoreRead,
agent: AgentPubKey,
query: ChainQueryFilter,
options: holochain_p2p::event::GetActivityOptions,
) -> CascadeResult<AgentActivityResponse> {
let options = GetAgentActivityOptions {
include_valid_activity: options.include_valid_activity,
include_rejected_activity: options.include_rejected_activity,
include_warrants: options.include_warrants,
include_full_records: options.include_full_records,
};
Ok(store.get_agent_activity(&agent, &query, &options).await?)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_must_get_agent_activity(
store: DhtStoreRead,
author: AgentPubKey,
filter: ChainFilter,
) -> CascadeResult<MustGetAgentActivityResponse> {
Ok(store.must_get_agent_activity(&author, &filter).await?)
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store, _options)))]
pub async fn handle_get_links(
store: DhtStoreRead,
link_key: WireLinkKey,
_options: holochain_p2p::event::GetLinksOptions,
) -> CascadeResult<WireLinkOps> {
let mut rejected = RejectedRecords::default();
let create_rows = store.get_authority_link_creates(&link_key.base).await?;
let create_rows = filter_link_creates(create_rows, &link_key);
let delete_rows = store.get_authority_delete_links(&link_key.base).await?;
let creates = judged_actions(create_rows, &mut rejected);
let deletes = judged_actions(delete_rows, &mut rejected);
let warrants = collect_warrants(&store, &rejected).await?;
Ok(WireLinkOps {
creates,
deletes,
warrants,
})
}
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_links_query(
store: DhtStoreRead,
query: WireLinkQuery,
) -> CascadeResult<Vec<Link>> {
let filter = GetLinksFilter {
after: query.after,
before: query.before,
author: query.author.clone(),
};
Ok(store
.get_links(
&query.base,
&query.link_type,
query.tag_prefix.as_ref(),
&filter,
)
.await?)
}
#[derive(Default)]
struct RejectedRecords {
authors: HashSet<AgentPubKey>,
action_hashes: HashSet<ActionHash>,
}
impl RejectedRecords {
fn insert(&mut self, action: &SignedActionHashed) {
self.authors.insert(action.action().author().clone());
self.action_hashes.insert(action.as_hash().clone());
}
fn justifies(&self, warrant: &SignedWarrant) -> bool {
let WarrantProof::ChainIntegrity(w) = &warrant.proof;
match w {
ChainIntegrityWarrant::InvalidChainOp { action, .. } => {
self.action_hashes.contains(&action.0)
}
ChainIntegrityWarrant::ChainFork { chain_author, .. } => {
self.authors.contains(chain_author)
}
}
}
}
fn judged_actions(
rows: Vec<(SignedActionHashed, ValidationStatus)>,
rejected: &mut RejectedRecords,
) -> Vec<Judged<SignedAction>> {
rows.into_iter()
.map(|(sah, status)| {
if status == ValidationStatus::Rejected {
rejected.insert(&sah);
}
Judged::new(SignedAction::from(sah), status)
})
.collect()
}
async fn collect_warrants(
store: &DhtStoreRead,
rejected: &RejectedRecords,
) -> CascadeResult<Vec<SignedWarrant>> {
let mut warrants = Vec::new();
for author in &rejected.authors {
for warrant in store.get_warrants_by_warrantee(author.clone()).await? {
if rejected.justifies(&warrant) {
warrants.push(warrant);
}
}
}
Ok(warrants)
}
fn filter_link_creates(
rows: Vec<(SignedActionHashed, ValidationStatus)>,
key: &WireLinkKey,
) -> Vec<(SignedActionHashed, ValidationStatus)> {
rows.into_iter()
.filter(|(sah, _)| {
let action = sah.action();
let Action::CreateLink(create_link) = action else {
return false;
};
if !key
.type_query
.contains(&create_link.zome_index, &create_link.link_type)
{
return false;
}
if let Some(tag) = &key.tag {
if !create_link.tag.0.starts_with(&tag.0) {
return false;
}
}
if let Some(author) = &key.author {
if action.author() != author {
return false;
}
}
if let Some(before) = key.before {
if action.timestamp() > before {
return false;
}
}
if let Some(after) = key.after {
if action.timestamp() < after {
return false;
}
}
true
})
.collect()
}