pub struct TriggerSender { /* private fields */ }
Expand description

The means of nudging a queue consumer to tell it to look for more work

Implementations§

Create a new channel for waking a consumer

Create a new channel trigger that will also trigger on a loop. The duration takes a range so that the loop can be set to back off from the lowest to the highest duration. If you do not want a back off, set both ends of the duration range to the same value, like: `Duration::from_millis(10)..Duration::from_millis(10)`. If `reset_on_trigger` is true, the back off will be reset whenever a trigger is received.

Lazily nudge the consumer task, ignoring the case where the consumer already has a pending trigger signal

Examples found in repository?
src/core/queue_consumer.rs (line 440)
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
    /// Send one `"init"` trigger to each of the sys validation, app
    /// validation, integration, publish and validation receipt senders.
    ///
    /// Consumes `self`. The triggers are sent in the order written below.
    pub fn initialize_workflows(self) {
        // Context label attached to every start-up trigger.
        const INIT: &&str = &"init";

        self.sys_validation.trigger(INIT);
        self.app_validation.trigger(INIT);
        self.integrate_dht_ops.trigger(INIT);
        self.publish_dht_ops.trigger(INIT);
        self.validation_receipt.trigger(INIT);
    }
}
/// The means of nudging a queue consumer to tell it to look for more work.
///
/// Derives `Clone`. The optional flags are held in `Arc`s, so a clone shares
/// them with the sender it was cloned from.
#[derive(Clone)]
pub struct TriggerSender {
    /// The actual trigger sender.
    /// The payload is a static string supplied by the caller of `trigger`
    /// (call sites pass labels such as `"init"` or `"genesis"`).
    trigger: broadcast::Sender<&'static &'static str>,
    /// Reset the back off loop if there is one.
    /// Set to `true` by `reset_back_off()`; `None` when the sender was
    /// created with `new()`.
    reset_back_off: Option<Arc<AtomicBool>>,
    /// Pause / resume the back off loop if there is one.
    /// Set to `true` by `pause_loop()` and cleared by `resume_loop_now()`;
    /// `None` when the sender was created with `new()`.
    pause_back_off: Option<Arc<AtomicBool>>,
}

/// The receiving end of a queue trigger channel.
///
/// Built together with its `TriggerSender` by `TriggerSender::new` or
/// `TriggerSender::new_with_loop`.
pub struct TriggerReceiver {
    /// The actual trigger.
    /// Receives the static context strings sent through the paired sender.
    rx: broadcast::Receiver<&'static &'static str>,
    /// If there is a back off loop, should
    /// the trigger reset the back off.
    /// Always `false` for receivers created by `TriggerSender::new`.
    reset_on_trigger: bool,
    /// The optional back off loop.
    /// `None` for `TriggerSender::new`, `Some` for
    /// `TriggerSender::new_with_loop`.
    back_off: Option<BackOff>,
}

/// A loop that can optionally back off, pause and resume.
///
/// NOTE(review): constructed via `BackOff::new(range, reset, pause)` in
/// `TriggerSender::new_with_loop`; the constructor is not visible here, so
/// how `start` is derived from `range` should be confirmed there.
struct BackOff {
    /// The starting duration for the back off.
    /// This allows resetting the range.
    start: Duration,
    /// The range of duration for the back off.
    range: Range<Duration>,
    /// If we should reset the range on next iteration.
    /// Presumably the same flag the paired `TriggerSender` sets in
    /// `reset_back_off()` - confirm in `BackOff::new`.
    reset_back_off: Arc<AtomicBool>,
    /// If we should pause the loop on next iteration.
    /// Presumably the same flag the paired `TriggerSender` sets in
    /// `pause_loop()` - confirm in `BackOff::new`.
    paused: Arc<AtomicBool>,
}

