use super::metrics::create_workflow_duration_metric;
use super::workflow::app_validation_workflow::AppValidationWorkspace;
use super::workflow::sys_validation_workflow::SysValidationWorkspace;
use super::workflow::{WorkflowError, WorkflowResult};
use crate::conductor::conductor::{RwShare, StopReceiver};
use crate::conductor::manager::TaskManagerClient;
use crate::conductor::space::Space;
use crate::conductor::ConductorHandle;
use crate::conductor::{error::ConductorError, manager::ManagedTaskResult};
use derive_more::Display;
use futures::future::Either;
use futures::{Future, Stream, StreamExt};
use holochain_p2p::*;
use holochain_types::prelude::*;
use publish_dht_ops_consumer::*;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use witnessing_consumer::*;
mod integrate_dht_ops_consumer;
use integrate_dht_ops_consumer::*;
mod sys_validation_consumer;
use sys_validation_consumer::*;
mod app_validation_consumer;
use app_validation_consumer::*;
mod publish_dht_ops_consumer;
use crate::conductor::error::ConductorResult;
use crate::core::queue_consumer::countersigning_consumer::spawn_countersigning_consumer;
use crate::core::workflow::countersigning_workflow::CountersigningWorkspace;
use validation_receipt_consumer::*;
mod countersigning_consumer;
mod validation_receipt_consumer;
mod witnessing_consumer;
#[cfg(test)]
mod tests;
/// Spawn every queue-consumer task that drives a single cell's workflows:
/// publish, validation receipts, integration, app validation, sys validation,
/// countersigning and witnessing.
///
/// Consumers registered through the [`QueueConsumerMap`] (`spawn_once_*`) are
/// keyed per-DNA, so cells sharing a DNA share one consumer; the publish and
/// countersigning consumers are spawned per cell.
///
/// Returns the long-lived [`QueueTriggers`] handed to the rest of the
/// conductor, plus the one-shot [`InitialQueueTriggers`] used to kick every
/// workflow once at startup.
#[allow(clippy::too_many_arguments)]
pub async fn spawn_queue_consumer_tasks(
    cell_id: CellId,
    network: DynHolochainP2pDna,
    space: &Space,
    conductor: ConductorHandle,
) -> ConductorResult<(QueueTriggers, InitialQueueTriggers)> {
    // Databases shared by the whole space; the authored DB is per-agent and
    // fetched separately below.
    let Space {
        dht_db,
        cache_db: cache,
        ..
    } = space;
    let keystore = conductor.keystore().clone();
    let dna_hash = Arc::new(cell_id.dna_hash().clone());
    let queue_consumer_map = conductor.get_queue_consumer_workflows();
    let authored_db = space.get_or_create_authored_db(cell_id.agent_pubkey().clone())?;
    // Publish is per-cell (works off this agent's authored DB).
    let tx_publish = spawn_publish_dht_ops_consumer(
        cell_id.clone(),
        authored_db.clone(),
        conductor.clone(),
        network.clone(),
    );
    // Validation receipts: one consumer per DNA.
    let tx_receipt = queue_consumer_map.spawn_once_validation_receipt(dna_hash.clone(), || {
        spawn_validation_receipt_consumer(
            dna_hash.clone(),
            dht_db.clone(),
            conductor.clone(),
            network.clone(),
        )
    });
    // Integration: one per DNA; it triggers receipt sending when done.
    let tx_integration = queue_consumer_map.spawn_once_integration(dna_hash.clone(), || {
        spawn_integrate_dht_ops_consumer(
            dna_hash.clone(),
            dht_db.clone(),
            conductor.task_manager(),
            tx_receipt.clone(),
            network.clone(),
            conductor.clone(),
        )
    });
    // App validation: one per DNA; feeds integration and publish.
    let tx_app = queue_consumer_map.spawn_once_app_validation(dna_hash.clone(), || {
        spawn_app_validation_consumer(
            dna_hash.clone(),
            AppValidationWorkspace::new(
                authored_db.clone(),
                dht_db.clone(),
                cache.clone(),
                keystore.clone(),
            ),
            conductor.clone(),
            tx_integration.clone(),
            tx_publish.clone(),
            network.clone(),
        )
    });
    // Sys validation: one per DNA; feeds app validation, integration, publish.
    let tx_sys = queue_consumer_map.spawn_once_sys_validation(dna_hash.clone(), || {
        spawn_sys_validation_consumer(
            SysValidationWorkspace::new(
                authored_db.clone(),
                dht_db.clone(),
                cache.clone(),
                cell_id.dna_hash().clone(),
                conductor
                    .get_config()
                    .conductor_tuning_params()
                    .sys_validation_retry_delay(),
            ),
            space.clone(),
            conductor.clone(),
            tx_app.clone(),
            tx_integration.clone(),
            tx_publish.clone(),
            network.clone(),
            conductor.keystore().clone(),
        )
    });
    // Get-or-create the countersigning workspace for this cell; it is cached
    // on the space so a respawned consumer reuses the same workspace.
    // NOTE(review): this block reads `conductor.config` directly while the
    // sys-validation block above goes through `conductor.get_config()` —
    // presumably equivalent accessors; confirm.
    let workspace = {
        let mut guard = space.countersigning_workspaces.lock();
        guard
            .entry(cell_id.clone())
            .or_insert_with(|| {
                Arc::new(CountersigningWorkspace::new(
                    conductor
                        .config
                        .conductor_tuning_params()
                        .countersigning_resolution_retry_delay(),
                    conductor
                        .config
                        .conductor_tuning_params()
                        .countersigning_resolution_retry_limit,
                ))
            })
            .clone()
    };
    // Countersigning is per-cell (consumes `cell_id` here).
    let tx_countersigning = spawn_countersigning_consumer(
        space.clone(),
        workspace,
        cell_id,
        conductor.clone(),
        tx_integration.clone(),
        tx_publish.clone(),
    );
    // Witnessing: one per DNA; last use of `dna_hash`, so it is moved.
    let tx_witnessing = queue_consumer_map.spawn_once_witnessing(dna_hash, || {
        spawn_witnessing_consumer(
            space.clone(),
            conductor.task_manager(),
            network.clone(),
            tx_sys.clone(),
        )
    });
    Ok((
        QueueTriggers {
            sys_validation: tx_sys.clone(),
            publish_dht_ops: tx_publish.clone(),
            countersigning: tx_countersigning.clone(),
            witnessing: tx_witnessing,
            integrate_dht_ops: tx_integration.clone(),
        },
        InitialQueueTriggers::new(
            tx_sys,
            tx_publish,
            tx_app,
            tx_integration,
            tx_receipt,
            tx_countersigning,
        ),
    ))
}
/// Shared registry of queue-consumer triggers, keyed by (DNA, queue type),
/// used to ensure each per-DNA consumer is spawned at most once.
#[derive(Clone)]
pub struct QueueConsumerMap {
    // Shared mutable map of registered triggers. NOTE(review): `RwShare`
    // presumably wraps a read/write lock — confirm against its definition.
    map: RwShare<HashMap<QueueEntry, TriggerSender>>,
}
impl Default for QueueConsumerMap {
fn default() -> Self {
Self::new()
}
}
impl QueueConsumerMap {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self {
            map: RwShare::new(HashMap::new()),
        }
    }

    /// Spawn the validation-receipt consumer for `dna_hash` unless one is
    /// already registered, returning its trigger either way.
    fn spawn_once_validation_receipt<S>(&self, dna_hash: Arc<DnaHash>, spawn: S) -> TriggerSender
    where
        S: FnOnce() -> TriggerSender,
    {
        self.spawn_once(QueueEntry(dna_hash, QueueType::Receipt), spawn)
    }

    /// Spawn the integration consumer for `dna_hash` unless already registered.
    fn spawn_once_integration<S>(&self, dna_hash: Arc<DnaHash>, spawn: S) -> TriggerSender
    where
        S: FnOnce() -> TriggerSender,
    {
        self.spawn_once(QueueEntry(dna_hash, QueueType::Integration), spawn)
    }

    /// Spawn the sys-validation consumer for `dna_hash` unless already registered.
    fn spawn_once_sys_validation<S>(&self, dna_hash: Arc<DnaHash>, spawn: S) -> TriggerSender
    where
        S: FnOnce() -> TriggerSender,
    {
        self.spawn_once(QueueEntry(dna_hash, QueueType::SysValidation), spawn)
    }

    /// Spawn the app-validation consumer for `dna_hash` unless already registered.
    fn spawn_once_app_validation<S>(&self, dna_hash: Arc<DnaHash>, spawn: S) -> TriggerSender
    where
        S: FnOnce() -> TriggerSender,
    {
        self.spawn_once(QueueEntry(dna_hash, QueueType::AppValidation), spawn)
    }

    /// Spawn the witnessing consumer for `dna_hash` unless already registered.
    fn spawn_once_witnessing<S>(&self, dna_hash: Arc<DnaHash>, spawn: S) -> TriggerSender
    where
        S: FnOnce() -> TriggerSender,
    {
        self.spawn_once(QueueEntry(dna_hash, QueueType::Witnessing), spawn)
    }

    /// Trigger for the validation-receipt queue, if its consumer exists.
    pub fn validation_receipt_trigger(&self, dna_hash: Arc<DnaHash>) -> Option<TriggerSender> {
        self.get_trigger(&QueueEntry(dna_hash, QueueType::Receipt))
    }

    /// Trigger for the integration queue, if its consumer exists.
    pub fn integration_trigger(&self, dna_hash: Arc<DnaHash>) -> Option<TriggerSender> {
        self.get_trigger(&QueueEntry(dna_hash, QueueType::Integration))
    }

    /// Trigger for the sys-validation queue, if its consumer exists.
    pub fn sys_validation_trigger(&self, dna_hash: Arc<DnaHash>) -> Option<TriggerSender> {
        self.get_trigger(&QueueEntry(dna_hash, QueueType::SysValidation))
    }

    /// Trigger for the app-validation queue, if its consumer exists.
    pub fn app_validation_trigger(&self, dna_hash: Arc<DnaHash>) -> Option<TriggerSender> {
        self.get_trigger(&QueueEntry(dna_hash, QueueType::AppValidation))
    }

    /// Trigger for the countersigning queue, if its consumer exists.
    /// NOTE(review): nothing in this file inserts a `QueueType::Countersigning`
    /// entry (countersigning is spawned per cell), so this lookup may always
    /// return `None` — confirm against other call sites.
    pub fn countersigning_trigger(&self, dna_hash: Arc<DnaHash>) -> Option<TriggerSender> {
        self.get_trigger(&QueueEntry(dna_hash, QueueType::Countersigning))
    }

    /// Trigger for the witnessing queue, if its consumer exists.
    pub fn witnessing_trigger(&self, dna_hash: Arc<DnaHash>) -> Option<TriggerSender> {
        self.get_trigger(&QueueEntry(dna_hash, QueueType::Witnessing))
    }

    /// Look up a registered trigger by key.
    fn get_trigger(&self, key: &QueueEntry) -> Option<TriggerSender> {
        self.map.share_ref(|map| map.get(key).cloned())
    }

    /// Insert-if-absent: run `spawn` only when no consumer is registered for
    /// `key`, then return a clone of the (new or existing) trigger.
    ///
    /// Uses the entry API's `or_insert_with` instead of a manual
    /// `Occupied`/`Vacant` match — one lookup, same semantics, less code.
    fn spawn_once<S>(&self, key: QueueEntry, spawn: S) -> TriggerSender
    where
        S: FnOnce() -> TriggerSender,
    {
        self.map
            .share_mut(|map| map.entry(key).or_insert_with(spawn).clone())
    }
}
/// Key identifying one queue consumer: the DNA it serves plus the queue type.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct QueueEntry(Arc<DnaHash>, QueueType);
/// The kinds of workflow queues a consumer can be registered for.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
enum QueueType {
    /// Validation-receipt sending.
    Receipt,
    /// DHT-op integration.
    Integration,
    /// App (zome-defined) validation.
    AppValidation,
    /// System validation.
    SysValidation,
    /// Countersigning resolution.
    Countersigning,
    /// Countersigning witnessing.
    Witnessing,
}
/// The long-lived set of triggers exposed to the rest of the conductor for
/// waking a cell's queue consumers on demand.
#[derive(Clone)]
pub struct QueueTriggers {
    /// Wake the sys-validation workflow.
    pub sys_validation: TriggerSender,
    /// Wake the publish workflow.
    pub publish_dht_ops: TriggerSender,
    /// Wake the countersigning workflow.
    pub countersigning: TriggerSender,
    /// Wake the witnessing workflow.
    pub witnessing: TriggerSender,
    /// Wake the integration workflow.
    pub integrate_dht_ops: TriggerSender,
}
/// One-shot bundle of every workflow trigger, used to kick all workflows
/// once when a cell starts up (see `initialize_workflows`).
#[derive(Clone)]
pub struct InitialQueueTriggers {
    // Triggers for each workflow, fired once at startup.
    sys_validation: TriggerSender,
    publish_dht_ops: TriggerSender,
    app_validation: TriggerSender,
    integrate_dht_ops: TriggerSender,
    validation_receipt: TriggerSender,
    countersigning: TriggerSender,
}
impl InitialQueueTriggers {
    /// Bundle the full set of startup triggers.
    fn new(
        sys_validation: TriggerSender,
        publish_dht_ops: TriggerSender,
        app_validation: TriggerSender,
        integrate_dht_ops: TriggerSender,
        validation_receipt: TriggerSender,
        countersigning: TriggerSender,
    ) -> Self {
        Self {
            countersigning,
            validation_receipt,
            integrate_dht_ops,
            app_validation,
            publish_dht_ops,
            sys_validation,
        }
    }

