holochain_cascade 0.7.0-dev.32

Logic for cascading updates to Holochain state and network interaction
Documentation
//! Functions for the various authorities to handle queries.
//!
//! The get authorities serve **records** — actions plus their entry — each
//! carrying its record-level validation status. A `Rejected` record is always
//! served with a warrant justifying the rejection. The receiver still validates
//! that warrant (and, through it, the warranted op) before acting on it —
//! warrants are never implicitly trusted, because acting on one blocks an agent,
//! which is serious and currently permanent. The pairing instead guarantees a
//! rejected op is never served without the warrant that justifies it, so the
//! receiver is not left waiting for one or the other. All reads go through the
//! `holochain_data`-backed [`DhtStoreRead`]; only locally-validated data is
//! served (enforced by the store's authority reads).
//!
//! Each handler composes several independent [`DhtStoreRead`] queries rather
//! than a single transaction, so a concurrent write (e.g. integration) can make
//! a handler's result a non-atomic mix of two store states. That is acceptable:
//! get is eventually consistent — a requester reconciles responses across
//! authorities and over time, so a momentarily missing or extra delete/update
//! is corrected on a later get.

use super::error::CascadeResult;
use holo_hash::ActionHash;
use holo_hash::AgentPubKey;
use holo_hash::EntryHash;
use holochain_state::dht_store::{DhtStoreRead, GetAgentActivityOptions};
use holochain_state::query::link::GetLinksFilter;
use holochain_types::prelude::*;
use holochain_zome_types::warrant::{ChainIntegrityWarrant, SignedWarrant, WarrantProof};
use std::collections::HashSet;

/// Handler for get_entry query to an Entry authority.
///
/// Reads the create, delete and update action rows the store holds for `hash`,
/// pairs the entry with its entry type when both are available, and attaches
/// the warrants justifying any `Rejected` action included in the response.
///
/// # Errors
///
/// Propagates any error returned by the underlying store reads.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_entry(store: DhtStoreRead, hash: EntryHash) -> CascadeResult<WireEntryOps> {
    let create_rows = store.get_authority_entry_creates(&hash).await?;
    let delete_rows = store.get_authority_deletes_for_entry(&hash).await?;
    let update_rows = store.get_authority_updates_for_entry(&hash).await?;

    // All actions on the entry share one entry type, so the first create or
    // update row that carries a type supplies it for the whole response.
    let entry_type = create_rows
        .iter()
        .chain(&update_rows)
        .map(|(sah, _)| sah.action())
        .find_map(|action| action.entry_type().cloned());

    // Judging the rows also records every `Rejected` one for warrant lookup.
    let mut rejected = RejectedRecords::default();
    let creates = judged_actions(create_rows, &mut rejected);
    let deletes = judged_actions(delete_rows, &mut rejected);
    let updates = judged_actions(update_rows, &mut rejected);

    // Entry data is only served when both the entry and its type are known;
    // if either is missing the response carries no entry.
    let entry = store
        .retrieve_entry(&hash, None)
        .await?
        .zip(entry_type)
        .map(|(entry, entry_type)| EntryData { entry, entry_type });

    let warrants = collect_warrants(&store, &rejected).await?;

    Ok(WireEntryOps {
        creates,
        deletes,
        updates,
        entry,
        warrants,
    })
}

/// Handler for get_record query to a Record authority.
///
/// Serves the action stored under `hash` (with its entry, when the action
/// references one), the deletes and updates recorded against it, and the
/// warrants justifying any `Rejected` action in the response.
///
/// # Errors
///
/// Propagates any error returned by the underlying store reads.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_record(
    store: DhtStoreRead,
    hash: ActionHash,
) -> CascadeResult<WireRecordOps> {
    let mut rejected = RejectedRecords::default();

    // Both stay `None` when the store has no record for `hash`.
    let mut action = None;
    let mut entry = None;

    if let Some((sah, status)) = store.get_authority_store_record(&hash).await? {
        if status == ValidationStatus::Rejected {
            rejected.insert(&sah);
        }
        // Only actions that reference an entry trigger an entry lookup.
        if let Some(entry_hash) = sah.action().entry_hash().cloned() {
            entry = store.retrieve_entry(&entry_hash, None).await?;
        }
        action = Some(Judged::new(SignedAction::from(sah), status));
    }

    let delete_rows = store.get_authority_deletes_for_record(&hash).await?;
    let deletes = judged_actions(delete_rows, &mut rejected);

    let update_rows = store.get_authority_updates_for_record(&hash).await?;
    let updates = judged_actions(update_rows, &mut rejected);

    let warrants = collect_warrants(&store, &rejected).await?;

    Ok(WireRecordOps {
        action,
        deletes,
        updates,
        entry,
        warrants,
    })
}

