holochain/core/workflow/
publish_dht_ops_workflow.rs1use super::error::WorkflowResult;
15use crate::core::queue_consumer::TriggerSender;
16use crate::core::queue_consumer::WorkComplete;
17use holo_hash::*;
18use holochain_p2p::DynHolochainP2pDna;
19use holochain_state::prelude::*;
20use std::collections::HashMap;
21use std::time;
22use std::time::Duration;
23use tracing::*;
24
25mod publish_query;
26pub use publish_query::{get_ops_to_publish, num_still_needing_publish};
27
28#[cfg(test)]
29mod unit_tests;
30
31pub const DEFAULT_RECEIPT_BUNDLE_SIZE: u8 = 5;
33
34#[cfg_attr(
35 feature = "instrument",
36 tracing::instrument(skip(db, network, trigger_self, min_publish_interval))
37)]
38pub async fn publish_dht_ops_workflow(
39 db: DbWrite<DbKindAuthored>,
40 network: DynHolochainP2pDna,
41 trigger_self: TriggerSender,
42 agent: AgentPubKey,
43 min_publish_interval: Duration,
44) -> WorkflowResult<WorkComplete> {
45 let mut complete = WorkComplete::Complete;
46 let to_publish =
47 publish_dht_ops_workflow_inner(db.clone().into(), agent.clone(), min_publish_interval)
48 .await?;
49 let to_publish_count: usize = to_publish.values().map(Vec::len).sum();
50
51 if to_publish_count > 0 {
52 info!(?agent, "publishing {to_publish_count} ops");
53 }
54
55 let mut success = Vec::with_capacity(to_publish.len());
57 for (basis, list) in to_publish {
58 let (op_hash_list, op_data_list): (Vec<_>, Vec<_>) = list.into_iter().unzip();
59 match network
60 .publish(
61 basis,
62 agent.clone(),
63 op_hash_list.clone(),
64 None,
65 Some(op_data_list),
66 )
67 .await
68 {
69 Err(e) => {
70 if let holochain_p2p::HolochainP2pError::RoutingDnaError(_) = e {
72 complete = WorkComplete::Incomplete(None);
74 }
75 warn!(failed_to_send_publish = ?e);
76 }
77 Ok(()) => {
78 success.extend(op_hash_list);
79 }
80 }
81 }
82
83 if to_publish_count > 0 {
84 info!(
85 ?agent,
86 "published {}/{} ops",
87 success.len(),
88 to_publish_count
89 );
90 }
91
92 let now = time::SystemTime::now().duration_since(time::UNIX_EPOCH)?;
93 let continue_publish = db
94 .write_async({
95 let agent = agent.clone();
96 move |txn| {
97 for hash in success {
98 set_last_publish_time(txn, &hash, now)?;
99 }
100 WorkflowResult::Ok(publish_query::num_still_needing_publish(txn, agent)? > 0)
101 }
102 })
103 .await?;
104
105 if continue_publish {
107 trigger_self.resume_loop();
108 } else {
109 trigger_self.pause_loop();
110 }
111
112 debug!(?agent, "committed published ops");
113
114 Ok(complete)
117}
118
119pub async fn publish_dht_ops_workflow_inner(
121 db: DbRead<DbKindAuthored>,
122 agent: AgentPubKey,
123 min_publish_interval: Duration,
124) -> WorkflowResult<HashMap<OpBasis, Vec<(DhtOpHash, crate::prelude::DhtOp)>>> {
125 let mut to_publish = HashMap::new();
127
128 for (basis, op_hash, op) in get_ops_to_publish(agent, &db, min_publish_interval).await? {
129 to_publish
132 .entry(basis)
133 .or_insert_with(Vec::new)
134 .push((op_hash, op));
135 }
136
137 Ok(to_publish)
138}