    /// Fire every workflow exactly once so each queue processes any work that
    /// accumulated while the cell was offline. Consumes `self` — this is a
    /// one-shot operation.
    pub fn initialize_workflows(self) {
        for trigger in [
            &self.sys_validation,
            &self.app_validation,
            &self.integrate_dht_ops,
            &self.publish_dht_ops,
            &self.validation_receipt,
            &self.countersigning,
        ] {
            trigger.trigger(&"init");
        }
    }
}
/// Sending half of a queue trigger: wakes the consumer loop and controls its
/// optional back-off loop.
#[derive(Clone)]
pub struct TriggerSender {
    // The payload type `&'static &'static str` lets call sites pass
    // `&"context"` — a reference to a promoted string literal — as a cheap
    // Copy value used only for trace logging.
    trigger: broadcast::Sender<&'static &'static str>,
    // Set to ask the receiver to reset its back-off interval.
    // `None` when the receiver was created without a back-off loop.
    reset_back_off: Option<Arc<AtomicBool>>,
    // Set to pause the receiver's back-off loop; `None` without a loop.
    pause_back_off: Option<Arc<AtomicBool>>,
}
/// Receiving half of a queue trigger; the consumer loop awaits
/// [`TriggerReceiver::listen`] to know when to run.
pub struct TriggerReceiver {
    // Incoming trigger notifications (channel capacity 1).
    rx: broadcast::Receiver<&'static &'static str>,
    // When true, an explicit trigger resets the back-off interval.
    reset_on_trigger: bool,
    // Periodic wake-up timer; `None` for plain (non-looping) triggers.
    back_off: Option<BackOff>,
}
/// Exponential back-off state for a looping queue consumer: the current sleep
/// interval lives in `range.start` and grows toward the cap `range.end`.
struct BackOff {
    // The initial interval, kept so `reset` can restore `range.start`.
    start: Duration,
    // Current interval (`start`) and maximum interval (`end`).
    range: Range<Duration>,
    // Flag set by the sender to request an interval reset.
    reset_back_off: Arc<AtomicBool>,
    // Flag set by the sender to pause the loop.
    paused: Arc<AtomicBool>,
}
impl TriggerSender {
    /// Create a plain trigger channel with no periodic back-off loop.
    pub fn new() -> (TriggerSender, TriggerReceiver) {
        // Capacity 1: a burst of triggers buffers at most one extra wake-up.
        let (tx, rx) = broadcast::channel(1);
        (
            TriggerSender {
                trigger: tx,
                reset_back_off: None,
                pause_back_off: None,
            },
            TriggerReceiver {
                rx,
                back_off: None,
                reset_on_trigger: false,
            },
        )
    }