/// Handler for get_agent_activity query to an Activity authority.
///
/// Translates the network-level options into the store's
/// [`GetAgentActivityOptions`] and delegates the query to the store.
///
/// # Errors
///
/// Propagates any error returned by the store's agent-activity read.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_agent_activity(
    store: DhtStoreRead,
    agent: AgentPubKey,
    query: ChainQueryFilter,
    options: holochain_p2p::event::GetActivityOptions,
) -> CascadeResult<AgentActivityResponse> {
    // Only the four inclusion flags are forwarded to the store.
    let holochain_p2p::event::GetActivityOptions {
        include_valid_activity,
        include_rejected_activity,
        include_warrants,
        include_full_records,
        ..
    } = options;
    let store_options = GetAgentActivityOptions {
        include_valid_activity,
        include_rejected_activity,
        include_warrants,
        include_full_records,
    };

    let response = store
        .get_agent_activity(&agent, &query, &store_options)
        .await?;
    Ok(response)
}

/// Handler for must_get_agent_activity query to an Activity authority.
///
/// Delegates directly to the store's `must_get_agent_activity` read for
/// `author` and `filter`.
///
/// # Errors
///
/// Propagates any error returned by the store read.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_must_get_agent_activity(
    store: DhtStoreRead,
    author: AgentPubKey,
    filter: ChainFilter,
) -> CascadeResult<MustGetAgentActivityResponse> {
    let response = store.must_get_agent_activity(&author, &filter).await?;
    Ok(response)
}

/// Handler for get_links query to a Record/Entry authority.
///
/// Serves the create-link actions on `link_key.base` that pass the key's
/// filters, the delete-link actions read for the same base, and the warrants
/// justifying any `Rejected` action in the response. `_options` is unused.
///
/// # Errors
///
/// Propagates any error returned by the underlying store reads.
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store, _options)))]
pub async fn handle_get_links(
    store: DhtStoreRead,
    link_key: WireLinkKey,
    _options: holochain_p2p::event::GetLinksOptions,
) -> CascadeResult<WireLinkOps> {
    let mut rejected = RejectedRecords::default();

    // Creates are narrowed by the link key before being judged, so rows that
    // are filtered out never contribute to the rejected set.
    let all_creates = store.get_authority_link_creates(&link_key.base).await?;
    let creates = judged_actions(filter_link_creates(all_creates, &link_key), &mut rejected);

    // Delete rows are served as read for the base, without link-key filtering.
    let all_deletes = store.get_authority_delete_links(&link_key.base).await?;
    let deletes = judged_actions(all_deletes, &mut rejected);

    let warrants = collect_warrants(&store, &rejected).await?;

    Ok(WireLinkOps {
        creates,
        deletes,
        warrants,
    })
}

/// Handler for querying links (returns rendered [`Link`]s, not wire ops).
#[cfg_attr(feature = "instrument", tracing::instrument(skip(store)))]
pub async fn handle_get_links_query(
    store: DhtStoreRead,
    query: WireLinkQuery,
) -> CascadeResult<Vec<Link>> {
    let filter = GetLinksFilter {
        after: query.after,
        before: query.before,
        author: query.author.clone(),
    };
    Ok(store
        .get_links(
            &query.base,
            &query.link_type,
            query.tag_prefix.as_ref(),
            &filter,
        )
        .await?)
}

/// The `Rejected` records in a get-lookup response, tracked so the warrants
/// justifying them can be attached. A warrant justifies a served record when it
/// is an `InvalidChainOp` naming one of `action_hashes`, or a `ChainFork`
/// against one of `authors`.
///
/// Starts out empty via `Default`; both sets are filled together, one record
/// at a time.
#[derive(Default)]
struct RejectedRecords {
    // Authors of the recorded rejected actions.
    authors: HashSet<AgentPubKey>,
    // Hashes of the recorded rejected actions.
    action_hashes: HashSet<ActionHash>,
}

