1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
//! Witnessing workflow that is the counterpart to the countersigning workflow.
use super::{error::WorkflowResult, incoming_dht_ops_workflow::incoming_dht_ops_workflow};
use crate::conductor::space::Space;
use crate::core::queue_consumer::{TriggerSender, WorkComplete};
use crate::core::ribosome::weigh_placeholder;
use crate::core::share::Share;
use holo_hash::{ActionHash, AgentPubKey, DhtOpHash, EntryHash};
use holochain_p2p::event::CountersigningSessionNegotiationMessage;
use holochain_p2p::DynHolochainP2pDna;
use holochain_state::prelude::*;
use std::collections::HashMap;
/// A cheaply cloneable, thread-safe and in-memory store for
/// active countersigning sessions.
#[derive(Clone, Default)]
pub struct WitnessingWorkspace {
inner: Share<WitnessingWorkspaceInner>,
}
/// Pending countersigning sessions.
#[derive(Default)]
pub struct WitnessingWorkspaceInner {
pending: HashMap<EntryHash, Session>,
}
#[derive(Default)]
struct Session {
/// Map of action hash for each signers action to the [`DhtOp`] and other required actions for
/// this session to be considered complete.
map: HashMap<ActionHash, (DhtOpHash, ChainOp, Vec<ActionHash>)>,
/// When this session expires.
///
/// If this is none the session is empty.
expires: Option<Timestamp>,
}
/// Witnessing workflow that is the counterpart to the countersigning workflow.
///
/// This workflow is run by witnesses to countersigning sessions who are responsible for gathering
/// signatures during sessions. The workflow checks for complete sessions and pushes the complete
/// ops to validation then messages the session participants with the complete set of signatures
/// for the session.
pub(crate) async fn witnessing_workflow(
space: Space,
network: DynHolochainP2pDna,
sys_validation_trigger: TriggerSender,
) -> WorkflowResult<WorkComplete> {
// Get any complete sessions.
let complete_sessions = space.witnessing_workspace.get_complete_sessions();
let mut notify_agents = Vec::with_capacity(complete_sessions.len());
// For each complete session send the ops to validation.
for (agents, ops, actions) in complete_sessions {
let non_enzymatic_ops: Vec<_> = ops
.into_iter()
.filter(|(_hash, dht_op)| dht_op.enzymatic_countersigning_enzyme().is_none())
.collect();
if !non_enzymatic_ops.is_empty() {
incoming_dht_ops_workflow(
space.clone(),
sys_validation_trigger.clone(),
non_enzymatic_ops
.into_iter()
.map(|(_h, o)| o.into())
.collect(),
)
.await?;
}
notify_agents.push((agents, actions));
}
// For each complete session notify the agents of success.
for (agents, actions) in notify_agents {
tracing::debug!("Witnessing ready, notifying agents {:?}", agents);
if let Err(e) = network
.countersigning_session_negotiation(
agents,
CountersigningSessionNegotiationMessage::AuthorityResponse(actions),
)
.await
{
// This could likely fail if a signer is offline, so it's not an error.
tracing::warn!(
"Failed to notify agents: counter signed actions because of {:?}",
e
);
}
}
Ok(WorkComplete::Complete)
}
/// Receive incoming DhtOps for a countersigning session.
///
/// These ops are produced by participants in a countersigning session and sent to us to be checked.
/// This function will store the ops in the workspace and trigger the workflow.
pub(crate) fn receive_incoming_countersigning_ops(
ops: Vec<(DhtOpHash, ChainOp)>,
workspace: &WitnessingWorkspace,
witnessing_workflow_trigger: TriggerSender,
) -> WorkflowResult<()> {
let mut should_trigger = false;
// For each op check it's the right type and extract the
// entry hash, required actions and expires time.
for (hash, op) in ops {
// Must be a store entry op.
if let ChainOp::StoreEntry(_, _, entry) = &op {
// Must be a CounterSign entry type.
if let Entry::CounterSign(session_data, _) = entry {
let entry_hash = EntryHash::with_data_sync(entry);
// Get the required actions for this session.
let weight = weigh_placeholder();
let action_set = session_data.build_action_set(entry_hash, weight)?;
// Get the expires time for this session.
let expires = *session_data.preflight_request().session_times.end();
// Get the entry hash from an action.
// If the actions have different entry hashes they will fail validation.
if let Some(entry_hash) = action_set.first().and_then(|a| a.entry_hash().cloned()) {
// Hash the required actions.
let required_actions: Vec<_> = action_set
.into_iter()
.map(|a| ActionHash::with_data_sync(&a))
.collect();
// Only accept the op if the session is not expired.
if Timestamp::now() < expires {
// Put this op in the pending map.
workspace.put(entry_hash, hash, op, required_actions, expires);
// We have new ops, so we should trigger the workflow.
should_trigger = true;
}
}
} else {
tracing::warn!(?op, "Incoming countersigning op is not a CounterSign entry");
}
} else {
tracing::warn!(?op, "Incoming countersigning op is not a StoreEntry op");
}
}
// Trigger the workflow if we have new ops.
if should_trigger {
witnessing_workflow_trigger.trigger(&"incoming_countersigning");
}
Ok(())
}
type AgentsToNotify = Vec<AgentPubKey>;
type Ops = Vec<(DhtOpHash, ChainOp)>;
type SignedActions = Vec<SignedAction>;
impl WitnessingWorkspace {
/// Create a new empty countersigning workspace.
pub fn new() -> WitnessingWorkspace {
Self {
inner: Share::new(Default::default()),
}
}
/// Put a single signers store entry op in the workspace.
fn put(
&self,
entry_hash: EntryHash,
op_hash: DhtOpHash,
op: ChainOp,
required_actions: Vec<ActionHash>,
expires: Timestamp,
) {
// Hash the action of this op.
let action_hash = ActionHash::with_data_sync(&op.action());
self.inner
.share_mut(|i, _| {
// Get the session at this entry or create an empty one.
let session = i.pending.entry(entry_hash).or_default();
// Insert the op into the session.
session
.map
.insert(action_hash, (op_hash, op, required_actions));
// Set the expires time.
session.expires = Some(expires);
Ok(())
})
// We don't close this share, so we can ignore this error.
.ok();
}
fn get_complete_sessions(&self) -> Vec<(AgentsToNotify, Ops, SignedActions)> {
let now = Timestamp::now();
self.inner
.share_mut(|i, _| {
// Remove any expired sessions.
i.pending.retain(|_, session| {
session.expires.as_ref().map(|e| now < *e).unwrap_or(false)
});
// Get all complete session's entry hashes.
let complete: Vec<_> = i
.pending
.iter()
.filter_map(|(entry_hash, session)| {
// If all session required actions are contained in the map
// then the session is complete.
if session.map.values().all(|(_, _, required_hashes)| {
required_hashes
.iter()
.all(|hash| session.map.contains_key(hash))
}) {
Some(entry_hash.clone())
} else {
None
}
})
.collect();
let mut ret = Vec::with_capacity(complete.len());
// For each complete session remove from the pending map
// and fold into the signed actions to send to the agents
// and the ops to validate.
for hash in complete {
if let Some(session) = i.pending.remove(&hash) {
let map = session.map;
let r = map.into_iter().fold(
(Vec::new(), Vec::new(), Vec::new()),
|(mut agents, mut ops, mut actions), (_, (op_hash, op, _))| {
let action = op.action();
let signature = op.signature().clone();
// Agents to notify.
agents.push(action.author().clone());
// Signed actions to notify them with.
actions.push(SignedAction::new(action, signature));
// Ops to validate.
ops.push((op_hash, op));
(agents, ops, actions)
},
);
ret.push(r);
}
}
Ok(ret)
})
.unwrap_or_default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use ::fixt::*;
use holo_hash::fixt::DhtOpHashFixturator;
use holo_hash::fixt::EntryHashFixturator;
/// Test that a session of 5 actions is complete when the expiry time is in the future and all
/// required actions are present.
#[test]
fn gets_complete_sessions() {
let workspace = WitnessingWorkspace::new();
// - Create the ops.
let data = || {
let op_hash = fixt!(DhtOpHash);
let op = ChainOp::RegisterAddLink(
Signature(vec![1; 64].try_into().unwrap()),
fixt!(CreateLink),
);
let action = op.action();
(op_hash, op, action)
};
let entry_hash = fixt!(EntryHash);
let mut op_hashes = Vec::new();
let mut ops = Vec::new();
let mut required_actions = Vec::new();
for _ in 0..5 {
let (op_hash, op, action) = data();
let action_hash = ActionHash::with_data_sync(&action);
op_hashes.push(op_hash);
ops.push(op);
required_actions.push(action_hash);
}
// - Put the ops in the workspace with expiry set to one hour from now.
for (op_h, op) in op_hashes.into_iter().zip(ops.into_iter()) {
let expires = (Timestamp::now() + std::time::Duration::from_secs(60 * 60)).unwrap();
workspace.put(
entry_hash.clone(),
op_h,
op,
required_actions.clone(),
expires,
);
}
// - Get all complete sessions.
let r = workspace.get_complete_sessions();
// - Expect we have one.
assert_eq!(r.len(), 1);
workspace
.inner
.share_mut(|i, _| {
// - Check we have none pending.
assert_eq!(i.pending.len(), 0);
Ok(())
})
.unwrap();
}
/// Test that expired sessions are removed.
#[test]
fn expired_sessions_removed() {
let workspace = WitnessingWorkspace::new();
// - Create an op for a session that has expired in the past.
let op_hash = fixt!(DhtOpHash);
let op = ChainOp::RegisterAddLink(
Signature(vec![1; 64].try_into().unwrap()),
fixt!(CreateLink),
);
let action = op.action();
let entry_hash = fixt!(EntryHash);
let action_hash = ActionHash::with_data_sync(&action);
let expires = (Timestamp::now() - std::time::Duration::from_secs(60 * 60)).unwrap();
// - Add it to the workspace.
workspace.put(entry_hash, op_hash, op, vec![action_hash], expires);
let r = workspace.get_complete_sessions();
// - Expect we have no complete sessions.
assert_eq!(r.len(), 0);
workspace
.inner
.share_mut(|i, _| {
// - Check we have none pending.
assert_eq!(i.pending.len(), 0);
Ok(())
})
.unwrap();
}
}