    /// Create a trigger channel whose receiver also wakes periodically on a
    /// back-off timer over `range` (current interval grows from `range.start`
    /// toward `range.end`). When `reset_on_trigger` is set, an explicit
    /// trigger resets the interval back to `range.start`.
    pub fn new_with_loop(
        range: Range<Duration>,
        reset_on_trigger: bool,
    ) -> (TriggerSender, TriggerReceiver) {
        let (tx, rx) = broadcast::channel(1);
        let reset_back_off = Arc::new(AtomicBool::new(false));
        let pause_back_off = Arc::new(AtomicBool::new(false));
        (
            TriggerSender {
                trigger: tx,
                reset_back_off: Some(reset_back_off.clone()),
                pause_back_off: Some(pause_back_off.clone()),
            },
            TriggerReceiver {
                rx,
                reset_on_trigger,
                back_off: Some(BackOff::new(range, reset_back_off, pause_back_off)),
            },
        )
    }

    /// Wake the consumer. `context` is only used for trace logging on the
    /// receiving side; a send error means the receiver is gone (cell shutting
    /// down) and is deliberately ignored.
    pub fn trigger(&self, context: &'static &'static str) {
        if self.trigger.send(context).is_err() {
            tracing::warn!(
                "Queue consumer trigger was sent while Cell is shutting down: ignoring."
            );
        };
    }