impl RejectedRecords {
    /// Record a rejected action: remember both its hash and its author.
    fn insert(&mut self, action: &SignedActionHashed) {
        self.action_hashes.insert(action.as_hash().clone());
        self.authors.insert(action.action().author().clone());
    }

    /// Whether `warrant` justifies one of the recorded rejected actions.
    ///
    /// An `InvalidChainOp` warrant justifies when it names a recorded action
    /// hash; a `ChainFork` warrant justifies when it is against a recorded
    /// author.
    fn justifies(&self, warrant: &SignedWarrant) -> bool {
        match &warrant.proof {
            WarrantProof::ChainIntegrity(ChainIntegrityWarrant::InvalidChainOp {
                action, ..
            }) => self.action_hashes.contains(&action.0),
            WarrantProof::ChainIntegrity(ChainIntegrityWarrant::ChainFork {
                chain_author, ..
            }) => self.authors.contains(chain_author),
        }
    }
}

/// Turn authority-read rows into wire-ready judged actions.
///
/// Each row's action is wrapped in a [`Judged`] carrying the row's validation
/// status, in the same order as `rows`. Every row whose status is `Rejected` is
/// also recorded in `rejected` so the warrant justifying it can be attached.
fn judged_actions(
    rows: Vec<(SignedActionHashed, ValidationStatus)>,
    rejected: &mut RejectedRecords,
) -> Vec<Judged<SignedAction>> {
    let mut judged = Vec::with_capacity(rows.len());
    for (sah, status) in rows {
        // Record before converting: the conversion consumes the hashed action.
        if status == ValidationStatus::Rejected {
            rejected.insert(&sah);
        }
        judged.push(Judged::new(SignedAction::from(sah), status));
    }
    judged
}

/// Fetch the warrants justifying the `Rejected` records served.
///
/// Warrants are read per rejected author (they live at the warrantee's basis)
/// and then narrowed to those that justify a served record: an
/// `InvalidChainOp` naming a served action, or a `ChainFork` against the
/// author. Keeping only the relevant warrants scopes a get-lookup response to
/// what it returns; agent-activity serves all of an author's warrants through
/// its own store read.
///
/// # Errors
///
/// Stops at, and propagates, the first store read that fails.
async fn collect_warrants(
    store: &DhtStoreRead,
    rejected: &RejectedRecords,
) -> CascadeResult<Vec<SignedWarrant>> {
    let mut justifying = Vec::new();
    for author in &rejected.authors {
        let against_author = store.get_warrants_by_warrantee(author.clone()).await?;
        justifying.extend(
            against_author
                .into_iter()
                .filter(|warrant| rejected.justifies(warrant)),
        );
    }
    Ok(justifying)
}

/// Keep only the create-link rows that match the wire link key.
///
/// A row survives when its action is a `CreateLink` whose zome index and link
/// type are contained in the key's type query, and which passes every optional
/// filter the key sets: tag prefix, author, and the inclusive `before` /
/// `after` timestamp bounds. Surviving rows keep their original order.
fn filter_link_creates(
    mut rows: Vec<(SignedActionHashed, ValidationStatus)>,
    key: &WireLinkKey,
) -> Vec<(SignedActionHashed, ValidationStatus)> {
    rows.retain(|(sah, _)| {
        let action = sah.action();
        // Anything that is not a create-link action is dropped outright.
        let Action::CreateLink(create_link) = action else {
            return false;
        };

        // An unset optional filter matches everything.
        key.type_query
            .contains(&create_link.zome_index, &create_link.link_type)
            && match &key.tag {
                Some(prefix) => create_link.tag.0.starts_with(&prefix.0),
                None => true,
            }
            && match &key.author {
                Some(author) => action.author() == author,
                None => true,
            }
            && match key.before {
                Some(before) => action.timestamp() <= before,
                None => true,
            }
            && match key.after {
                Some(after) => action.timestamp() >= after,
                None => true,
            }
    });
    rows
}