impl TriggerSender {
    /// Create a new channel for waking a consumer.
    ///
    /// The returned pair has no back off loop: the sender carries neither a
    /// reset nor a pause flag, and the receiver has no `BackOff` and does not
    /// reset on trigger.
    pub fn new() -> (TriggerSender, TriggerReceiver) {
        // The underlying broadcast channel is created with a capacity of one.
        let (trigger, rx) = broadcast::channel(1);

        let sender = TriggerSender {
            trigger,
            reset_back_off: None,
            pause_back_off: None,
        };
        let receiver = TriggerReceiver {
            rx,
            reset_on_trigger: false,
            back_off: None,
        };

        (sender, receiver)
    }

    /// Create a new channel trigger that will also trigger on a loop.
    ///
    /// `range` is the span of loop durations, so that the loop can back off
    /// from the lowest to the highest duration. If you do not want a back
    /// off, give both ends the same value, like:
    /// `Duration::from_millis(10)..Duration::from_millis(10)`.
    ///
    /// If `reset_on_trigger` is true, the back off will be reset whenever a
    /// trigger is received.
    pub fn new_with_loop(
        range: Range<Duration>,
        reset_on_trigger: bool,
    ) -> (TriggerSender, TriggerReceiver) {
        let (trigger, rx) = broadcast::channel(1);

        // Both flags start cleared. One handle of each goes to the sender,
        // the other is handed to the receiver's back off loop.
        let reset_flag = Arc::new(AtomicBool::new(false));
        let pause_flag = Arc::new(AtomicBool::new(false));

        let sender = TriggerSender {
            trigger,
            reset_back_off: Some(Arc::clone(&reset_flag)),
            pause_back_off: Some(Arc::clone(&pause_flag)),
        };
        let receiver = TriggerReceiver {
            rx,
            reset_on_trigger,
            back_off: Some(BackOff::new(range, reset_flag, pause_flag)),
        };

        (sender, receiver)
    }

    /// Lazily nudge the consumer task, ignoring the case where the consumer
    /// already has a pending trigger signal.
    ///
    /// `context` is a static label supplied by the caller and sent through
    /// the channel. A failed send is never propagated to the caller; it is
    /// only reported as a warning.
    pub fn trigger(&self, context: &'static &'static str) {
        let delivered = self.trigger.send(context).is_ok();
        if !delivered {
            tracing::warn!(
                "Queue consumer trigger was sent while Cell is shutting down: ignoring."
            );
        }
    }

    /// Reset the back off to the lowest duration.
    ///
    /// Only raises the shared reset flag. This is a no-op when the sender
    /// has no back off (no flag present).
    pub fn reset_back_off(&self) {
        if let Some(reset_requested) = self.reset_back_off.as_deref() {
            reset_requested.store(true, Ordering::Relaxed);
        }
    }

    /// Pause the trigger loop if there is one.
    ///
    /// Only raises the shared pause flag. This is a no-op when the sender
    /// has no loop (no flag present).
    pub fn pause_loop(&self) {
        if let Some(paused) = self.pause_back_off.as_deref() {
            paused.store(true, Ordering::Relaxed);
        }
    }