    /// Ask the receiver to reset its back-off interval (no-op without a loop).
    pub fn reset_back_off(&self) {
        if let Some(tx) = &self.reset_back_off {
            tx.store(true, Ordering::Relaxed);
        }
    }

    /// Pause the back-off loop (no-op without a loop).
    pub fn pause_loop(&self) {
        if let Some(pause) = &self.pause_back_off {
            pause.store(true, Ordering::Relaxed);
        }
    }

    /// Resume the back-off loop and trigger an immediate run if it was
    /// actually paused.
    pub fn resume_loop_now(&self) {
        if let Some(pause) = &self.pause_back_off {
            // `swap` atomically clears the flag and reports its previous
            // value, so the wake-up fires only on a real paused -> resumed
            // transition. (Same semantics as the previous
            // `fetch_and(false, AcqRel)`, but clearer about intent.)
            if pause.swap(false, Ordering::AcqRel) {
                self.trigger(&"resume_loop_now");
            }
        }
    }

    /// Resume the back-off loop without forcing an immediate run.
    pub fn resume_loop(&self) {
        if let Some(pause) = &self.pause_back_off {
            pause.store(false, Ordering::Release);
        }
    }
}
impl TriggerReceiver {
    /// Wait until this queue should run again: either an explicit trigger
    /// arrives or, when a back-off loop is configured and not paused, the
    /// back-off timer elapses.
    ///
    /// Returns `Err(QueueTriggerClosedError)` once all senders are dropped.
    pub async fn listen(&mut self) -> Result<(), QueueTriggerClosedError> {
        let Self {
            back_off,
            rx,
            reset_on_trigger,
        } = self;
        // Whether we woke from an explicit trigger (true) or a timer tick
        // (false); explicit triggers get the drain/reset handling below.
        let mut was_trigger = true;
        {
            let trigger_fut = rx_fut(rx);
            match back_off {
                Some(back_off) if !back_off.is_paused() => {
                    let paused = back_off.paused.clone();
                    {
                        // Race the trigger against the back-off timer.
                        let back_off_fut = back_off.wait();
                        futures::pin_mut!(back_off_fut, trigger_fut);
                        match futures::future::select(trigger_fut, back_off_fut).await {
                            Either::Left((result, _)) => {
                                // Explicit trigger won the race.
                                result?;
                            }
                            Either::Right((_, trigger_fut)) => {
                                // Timer fired first. If the loop was paused
                                // while we were sleeping, ignore the tick and
                                // keep waiting for a real trigger instead.
                                if paused.load(Ordering::Acquire) {
                                    trigger_fut.await?;
                                } else {
                                    was_trigger = false;
                                }
                            }
                        }
                    }
                }
                // No back-off loop, or it is currently paused: wait for an
                // explicit trigger only.
                _ => {
                    trigger_fut.await?;
                }
            }
        }
        if was_trigger {
            // Drain any extra buffered trigger (channel capacity is 1) so a
            // burst of triggers causes at most one additional run.
            let _ = self.rx.try_recv();
            if *reset_on_trigger {
                if let Some(back_off) = back_off {
                    back_off.reset();
                }
            }
        }
        Ok(())
    }

