1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use holo_hash::{AnyDhtHash, DhtOpHash, HasHash};
use holochain_p2p::HolochainP2pDnaT;
use holochain_sqlite::rusqlite::Transaction;
use holochain_types::{
    db_cache::DhtDbQueryCache,
    dht_op::{DhtOp, DhtOpHashed, DhtOpType},
    prelude::DhtOpResult,
};
use holochain_zome_types::{EntryVisibility, SignedAction};

use crate::{prelude::*, query::get_public_op_from_db};

/// Insert any authored ops that have been locally validated
/// into the dht database awaiting integration.
/// This checks if ops are within the storage arc
/// of any local agents.
pub async fn authored_ops_to_dht_db(
    network: &(dyn HolochainP2pDnaT + Send + Sync),
    hashes: Vec<(DhtOpHash, AnyDhtHash)>,
    authored_db: &DbRead<DbKindAuthored>,
    dht_db: &DbWrite<DbKindDht>,
    dht_db_cache: &DhtDbQueryCache,
) -> StateMutationResult<()> {
    // Check if any agents in this space are an authority for these hashes.
    let mut should_hold_hashes = Vec::new();
    for (op_hash, basis) in hashes {
        if network.authority_for_hash(basis).await? {
            should_hold_hashes.push(op_hash);
        }
    }

    // Clone the ops into the dht db for the hashes that should be held.
    authored_ops_to_dht_db_without_check(should_hold_hashes, authored_db, dht_db, dht_db_cache)
        .await
}

/// Insert any authored ops that have been locally validated
/// into the dht database awaiting integration.
pub async fn authored_ops_to_dht_db_without_check(
    hashes: Vec<DhtOpHash>,
    authored_db: &DbRead<DbKindAuthored>,
    dht_db: &DbWrite<DbKindDht>,
    dht_db_cache: &DhtDbQueryCache,
) -> StateMutationResult<()> {
    // Get the ops from the authored database.
    let mut ops = Vec::with_capacity(hashes.len());
    let ops = authored_db
        .async_reader(move |txn| {
            for hash in hashes {
                // This function filters out any private entries from ops
                // or store entry ops with private entries.
                if let Some(op) = get_public_op_from_db(&txn, &hash)? {
                    ops.push(op);
                }
            }
            StateMutationResult::Ok(ops)
        })
        .await?;
    let mut activity = Vec::new();
    let activity = dht_db
        .async_commit(|txn| {
            for op in ops {
                if let Some(op) = insert_locally_validated_op(txn, op)? {
                    activity.push(op);
                }
            }
            StateMutationResult::Ok(activity)
        })
        .await?;
    for op in activity {
        let dependency = get_dependency(op.get_type(), &op.action());

        if matches!(dependency, Dependency::Null) {
            let _ = dht_db_cache
                .set_activity_to_integrated(op.action().author(), op.action().action_seq())
                .await;
        } else {
            dht_db_cache
                .set_activity_ready_to_integrate(op.action().author(), op.action().action_seq())
                .await?;
        }
    }
    Ok(())
}

fn insert_locally_validated_op(
    txn: &mut Transaction,
    op: DhtOpHashed,
) -> StateMutationResult<Option<DhtOpHashed>> {
    // These checks are redundant but cheap and future proof this function
    // against anyone using it with private entries.
    if is_private_store_entry(op.as_content()) {
        return Ok(None);
    }
    let op = filter_private_entry(op)?;
    let hash = op.as_hash();

    let dependency = get_dependency(op.get_type(), &op.action());
    let op_type = op.get_type();

    // Insert the op.
    insert_op(txn, &op)?;
    // Set the status to valid because we authored it.
    set_validation_status(txn, hash, holochain_zome_types::ValidationStatus::Valid)?;

    // If this is a `RegisterAgentActivity` then we need to return it to the dht db cache.
    // Set the stage to awaiting integration.
    if let Dependency::Null = dependency {
        // This set the validation stage to pending which is correct when
        // it's integrated.
        set_validation_stage(txn, hash, ValidationLimboStatus::Pending)?;
        set_when_integrated(txn, hash, holochain_zome_types::Timestamp::now())?;
    } else {
        set_validation_stage(txn, hash, ValidationLimboStatus::AwaitingIntegration)?;
    }
    if matches!(op_type, DhtOpType::RegisterAgentActivity) {
        Ok(Some(op))
    } else {
        Ok(None)
    }
}

fn filter_private_entry(op: DhtOpHashed) -> DhtOpResult<DhtOpHashed> {
    let is_private_entry = op.action().entry_type().map_or(false, |et| {
        matches!(et.visibility(), EntryVisibility::Private)
    });

    if is_private_entry && op.entry().is_some() {
        let (op, hash) = op.into_inner();
        let op_type = op.get_type();
        let (signature, action, _) = op.into_inner();
        Ok(DhtOpHashed::with_pre_hashed(
            DhtOp::from_type(op_type, SignedAction(action, signature), None)?,
            hash,
        ))
    } else {
        Ok(op)
    }
}

fn is_private_store_entry(op: &DhtOp) -> bool {
    op.action()
        .entry_type()
        .map_or(false, |et| *et.visibility() == EntryVisibility::Private)
        && op.get_type() == DhtOpType::StoreEntry
}