    /// Resume the trigger loop now if there is one.
    ///
    /// This will resume the loop even if it is currently
    /// listening (the workflow is not running), because a trigger
    /// is sent right after the pause flag is cleared.
    /// The downside to this call is that if the workflow
    /// is running it will immediately run a second time.
    ///
    /// This call is a no-op if the loop is not paused, or if
    /// there is no loop at all.
    pub fn resume_loop_now(&self) {
        let was_paused = match self.pause_back_off.as_deref() {
            // Clears the flag and reports the value it held before.
            Some(paused) => paused.fetch_and(false, Ordering::AcqRel),
            None => false,
        };

        if was_paused {
            self.trigger(&"resume_loop_now");
        }
    }
More examples
Hide additional examples
src/test_utils.rs (line 963)
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
/// Set `last_publish_time` to `NULL` on every `DhtOp` row whose
/// `receipts_complete` is `NULL`, then fire the publish trigger.
///
/// The update runs through `vault.async_commit`. If the commit fails the
/// error is returned and the trigger is not sent.
pub async fn force_publish_dht_ops(
    vault: &DbWrite<DbKindAuthored>,
    publish_trigger: &mut TriggerSender,
) -> DatabaseResult<()> {
    const CLEAR_LAST_PUBLISH_TIME: &str =
        "UPDATE DhtOp SET last_publish_time = NULL WHERE receipts_complete IS NULL";

    vault
        .async_commit(|txn| {
            let rows_changed = txn.execute(CLEAR_LAST_PUBLISH_TIME, [])?;
            DatabaseResult::Ok(rows_changed)
        })
        .await?;

    publish_trigger.trigger(&"force_publish_dht_ops");
    Ok(())
}
src/conductor/cell.rs (line 214)
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
    /// Run the genesis workflow for the cell identified by `id`.
    ///
    /// Returns early with `Ok(())` when the workspace reports that genesis
    /// has already run for this agent. After a successful run, the
    /// integration trigger for the cell's DNA is fired if the conductor
    /// provides one.
    ///
    /// # Errors
    ///
    /// Returns `DnaError::DnaMissing` (converted into the cell error type)
    /// when the conductor has no DNA file for `id.dna_hash()`. Errors from
    /// creating the workspace, from the genesis check and from the genesis
    /// workflow itself are forwarded.
    pub async fn genesis<Ribosome>(
        id: CellId,
        conductor_handle: ConductorHandle,
        authored_db: DbWrite<DbKindAuthored>,
        dht_db: DbWrite<DbKindDht>,
        dht_db_cache: DhtDbQueryCache,
        ribosome: Ribosome,
        membrane_proof: Option<MembraneProof>,
        chc: Option<ChcImpl>,
    ) -> CellResult<()>
    where
        Ribosome: RibosomeT + 'static,
    {
        // Look up the DNA for this cell; genesis cannot proceed without it.
        let dna_file = match conductor_handle.get_dna_file(id.dna_hash()) {
            Some(file) => file,
            None => return Err(DnaError::DnaMissing(id.dna_hash().to_owned()).into()),
        };

        let conductor_api = CellConductorApi::new(conductor_handle.clone(), id.clone());

        // Workspace the genesis workflow operates on.
        let genesis_workspace = GenesisWorkspace::new(authored_db, dht_db)
            .map_err(|e| Box::new(ConductorApiError::from(e)))?;

        // Nothing to do if genesis has already been performed for this agent.
        let already_done = genesis_workspace
            .has_genesis(id.agent_pubkey().clone())
            .await?;
        if already_done {
            return Ok(());
        }

        let workflow_args = GenesisWorkflowArgs::new(
            dna_file,
            id.agent_pubkey().clone(),
            membrane_proof,
            ribosome,
            dht_db_cache,
            chc,
        );

        genesis_workflow(genesis_workspace, conductor_api, workflow_args)
            .await
            .map_err(|e| Box::new(ConductorApiError::from(e)))?;

        // Wake the integration workflow for this DNA, when one is registered.
        if let Some(integration) = conductor_handle
            .get_queue_consumer_workflows()
            .integration_trigger(Arc::new(id.dna_hash().clone()))
        {
            integration.trigger(&"genesis");
        }
        Ok(())
    }
src/core/workflow/countersigning_workflow.rs (line 101)
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
/// Put incoming countersigning ops into the workspace and wake the workflow.
///
/// Only `DhtOp::StoreEntry` ops whose entry is `Entry::CounterSign` are
/// considered; any other op is skipped. An op is stored only if the first
/// action of its session's action set has an entry hash and the session's
/// end time is still in the future. The trigger is fired once, after the
/// loop, if at least one op was stored.
///
/// # Errors
///
/// Forwards the error from `build_action_set`. Ops already stored by
/// earlier iterations stay in the workspace, but the trigger is not fired.
pub(crate) fn incoming_countersigning(
    ops: Vec<(DhtOpHash, DhtOp)>,
    workspace: &CountersigningWorkspace,
    trigger: TriggerSender,
) -> WorkflowResult<()> {
    let mut stored_any = false;

    for (hash, op) in ops {
        // Must be a store entry op.
        let entry = match &op {
            DhtOp::StoreEntry(_, _, entry) => entry,
            _ => continue,
        };
        // Must have a counter sign entry type.
        let session_data = match entry.as_ref() {
            Entry::CounterSign(session_data, _) => session_data,
            _ => continue,
        };

        // Build the actions required for this session.
        let hashed_entry = EntryHash::with_data_sync(&**entry);
        let weight = weigh_placeholder();
        let action_set = session_data.build_action_set(hashed_entry, weight)?;

        // End time of the session.
        let expires = *session_data.preflight_request().session_times.end();

        // Take the entry hash from the first action.
        // If the actions have different entry hashes they will fail validation.
        let entry_hash = match action_set.first().and_then(|a| a.entry_hash().cloned()) {
            Some(entry_hash) => entry_hash,
            None => continue,
        };

        // Hash the required actions.
        let required_actions: Vec<_> = action_set
            .into_iter()
            .map(|action| ActionHash::with_data_sync(&action))
            .collect();

        // Sessions that have already timed out are not stored.
        let still_open = holochain_zome_types::Timestamp::now() < expires;
        if still_open {
            workspace.put(entry_hash, hash, op, required_actions, expires);
            // New ops were stored, so the workflow needs to run.
            stored_any = true;
        }
    }

    if stored_any {
        trigger.trigger(&"incoming_countersigning");
    }
    Ok(())
}

/// Countersigning workflow: takes the complete sessions from the workspace,
/// pushes their ops to validation and then messages the signers.
///
/// For every complete session, the ops for which
/// `enzymatic_countersigning_enzyme()` is `None` are passed to
/// `incoming_dht_ops_workflow` (skipped when there are none). Once all
/// sessions have been processed, each session's agents are sent an
/// `AuthorityResponse`; a failure to notify is logged and otherwise ignored.
///
/// Always returns `WorkComplete::Complete` unless
/// `incoming_dht_ops_workflow` fails, in which case that error is returned
/// before any agent is notified.
pub(crate) async fn countersigning_workflow(
    space: &Space,
    network: &(dyn HolochainP2pDnaT + Send + Sync),
    sys_validation_trigger: &TriggerSender,
) -> WorkflowResult<WorkComplete> {
    let sessions = space.countersigning_workspace.get_complete_sessions();
    let mut pending_notifications = Vec::with_capacity(sessions.len());

    // First pass: hand the ops of every complete session to validation.
    for (agents, ops, actions) in sessions {
        let to_validate: Vec<_> = ops
            .into_iter()
            .filter(|(_, op)| op.enzymatic_countersigning_enzyme().is_none())
            .collect();
        if !to_validate.is_empty() {
            let validation_trigger = sys_validation_trigger.clone();
            incoming_dht_ops_workflow(space, validation_trigger, to_validate, false).await?;
        }
        pending_notifications.push((agents, actions));
    }

    // Second pass: tell the agents of each session that it succeeded.
    for (agents, actions) in pending_notifications {
        let message = CountersigningSessionNegotiationMessage::AuthorityResponse(actions);
        let outcome = network
            .countersigning_session_negotiation(agents, message)
            .await;
        // A signer may simply be offline, so a failure here is only logged.
        if let Err(err) = outcome {
            tracing::info!(
                "Failed to notify agents: counter signed actions because of {:?}",
                err
            );
        }
    }

    Ok(WorkComplete::Complete)
}

/// An incoming countersigning session success.
///
/// Processing order:
/// 1. Find the signed action authored by `author` and take its action hash
///    and entry hash. Returns `Ok(())` if there is no such action or it has
///    no entry hash.
/// 2. Read the authored database: when `is_chain_locked` is true for the
///    author and the entry is present, collect `(hash, basis_hash)` for every
///    `DhtOp` row of this cell's action. An empty result ends the function
///    with `Ok(())`.
/// 3. Verify the signature of every signed action; the first failure ends
///    the function with `Ok(())`.
/// 4. In a single commit, compare the stored session's action set with the
///    incoming action hashes. On a full match the chain is unlocked and
///    `withhold_publish` is cleared for this cell's action.
/// 5. Only if step 4 matched: pass the op hashes to
///    `authored_ops_to_dht_db_without_check`, trigger integration, publish a
///    `RegisterAgentActivity` op for every other signer, send the
///    `SuccessfulCountersigning` signal and trigger publishing.
///
/// Database, signal and workflow errors are propagated with `?`. Failures
/// to publish to other signers' authorities are logged and ignored.
pub(crate) async fn countersigning_success(
    space: Space,
    network: &HolochainP2pDna,
    author: AgentPubKey,
    signed_actions: Vec<SignedAction>,
    trigger: QueueTriggers,
    mut signal: SignalBroadcaster,
) -> WorkflowResult<()> {
    let authored_db = space.authored_db;
    let dht_db = space.dht_db;
    let dht_db_cache = space.dht_query_cache;
    // Only the publish and integration triggers are used here.
    let QueueTriggers {
        publish_dht_ops: publish_trigger,
        integrate_dht_ops: integration_trigger,
        ..
    } = trigger;
    // Using iterators is fine in this function as there can only be a maximum of 8 actions.
    // Locate the action signed by this cell's author, with its entry hash.
    let (this_cells_action_hash, entry_hash) = match signed_actions
        .iter()
        .find(|h| *h.0.author() == author)
        .and_then(|sh| {
            sh.0.entry_hash()
                .cloned()
                .map(|eh| (ActionHash::with_data_sync(&sh.0), eh))
        }) {
        Some(h) => h,
        None => return Ok(()),
    };

    // Do a quick check to see if this entry hash matches
    // the current locked session so we don't check signatures
    // unless there is an active session.
    let reader_closure = {
        // The closure is `move`, so it works on its own copies; the originals
        // are still needed further down.
        let entry_hash = entry_hash.clone();
        let this_cells_action_hash = this_cells_action_hash.clone();
        let author = author.clone();
        move |txn: Transaction| {
            if holochain_state::chain_lock::is_chain_locked(&txn, &[], &author)? {
                let transaction: holochain_state::prelude::Txn = (&txn).into();
                if transaction.contains_entry(&entry_hash)? {
                    // If this is a countersigning session we can grab all the ops
                    // for this cell's action so we can check if we need to self publish them.
                    let r: Result<_, _> = txn
                        .prepare(
                            "SELECT basis_hash, hash FROM DhtOp WHERE action_hash = :action_hash",
                        )?
                        .query_map(
                            named_params! {
                                ":action_hash": this_cells_action_hash
                            },
                            |row| {
                                let hash: DhtOpHash = row.get("hash")?;
                                let basis: OpBasis = row.get("basis_hash")?;
                                Ok((hash, basis))
                            },
                        )?
                        .collect();
                    return Ok(r?);
                }
            }
            // Chain not locked or entry not present: report no ops.
            StateMutationResult::Ok(Vec::with_capacity(0))
        }
    };
    let this_cell_actions_op_basis_hashes: Vec<(DhtOpHash, OpBasis)> =
        authored_db.async_reader(reader_closure).await?;

    // If there is no active session then we can short circuit.
    if this_cell_actions_op_basis_hashes.is_empty() {
        return Ok(());
    }

    // Verify signatures of actions.
    // A single invalid signature silently abandons the whole session.
    let mut i_am_an_author = false;
    for SignedAction(action, signature) in &signed_actions {
        if !action.author().verify_signature(signature, action).await {
            return Ok(());
        }
        if action.author() == &author {
            i_am_an_author = true;
        }
    }
    // Countersigning success is ultimately between authors to agree and publish.
    if !i_am_an_author {
        return Ok(());
    }

    // Hash actions.
    let incoming_actions: Vec<_> = signed_actions
        .iter()
        .map(|SignedAction(h, _)| ActionHash::with_data_sync(h))
        .collect();

    // `result` is true only when the stored session matched the incoming
    // actions and the chain was unlocked inside this commit.
    let result = authored_db
        .async_commit({
            let author = author.clone();
            let entry_hash = entry_hash.clone();
            move |txn| {
            if let Some((cs_entry_hash, cs)) = current_countersigning_session(txn, Arc::new(author.clone()))? {
                // Check we have the right session.
                if cs_entry_hash == entry_hash {
                    let weight = weigh_placeholder();
                    let stored_actions = cs.build_action_set(entry_hash, weight)?;
                    if stored_actions.len() == incoming_actions.len() {
                        // Check all stored action hashes match an incoming action hash.
                        if stored_actions.iter().all(|h| {
                            let h = ActionHash::with_data_sync(h);
                            incoming_actions.iter().any(|i| *i == h)
                        }) {
                            // All checks have passed so unlock the chain.
                            mutations::unlock_chain(txn, &author)?;
                            // Update ops to publish.
                            txn.execute("UPDATE DhtOp SET withhold_publish = NULL WHERE action_hash = :action_hash",
                            named_params! {
                                ":action_hash": this_cells_action_hash,
                                }
                            ).map_err(holochain_state::prelude::StateMutationError::from)?;
                            return Ok(true);
                        }
                    }
                }
            }
            // No current session, or it did not match the incoming actions.
            SourceChainResult::Ok(false)
        }})
        .await?;

    if result {
        // If all signatures are valid (above) and I signed then I must have
        // validated it previously so I now agree that I authored it.
        authored_ops_to_dht_db_without_check(
            this_cell_actions_op_basis_hashes
                .into_iter()
                .map(|(op_hash, _)| op_hash)
                .collect(),
            &(authored_db.into()),
            &dht_db,
            &dht_db_cache,
        )
        .await?;
        integration_trigger.trigger(&"countersigning_success");
        // Publish other signers agent activity ops to their agent activity authorities.
        for SignedAction(action, signature) in signed_actions {
            // Skip this cell's own action.
            if *action.author() == author {
                continue;
            }
            let op = DhtOp::RegisterAgentActivity(signature, action);
            let basis = op.dht_basis();
            // Best effort: a failed publish is logged and the loop continues.
            if let Err(e) = network.publish_countersign(false, basis, op).await {
                tracing::error!(
                    "Failed to publish to other countersigners agent authorities because of: {:?}",
                    e
                );
            }
        }
        // Signal to the UI.
        signal.send(Signal::System(SystemSignal::SuccessfulCountersigning(
            entry_hash,
        )))?;

        publish_trigger.trigger(&"publish countersigning_success");
    }
    Ok(())
}

Reset the back off to the lowest duration. If no back off is set this is a no-op.

Pause the trigger loop if there is one.

Resume the trigger loop now if there is one.

This will resume the loop even if it is currently listening (the workflow is not running). The downside to this call is that if the workflow is running it will immediately run a second time.

This call is a no-op if the loop is not paused.

Resume the trigger loop if there is one.

This will cause the loop to resume after the next trigger (or if the workflow is currently in progress). It will not cause the loop to resume immediately. If the loop is currently listening (the workflow is not running) then nothing will happen until the next trigger. See `resume_loop_now` for a version that will resume immediately.

This call is a no-op if the loop is not paused.

Trait Implementations§

Returns a copy of the value. Read more
Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
TODO: once 1.33.0 is the minimum supported compiler version, remove Any::type_id_compat and use StdAny::type_id instead. https://github.com/rust-lang/rust/issues/27745
The archived version of the pointer metadata for this type.
Converts some archived metadata to the pointer metadata for itself.
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more
Deserializes using the given deserializer

Returns the argument unchanged.

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Attaches the current Context to this type, returning a WithContext wrapper. Read more
Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The alignment of pointer.
The type for initializers.
Initializes a with the given initializer. Read more
Dereferences the given pointer. Read more
Mutably dereferences the given pointer. Read more
Drops the object pointed to by the given pointer. Read more
The type for metadata in pointers and references to Self.
Should always be Self
The inverse inclusion map: attempts to construct self from the equivalent element of its superset. Read more
Checks if self is actually part of its subset T (and can be converted to it).
Use with care! Same as self.to_subset but without any property checks. Always succeeds.
The inclusion map: converts self to the equivalent element of its superset.
The resulting type after obtaining ownership.
Creates owned data from borrowed data, usually by cloning. Read more
Uses borrowed data to replace owned data, usually by cloning. Read more
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
upcast ref
upcast mut ref
upcast boxed dyn
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more