    /// True when a back-off loop exists and is currently paused.
    pub fn is_paused(&self) -> bool {
        self.back_off.as_ref().is_some_and(|b| b.is_paused())
    }

    /// Test helper: non-blocking read of a pending trigger context, if any.
    #[cfg(test)]
    pub fn try_recv(&mut self) -> Option<&'static &'static str> {
        self.rx.try_recv().ok()
    }
}
async fn rx_fut(
rx: &mut broadcast::Receiver<&'static &'static str>,
) -> Result<(), QueueTriggerClosedError> {
match rx.recv().await {
Ok(context) => {
tracing::trace!(msg = "trigger received", ?context);
Ok(())
}
Err(broadcast::error::RecvError::Closed) => Err(QueueTriggerClosedError),
Err(broadcast::error::RecvError::Lagged(_)) => Ok(()),
}
}
impl BackOff {
    /// Build back-off state whose current interval starts at `range.start`
    /// and is capped at `range.end`.
    fn new(
        range: Range<Duration>,
        reset_back_off: Arc<AtomicBool>,
        pause_back_off: Arc<AtomicBool>,
    ) -> Self {
        Self {
            // Remember the initial interval so `reset` can restore it.
            start: range.start,
            range,
            reset_back_off,
            paused: pause_back_off,
        }
    }

