holochain/core/workflow/
publish_dht_ops_workflow.rs

1//! # Publish Dht Op Workflow
2//!
3//! ## Open questions
4//! - [x] Publish add and remove links on private entries, what are the constraints on when to publish
5//!
//! For now, we publish links on private entries.
7// TODO: B-01827 Make story about: later consider adding a flag to make a link private and not publish it.
8//       Even for those private links, we may need to publish them to the author of the private entry
//       (and we'd have to reference its action, which actually exists on the DHT to make that work,
10//       rather than the entry which does not exist on the DHT).
11//!
12//!
13
14use super::error::WorkflowResult;
15use crate::core::queue_consumer::TriggerSender;
16use crate::core::queue_consumer::WorkComplete;
17use holo_hash::*;
18use holochain_p2p::DynHolochainP2pDna;
19use holochain_state::prelude::*;
20use std::collections::HashMap;
21use std::time;
22use std::time::Duration;
23use tracing::*;
24
25mod publish_query;
26pub use publish_query::{get_ops_to_publish, num_still_needing_publish};
27
28#[cfg(test)]
29mod unit_tests;
30
/// Default redundancy factor for validation receipts: the number of
/// validation receipts an author aims to collect for each published op.
///
/// NOTE(review): presumably this is the receipt-count threshold `R` used by
/// the publish query (`num_still_needing_publish`) to decide when an op no
/// longer needs republishing — confirm against `publish_query`.
pub const DEFAULT_RECEIPT_BUNDLE_SIZE: u8 = 5;
33
34#[cfg_attr(
35    feature = "instrument",
36    tracing::instrument(skip(db, network, trigger_self, min_publish_interval))
37)]
38pub async fn publish_dht_ops_workflow(
39    db: DbWrite<DbKindAuthored>,
40    network: DynHolochainP2pDna,
41    trigger_self: TriggerSender,
42    agent: AgentPubKey,
43    min_publish_interval: Duration,
44) -> WorkflowResult<WorkComplete> {
45    let mut complete = WorkComplete::Complete;
46    let to_publish =
47        publish_dht_ops_workflow_inner(db.clone().into(), agent.clone(), min_publish_interval)
48            .await?;
49    let to_publish_count: usize = to_publish.values().map(Vec::len).sum();
50
51    if to_publish_count > 0 {
52        info!(?agent, "publishing {to_publish_count} ops");
53    }
54
55    // Commit to the network
56    let mut success = Vec::with_capacity(to_publish.len());
57    for (basis, list) in to_publish {
58        let (op_hash_list, op_data_list): (Vec<_>, Vec<_>) = list.into_iter().unzip();
59        match network
60            .publish(
61                basis,
62                agent.clone(),
63                op_hash_list.clone(),
64                None,
65                Some(op_data_list),
66            )
67            .await
68        {
69            Err(e) => {
70                // If we get a routing error it means the space hasn't started yet and we should try publishing again.
71                if let holochain_p2p::HolochainP2pError::RoutingDnaError(_) = e {
72                    // TODO if this doesn't change what is the loop terminate condition?
73                    complete = WorkComplete::Incomplete(None);
74                }
75                warn!(failed_to_send_publish = ?e);
76            }
77            Ok(()) => {
78                success.extend(op_hash_list);
79            }
80        }
81    }
82
83    if to_publish_count > 0 {
84        info!(
85            ?agent,
86            "published {}/{} ops",
87            success.len(),
88            to_publish_count
89        );
90    }
91
92    let now = time::SystemTime::now().duration_since(time::UNIX_EPOCH)?;
93    let continue_publish = db
94        .write_async({
95            let agent = agent.clone();
96            move |txn| {
97                for hash in success {
98                    set_last_publish_time(txn, &hash, now)?;
99                }
100                WorkflowResult::Ok(publish_query::num_still_needing_publish(txn, agent)? > 0)
101            }
102        })
103        .await?;
104
105    // If we have more ops that could be published then continue looping.
106    if continue_publish {
107        trigger_self.resume_loop();
108    } else {
109        trigger_self.pause_loop();
110    }
111
112    debug!(?agent, "committed published ops");
113
114    // --- END OF WORKFLOW, BEGIN FINISHER BOILERPLATE ---
115
116    Ok(complete)
117}
118
119/// Read the authored for ops with receipt count < R
120pub async fn publish_dht_ops_workflow_inner(
121    db: DbRead<DbKindAuthored>,
122    agent: AgentPubKey,
123    min_publish_interval: Duration,
124) -> WorkflowResult<HashMap<OpBasis, Vec<(DhtOpHash, crate::prelude::DhtOp)>>> {
125    // Ops to publish by basis
126    let mut to_publish = HashMap::new();
127
128    for (basis, op_hash, op) in get_ops_to_publish(agent, &db, min_publish_interval).await? {
129        // For every op publish a request
130        // Collect and sort ops by basis
131        to_publish
132            .entry(basis)
133            .or_insert_with(Vec::new)
134            .push((op_hash, op));
135    }
136
137    Ok(to_publish)
138}