use crate::{
machines::WFCommand,
protos::{
coresdk::workflow_activation::{wf_activation_job, SignalWorkflow, WfActivationJob},
temporal::api::history::v1::{
WorkflowExecutionCanceledEventAttributes, WorkflowExecutionStartedEventAttributes,
},
},
};
use std::collections::VecDeque;
pub struct DrivenWorkflow {
started_attrs: Option<WorkflowExecutionStartedEventAttributes>,
fetcher: Box<dyn ExternalWorkflow>,
outgoing_wf_activation_jobs: VecDeque<wf_activation_job::Variant>,
}
impl<WF> From<Box<WF>> for DrivenWorkflow
where
WF: ExternalWorkflow + 'static,
{
fn from(wf: Box<WF>) -> Self {
Self {
started_attrs: None,
fetcher: wf,
outgoing_wf_activation_jobs: Default::default(),
}
}
}
impl DrivenWorkflow {
pub fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) {
debug!(run_id = %attribs.original_execution_run_id, "Driven WF start");
self.started_attrs = Some(attribs)
}
pub fn send_job(&mut self, job: wf_activation_job::Variant) {
self.fetcher.on_activation_job(&job);
self.outgoing_wf_activation_jobs.push_back(job);
}
pub fn drain_jobs(&mut self) -> Vec<WfActivationJob> {
self.outgoing_wf_activation_jobs
.drain(..)
.map(Into::into)
.collect()
}
pub fn signal(&mut self, signal: SignalWorkflow) {
self.send_job(wf_activation_job::Variant::SignalWorkflow(signal))
}
pub fn _cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {}
}
impl WorkflowFetcher for DrivenWorkflow {
fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
self.fetcher.fetch_workflow_iteration_output()
}
}
impl ActivationListener for DrivenWorkflow {
fn on_activation_job(&mut self, a: &wf_activation_job::Variant) {
self.fetcher.on_activation_job(a)
}
}
pub trait ExternalWorkflow: WorkflowFetcher + ActivationListener {}
impl<T> ExternalWorkflow for T where T: WorkflowFetcher + ActivationListener {}
pub trait WorkflowFetcher: Send {
fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand>;
}
pub trait ActivationListener {
fn on_activation_job(&mut self, _activation: &wf_activation_job::Variant) {}
}