    /// Sleep for the current interval, then double it (capped at
    /// `range.end`). A pending reset request is consumed first.
    async fn wait(&mut self) {
        // `swap` atomically consumes the reset request and reports whether
        // one was pending. (Same semantics as the previous
        // `fetch_and(false, Relaxed)`, but clearer about intent.)
        if self.reset_back_off.swap(false, Ordering::Relaxed) {
            self.reset();
        }
        // Once doubling has pushed `start` up to `end`, the range is empty
        // (`start >= end`), so sleep for the cap from then on.
        let dur = if self.range.is_empty() {
            self.range.end
        } else {
            self.range.start
        };
        tokio::time::sleep(dur).await;
        self.range.start = std::cmp::min(self.range.start * 2, self.range.end);
    }

    /// Restore the interval to its initial value.
    fn reset(&mut self) {
        self.range.start = self.start;
    }

    /// Whether the loop is currently paused.
    fn is_paused(&self) -> bool {
        self.paused.load(Ordering::Acquire)
    }
}
/// Outcome of one workflow run, returned to the queue-consumer loop.
#[derive(Clone, Debug, PartialEq)]
pub enum WorkComplete {
    /// All outstanding work was processed; wait for the next trigger.
    Complete,
    /// Work remains: re-trigger the workflow, optionally after sleeping for
    /// the given duration.
    Incomplete(Option<Duration>),
}
/// Returned when the trigger channel has closed (all senders dropped), which
/// tells the consumer loop to shut down.
#[derive(Debug, Display, thiserror::Error)]
pub struct QueueTriggerClosedError;
/// Turn a [`TriggerReceiver`] into a stream of `()` wake-ups that terminates
/// when the stop signal fires or the trigger channel closes.
pub(super) fn trigger_stream(rx: TriggerReceiver, stop: StopReceiver) -> impl Stream<Item = ()> {
    let wakeups = futures::stream::unfold(rx, |mut rx| async move {
        // A closed channel ends the stream; every successful listen yields
        // one wake-up item and threads the receiver through to the next call.
        rx.listen().await.ok().map(|()| ((), rx))
    });
    stop.fuse_with(Box::pin(wakeups))
}
/// Generic main loop shared by every queue consumer: wait for a wake-up, run
/// the workflow closure, and act on its [`WorkComplete`] verdict until the
/// stop signal ends the trigger stream.
async fn queue_consumer_main_task_impl<
    Fut: 'static + Send + Future<Output = WorkflowResult<WorkComplete>>,
