Struct holochain::core::queue_consumer::TriggerSender
source · pub struct TriggerSender { /* private fields */ }
Expand description
The means of nudging a queue consumer to tell it to look for more work
Implementations§
source§impl TriggerSender
impl TriggerSender
sourcepub fn new() -> (TriggerSender, TriggerReceiver)
pub fn new() -> (TriggerSender, TriggerReceiver)
Create a new channel for waking a consumer
sourcepub fn new_with_loop(
range: Range<Duration>,
reset_on_trigger: bool
) -> (TriggerSender, TriggerReceiver)
pub fn new_with_loop(
range: Range<Duration>,
reset_on_trigger: bool
) -> (TriggerSender, TriggerReceiver)
Create a new channel trigger that will also trigger
on a loop.
The duration takes a range so that the loop can
be set to back off from the lowest to the highest duration.
If you do not want a back off, set the duration range
to the same value like: Duration::from_millis(10)..Duration::from_millis(10)
If reset_on_trigger is true, the back off will be reset whenever a
trigger is received.
sourcepub fn trigger(&self, context: &'static &'static str)
pub fn trigger(&self, context: &'static &'static str)
Lazily nudge the consumer task, ignoring the case where the consumer already has a pending trigger signal
Examples found in repository?
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
/// Fire an initial trigger on every workflow queue consumer so each
/// one looks for pending work immediately after startup.
pub fn initialize_workflows(self) {
    for workflow_trigger in [
        &self.sys_validation,
        &self.app_validation,
        &self.integrate_dht_ops,
        &self.publish_dht_ops,
        &self.validation_receipt,
    ] {
        workflow_trigger.trigger(&"init");
    }
}
}
/// The means of nudging a queue consumer to tell it to look for more work
#[derive(Clone)]
pub struct TriggerSender {
    /// The actual trigger sender. The payload is a static context string
    /// naming who fired the trigger (note the double reference: callers
    /// pass `&"context"` — presumably for cheap `Copy` payloads; confirm
    /// before changing).
    trigger: broadcast::Sender<&'static &'static str>,
    /// Reset the back off loop if there is one.
    /// `None` when created via `new` (no loop).
    reset_back_off: Option<Arc<AtomicBool>>,
    /// Pause / resume the back off loop if there is one.
    /// `None` when created via `new` (no loop).
    pause_back_off: Option<Arc<AtomicBool>>,
}
/// The receiving end of a queue trigger channel
pub struct TriggerReceiver {
    /// The actual trigger.
    rx: broadcast::Receiver<&'static &'static str>,
    /// If there is a back off loop, should
    /// the trigger reset the back off.
    reset_on_trigger: bool,
    /// The optional back off loop.
    /// `None` when created via `TriggerSender::new`,
    /// `Some` when created via `TriggerSender::new_with_loop`.
    back_off: Option<BackOff>,
}
/// A loop that can optionally back off, pause and resume.
struct BackOff {
    /// The starting duration for the back off.
    /// This allows resetting the range.
    start: Duration,
    /// The range of duration for the back off:
    /// waits grow from the lowest to the highest duration.
    range: Range<Duration>,
    /// If we should reset the range on next iteration.
    /// Shared with the `TriggerSender` (set by `reset_back_off`).
    reset_back_off: Arc<AtomicBool>,
    /// If we should pause the loop on next iteration.
    /// Shared with the `TriggerSender` (set by `pause_loop`).
    paused: Arc<AtomicBool>,
}
impl TriggerSender {
/// Create a new channel for waking a consumer.
///
/// No back off loop is attached: the receiver only wakes when
/// the sender explicitly triggers it.
pub fn new() -> (TriggerSender, TriggerReceiver) {
    // Capacity 1 is sufficient: a trigger is a lazy nudge, so one
    // pending signal already covers any later ones.
    let (trigger, rx) = broadcast::channel(1);
    let sender = TriggerSender {
        trigger,
        reset_back_off: None,
        pause_back_off: None,
    };
    let receiver = TriggerReceiver {
        rx,
        back_off: None,
        reset_on_trigger: false,
    };
    (sender, receiver)
}
/// Create a new channel trigger that will also trigger
/// on a loop.
/// The duration takes a range so that the loop can
/// be set to back off from the lowest to the highest duration.
/// If you do not want a back off, set the duration range
/// to the same value like: `Duration::from_millis(10)..Duration::from_millis(10)`
/// If `reset_on_trigger` is true, the back off will be reset whenever a
/// trigger is received.
pub fn new_with_loop(
    range: Range<Duration>,
    reset_on_trigger: bool,
) -> (TriggerSender, TriggerReceiver) {
    let (tx, rx) = broadcast::channel(1);
    // Shared flags that let the sender side control the receiver's loop.
    let reset_flag = Arc::new(AtomicBool::new(false));
    let pause_flag = Arc::new(AtomicBool::new(false));
    let back_off = BackOff::new(range, reset_flag.clone(), pause_flag.clone());
    let sender = TriggerSender {
        trigger: tx,
        reset_back_off: Some(reset_flag),
        pause_back_off: Some(pause_flag),
    };
    let receiver = TriggerReceiver {
        rx,
        reset_on_trigger,
        back_off: Some(back_off),
    };
    (sender, receiver)
}
/// Lazily nudge the consumer task, ignoring the case where the consumer
/// already has a pending trigger signal.
pub fn trigger(&self, context: &'static &'static str) {
    match self.trigger.send(context) {
        Ok(_) => (),
        // Send only fails when there are no receivers left, which
        // happens while the Cell shuts down.
        Err(_) => tracing::warn!(
            "Queue consumer trigger was sent while Cell is shutting down: ignoring."
        ),
    }
}
/// Reset the back off to the lowest duration.
/// If no back off is set this is a no-op.
pub fn reset_back_off(&self) {
    if let Some(flag) = self.reset_back_off.as_ref() {
        flag.store(true, Ordering::Relaxed);
    }
}
/// Pause the trigger loop if there is one.
pub fn pause_loop(&self) {
    if let Some(flag) = self.pause_back_off.as_ref() {
        flag.store(true, Ordering::Relaxed);
    }
}
/// Resume the trigger loop now if there is one.
///
/// This will resume the loop even if it is currently
/// listening (the workflow is not running).
/// The downside to this call is that if the workflow
/// is running it will immediately run a second time.
///
/// This call is a no-op if the loop is not paused.
pub fn resume_loop_now(&self) {
    if let Some(flag) = self.pause_back_off.as_ref() {
        // Atomically clear the pause flag; only fire the trigger if the
        // loop was actually paused (the previous value was true).
        if flag.swap(false, Ordering::AcqRel) {
            self.trigger(&"resume_loop_now");
        }
    }
}
More examples
951 952 953 954 955 956 957 958 959 960 961 962 963 964 965
/// Clear `last_publish_time` on every op still waiting for receipts,
/// then wake the publish workflow so those ops are published again.
pub async fn force_publish_dht_ops(
    vault: &DbWrite<DbKindAuthored>,
    publish_trigger: &mut TriggerSender,
) -> DatabaseResult<()> {
    let sql = "UPDATE DhtOp SET last_publish_time = NULL WHERE receipts_complete IS NULL";
    vault
        .async_commit(move |txn| DatabaseResult::Ok(txn.execute(sql, [])?))
        .await?;
    publish_trigger.trigger(&"force_publish_dht_ops");
    Ok(())
}
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
/// Run the genesis workflow for a cell.
///
/// Loads the cell's DNA from the conductor, builds a genesis workspace over
/// the authored and DHT databases, and runs the genesis workflow. If genesis
/// has already run for this agent the function returns early with `Ok(())`.
/// On success, the integration workflow for this DNA is triggered if a queue
/// consumer already exists for it.
///
/// # Errors
/// Fails if the DNA is missing from the conductor, the workspace cannot be
/// created, or the genesis workflow itself fails.
pub async fn genesis<Ribosome>(
    id: CellId,
    conductor_handle: ConductorHandle,
    authored_db: DbWrite<DbKindAuthored>,
    dht_db: DbWrite<DbKindDht>,
    dht_db_cache: DhtDbQueryCache,
    ribosome: Ribosome,
    membrane_proof: Option<MembraneProof>,
    chc: Option<ChcImpl>,
) -> CellResult<()>
where
    Ribosome: RibosomeT + 'static,
{
    // Get the DNA file for this cell; genesis cannot proceed without it.
    let dna_file = conductor_handle
        .get_dna_file(id.dna_hash())
        .ok_or_else(|| DnaError::DnaMissing(id.dna_hash().to_owned()))?;
    let conductor_api = CellConductorApi::new(conductor_handle.clone(), id.clone());
    // Build the workspace used to run genesis.
    let workspace = GenesisWorkspace::new(authored_db, dht_db)
        .map_err(ConductorApiError::from)
        .map_err(Box::new)?;
    // Exit early (successfully) if genesis has already run for this agent.
    if workspace.has_genesis(id.agent_pubkey().clone()).await? {
        return Ok(());
    }
    let args = GenesisWorkflowArgs::new(
        dna_file,
        id.agent_pubkey().clone(),
        membrane_proof,
        ribosome,
        dht_db_cache,
        chc,
    );
    genesis_workflow(workspace, conductor_api, args)
        .await
        .map_err(ConductorApiError::from)
        .map_err(Box::new)?;
    // If a queue consumer already exists for this DNA, nudge integration
    // so the genesis ops are picked up promptly.
    if let Some(trigger) = conductor_handle
        .get_queue_consumer_workflows()
        .integration_trigger(Arc::new(id.dna_hash().clone()))
    {
        trigger.trigger(&"genesis");
    }
    Ok(())
}
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
/// Receive incoming countersigning session ops.
///
/// Filters the given ops down to countersigned `StoreEntry` ops whose
/// session has not yet expired, stores them in the countersigning
/// workspace keyed by entry hash, and triggers the countersigning
/// workflow if anything new was stored. Ops of any other kind are
/// silently ignored.
pub(crate) fn incoming_countersigning(
    ops: Vec<(DhtOpHash, DhtOp)>,
    workspace: &CountersigningWorkspace,
    trigger: TriggerSender,
) -> WorkflowResult<()> {
    let mut should_trigger = false;
    // For each op check it's the right type and extract the
    // entry hash, required actions and expires time.
    for (hash, op) in ops {
        // Must be a store entry op.
        if let DhtOp::StoreEntry(_, _, entry) = &op {
            // Must have a counter sign entry type.
            if let Entry::CounterSign(session_data, _) = entry.as_ref() {
                let entry_hash = EntryHash::with_data_sync(&**entry);
                // Get the required actions for this session.
                // NOTE: `build_action_set` consumes the hash computed above.
                let weight = weigh_placeholder();
                let action_set = session_data.build_action_set(entry_hash, weight)?;
                // Get the expires time for this session.
                let expires = *session_data.preflight_request().session_times.end();
                // Get the entry hash from an action (shadows the hash above).
                // If the actions have different entry hashes they will fail validation.
                if let Some(entry_hash) = action_set.first().and_then(|h| h.entry_hash().cloned()) {
                    // Hash the required actions.
                    let required_actions: Vec<_> = action_set
                        .into_iter()
                        .map(|h| ActionHash::with_data_sync(&h))
                        .collect();
                    // Check if already timed out; expired sessions are dropped.
                    if holochain_zome_types::Timestamp::now() < expires {
                        // Put this op in the pending map.
                        workspace.put(entry_hash, hash, op, required_actions, expires);
                        // We have new ops so we should trigger the workflow.
                        should_trigger = true;
                    }
                }
            }
        }
    }
    // Trigger the workflow only if we actually stored new ops.
    if should_trigger {
        trigger.trigger(&"incoming_countersigning");
    }
    Ok(())
}
/// Countersigning workflow that checks for complete sessions and
/// pushes the complete ops to validation then messages the signers.
pub(crate) async fn countersigning_workflow(
    space: &Space,
    network: &(dyn HolochainP2pDnaT + Send + Sync),
    sys_validation_trigger: &TriggerSender,
) -> WorkflowResult<WorkComplete> {
    // Collect every session that has all its signatures.
    let complete_sessions = space.countersigning_workspace.get_complete_sessions();
    let mut notify_agents = Vec::with_capacity(complete_sessions.len());

    // Push each complete session's ops into validation.
    for (agents, ops, actions) in complete_sessions {
        // Ops with an enzyme are handled separately; only forward the rest.
        let non_enzymatic_ops = ops
            .into_iter()
            .filter(|(_hash, dht_op)| dht_op.enzymatic_countersigning_enzyme().is_none())
            .collect::<Vec<_>>();
        if !non_enzymatic_ops.is_empty() {
            incoming_dht_ops_workflow(
                space,
                sys_validation_trigger.clone(),
                non_enzymatic_ops,
                false,
            )
            .await?;
        }
        notify_agents.push((agents, actions));
    }

    // Tell the signers of each complete session that it succeeded.
    for (agents, actions) in notify_agents {
        let message = CountersigningSessionNegotiationMessage::AuthorityResponse(actions);
        let outcome = network
            .countersigning_session_negotiation(agents, message)
            .await;
        if let Err(e) = outcome {
            // This could likely fail if a signer is offline so it's not really an error.
            tracing::info!(
                "Failed to notify agents: counter signed actions because of {:?}",
                e
            );
        }
    }
    Ok(WorkComplete::Complete)
}
/// An incoming countersigning session success.
///
/// Verifies the signed actions against this agent's currently locked
/// countersigning session. When everything matches: unlocks the source
/// chain, moves this cell's session ops into the DHT database, triggers
/// integration, publishes the other signers' agent activity ops, signals
/// the UI, and triggers publishing. Every failed check simply returns
/// `Ok(())` — success handling is best-effort between counterparties.
pub(crate) async fn countersigning_success(
    space: Space,
    network: &HolochainP2pDna,
    author: AgentPubKey,
    signed_actions: Vec<SignedAction>,
    trigger: QueueTriggers,
    mut signal: SignalBroadcaster,
) -> WorkflowResult<()> {
    let authored_db = space.authored_db;
    let dht_db = space.dht_db;
    let dht_db_cache = space.dht_query_cache;
    let QueueTriggers {
        publish_dht_ops: publish_trigger,
        integrate_dht_ops: integration_trigger,
        ..
    } = trigger;
    // Using iterators is fine in this function as there can only be a maximum of 8 actions.
    // Find this agent's own action in the session and take its hash + entry hash.
    let (this_cells_action_hash, entry_hash) = match signed_actions
        .iter()
        .find(|h| *h.0.author() == author)
        .and_then(|sh| {
            sh.0.entry_hash()
                .cloned()
                .map(|eh| (ActionHash::with_data_sync(&sh.0), eh))
        }) {
        Some(h) => h,
        None => return Ok(()),
    };
    // Do a quick check to see if this entry hash matches
    // the current locked session so we don't check signatures
    // unless there is an active session.
    let reader_closure = {
        let entry_hash = entry_hash.clone();
        let this_cells_action_hash = this_cells_action_hash.clone();
        let author = author.clone();
        move |txn: Transaction| {
            if holochain_state::chain_lock::is_chain_locked(&txn, &[], &author)? {
                let transaction: holochain_state::prelude::Txn = (&txn).into();
                if transaction.contains_entry(&entry_hash)? {
                    // If this is a countersigning session we can grab all the ops
                    // for this cell's action so we can check if we need to self publish them.
                    let r: Result<_, _> = txn
                        .prepare(
                            "SELECT basis_hash, hash FROM DhtOp WHERE action_hash = :action_hash",
                        )?
                        .query_map(
                            named_params! {
                                ":action_hash": this_cells_action_hash
                            },
                            |row| {
                                let hash: DhtOpHash = row.get("hash")?;
                                let basis: OpBasis = row.get("basis_hash")?;
                                Ok((hash, basis))
                            },
                        )?
                        .collect();
                    return Ok(r?);
                }
            }
            // No active session for this entry: return an empty op list.
            StateMutationResult::Ok(Vec::with_capacity(0))
        }
    };
    let this_cell_actions_op_basis_hashes: Vec<(DhtOpHash, OpBasis)> =
        authored_db.async_reader(reader_closure).await?;
    // If there is no active session then we can short circuit.
    if this_cell_actions_op_basis_hashes.is_empty() {
        return Ok(());
    }
    // Verify signatures of actions; bail on the first invalid one.
    let mut i_am_an_author = false;
    for SignedAction(action, signature) in &signed_actions {
        if !action.author().verify_signature(signature, action).await {
            return Ok(());
        }
        if action.author() == &author {
            i_am_an_author = true;
        }
    }
    // Countersigning success is ultimately between authors to agree and publish.
    if !i_am_an_author {
        return Ok(());
    }
    // Hash the incoming actions for comparison against the stored session.
    let incoming_actions: Vec<_> = signed_actions
        .iter()
        .map(|SignedAction(h, _)| ActionHash::with_data_sync(h))
        .collect();
    // In one transaction: confirm the session matches the incoming actions,
    // unlock the chain and release the ops for publishing. Returns whether
    // all checks passed.
    let result = authored_db
        .async_commit({
            let author = author.clone();
            let entry_hash = entry_hash.clone();
            move |txn| {
                if let Some((cs_entry_hash, cs)) = current_countersigning_session(txn, Arc::new(author.clone()))? {
                    // Check we have the right session.
                    if cs_entry_hash == entry_hash {
                        let weight = weigh_placeholder();
                        let stored_actions = cs.build_action_set(entry_hash, weight)?;
                        if stored_actions.len() == incoming_actions.len() {
                            // Check all stored action hashes match an incoming action hash.
                            if stored_actions.iter().all(|h| {
                                let h = ActionHash::with_data_sync(h);
                                incoming_actions.iter().any(|i| *i == h)
                            }) {
                                // All checks have passed so unlock the chain.
                                mutations::unlock_chain(txn, &author)?;
                                // Update ops to publish.
                                txn.execute("UPDATE DhtOp SET withhold_publish = NULL WHERE action_hash = :action_hash",
                                named_params! {
                                    ":action_hash": this_cells_action_hash,
                                }
                                ).map_err(holochain_state::prelude::StateMutationError::from)?;
                                return Ok(true);
                            }
                        }
                    }
                }
                SourceChainResult::Ok(false)
            }
        })
        .await?;
    if result {
        // If all signatures are valid (above) and I signed then I must have
        // validated it previously so I now agree that I authored it.
        authored_ops_to_dht_db_without_check(
            this_cell_actions_op_basis_hashes
                .into_iter()
                .map(|(op_hash, _)| op_hash)
                .collect(),
            &(authored_db.into()),
            &dht_db,
            &dht_db_cache,
        )
        .await?;
        integration_trigger.trigger(&"countersigning_success");
        // Publish other signers' agent activity ops to their agent activity authorities.
        for SignedAction(action, signature) in signed_actions {
            if *action.author() == author {
                continue;
            }
            let op = DhtOp::RegisterAgentActivity(signature, action);
            let basis = op.dht_basis();
            if let Err(e) = network.publish_countersign(false, basis, op).await {
                tracing::error!(
                    "Failed to publish to other countersigners agent authorities because of: {:?}",
                    e
                );
            }
        }
        // Signal to the UI.
        signal.send(Signal::System(SystemSignal::SuccessfulCountersigning(
            entry_hash,
        )))?;
        publish_trigger.trigger(&"publish countersigning_success");
    }
    Ok(())
}
sourcepub fn reset_back_off(&self)
pub fn reset_back_off(&self)
Reset the back off to the lowest duration. If no back off is set this is a no-op.
sourcepub fn pause_loop(&self)
pub fn pause_loop(&self)
Pause the trigger loop if there is one.
sourcepub fn resume_loop_now(&self)
pub fn resume_loop_now(&self)
Resume the trigger loop now if there is one.
This will resume the loop even if it is currently listening (the workflow is not running). The downside to this call is that if the workflow is running it will immediately run a second time.
This call is a no-op if the loop is not paused.
sourcepub fn resume_loop(&self)
pub fn resume_loop(&self)
Resume the trigger loop if there is one.
This will cause the loop to resume after the
next trigger (or if the workflow is currently in progress).
It will not cause the loop to resume immediately.
If the loop is currently listening (the workflow is not running)
then nothing will happen until the next trigger.
See resume_loop_now
for a version that will resume immediately.
This call is a no-op if the loop is not paused.
Trait Implementations§
source§impl Clone for TriggerSender
impl Clone for TriggerSender
source§fn clone(&self) -> TriggerSender
fn clone(&self) -> TriggerSender
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read more
Auto Trait Implementations§
impl !RefUnwindSafe for TriggerSender
impl Send for TriggerSender
impl Sync for TriggerSender
impl Unpin for TriggerSender
impl !UnwindSafe for TriggerSender
Blanket Implementations§
§impl<T> Any for Twhere
T: Any + ?Sized,
impl<T> Any for Twhere
T: Any + ?Sized,
§fn type_id_compat(&self) -> TypeId
fn type_id_compat(&self) -> TypeId
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata
) -> <T as Pointee>::Metadata
fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata
) -> <T as Pointee>::Metadata
§impl<F, W, T, D> Deserialize<With<T, W>, D> for Fwhere
W: DeserializeWith<F, T, D>,
D: Fallible + ?Sized,
F: ?Sized,
impl<F, W, T, D> Deserialize<With<T, W>, D> for Fwhere
W: DeserializeWith<F, T, D>,
D: Fallible + ?Sized,
F: ?Sized,
§fn deserialize(
&self,
deserializer: &mut D
) -> Result<With<T, W>, <D as Fallible>::Error>
fn deserialize(
&self,
deserializer: &mut D
) -> Result<With<T, W>, <D as Fallible>::Error>
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self> ⓘ
fn with_context(self, otel_cx: Context) -> WithContext<Self> ⓘ
§fn with_current_context(self) -> WithContext<Self> ⓘ
fn with_current_context(self) -> WithContext<Self> ⓘ
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
source§fn in_current_span(self) -> Instrumented<Self> ⓘ
fn in_current_span(self) -> Instrumented<Self> ⓘ
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
source§fn in_current_span(self) -> Instrumented<Self> ⓘ
fn in_current_span(self) -> Instrumented<Self> ⓘ
§impl<T> Pointable for T
impl<T> Pointable for T
§impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
§fn to_subset(&self) -> Option<SS>
fn to_subset(&self) -> Option<SS>
self
from the equivalent element of its
superset. Read more§fn is_in_subset(&self) -> bool
fn is_in_subset(&self) -> bool
self
is actually part of its subset T
(and can be converted to it).§fn to_subset_unchecked(&self) -> SS
fn to_subset_unchecked(&self) -> SS
self.to_subset
but without any property checks. Always succeeds.§fn from_subset(element: &SS) -> SP
fn from_subset(element: &SS) -> SP
self
to the equivalent element of its superset.