>(
    name: String,
    dna_hash: Arc<DnaHash>,
    _agent: Option<AgentPubKey>,
    (tx, rx): (TriggerSender, TriggerReceiver),
    stop: StopReceiver,
    mut fut: impl 'static + Send + FnMut() -> Fut,
) -> ManagedTaskResult {
    // Stream of wake-ups; yields None once the stop signal fires.
    let mut triggers = trigger_stream(rx, stop);
    let duration_metric = create_workflow_duration_metric(name.clone(), dna_hash);
    loop {
        if let Some(()) = triggers.next().await {
            let start = Instant::now();
            match fut().await {
                Ok(WorkComplete::Incomplete(delay)) => {
                    // More work remains: optionally sleep, then self-trigger.
                    tracing::debug!("Work incomplete, re-triggering workflow - {}.", name);
                    if let Some(dly) = delay {
                        tracing::debug!(
                            "Sleeping for {} ms before re-triggering - {}.",
                            dly.as_millis(),
                            name
                        );
                        tokio::time::sleep(dly).await;
                    }
                    tx.trigger(&"retrigger")
                }
                // Errors that should bail propagate (ending the task);
                // recoverable ones are logged inside `handle_workflow_error`.
                Err(err) => handle_workflow_error(&name, err)?,
                // WorkComplete::Complete: nothing more to do this round.
                _ => (),
            }
            // Record the run duration. Note this includes any re-trigger
            // delay slept above.
            duration_metric.record(start.elapsed().as_secs_f64());
        } else {
            // Trigger stream ended: the stop signal fired.
            tracing::info!("Cell is shutting down: stopping queue consumer '{}'", name);
            break;
        }
    }
    ManagedTaskResult::Ok(())
}
/// Register a DNA-scoped queue-consumer loop as a critical task with the task
/// manager; the task runs [`queue_consumer_main_task_impl`] until stopped.
fn queue_consumer_dna_bound<Fut: 'static + Send + Future<Output = WorkflowResult<WorkComplete>>>(
    name: &str,
    dna_hash: Arc<DnaHash>,
    tm: TaskManagerClient,
    (tx, rx): (TriggerSender, TriggerReceiver),
    fut: impl 'static + Send + FnMut() -> Fut,
) {
    // Owned copies for the 'static task closure.
    let task_name = name.to_string();
    let hash_for_task = Arc::clone(&dna_hash);
    tm.add_dna_task_critical(name, dna_hash, move |stop| {
        queue_consumer_main_task_impl(task_name, hash_for_task, None, (tx, rx), stop, fut)
    });
}
/// Register a cell-scoped queue-consumer loop (carrying the agent key) as a
/// critical task with the task manager.
fn queue_consumer_cell_bound<
    Fut: 'static + Send + Future<Output = WorkflowResult<WorkComplete>>,
>(
    name: &str,
    cell_id: CellId,
    tm: TaskManagerClient,
    (tx, rx): (TriggerSender, TriggerReceiver),
    fut: impl 'static + Send + FnMut() -> Fut,
) {
    // Owned copies for the 'static task closure; `cell_id` itself is handed
    // to the task manager below.
    let task_name = name.to_string();
    let dna = Arc::new(cell_id.dna_hash().clone());
    let agent = cell_id.agent_pubkey().clone();
    tm.add_cell_task_critical(name, cell_id, move |stop| {
        queue_consumer_main_task_impl(task_name, dna, Some(agent), (tx, rx), stop, fut)
    });
}
/// Decide what to do with a workflow error: bail out of the consumer task for
/// fatal errors, or log and keep the queue running otherwise.
///
/// Takes `&str` instead of the previous `&String` (the caller's `&name`
/// deref-coerces, so call sites are unchanged).
fn handle_workflow_error(workflow_name: &str, err: WorkflowError) -> ManagedTaskResult {
    if err.workflow_should_bail() {
        // Fatal: convert into a conductor error so the task manager stops
        // this consumer task.
        Err(Box::new(ConductorError::from(err)).into())
    } else {
        // Recoverable: log and let the consumer loop continue.
        tracing::error!(?workflow_name, ?err);
        Ok(())
    }
}