mod bridge;
mod concurrency_manager;
mod driven_workflow;
pub(crate) use bridge::WorkflowBridge;
pub(crate) use concurrency_manager::WorkflowConcurrencyManager;
pub(crate) use driven_workflow::{ActivationListener, DrivenWorkflow, WorkflowFetcher};
use crate::{
machines::{ProtoCommand, WFCommand, WFMachinesError, WorkflowMachines},
protos::{
coresdk::workflow_activation::WfActivation,
temporal::api::{common::v1::WorkflowExecution, history::v1::History},
},
protosext::{HistoryInfo, HistoryInfoError},
};
use std::sync::mpsc::{SendError, Sender};
/// Module-local `Result` alias whose error type defaults to [`WorkflowError`]; a different
/// error type can still be supplied explicitly as the second parameter.
type Result<T, E = WorkflowError> = std::result::Result<T, E>;
/// Errors surfaced by the workflow management code in this module.
///
/// Variants carrying `#[from]` get a `From` impl generated by `thiserror`, which is what lets
/// the `?` operator convert the wrapped error types into `WorkflowError` automatically.
#[derive(thiserror::Error, Debug)]
// Silences clippy's warning about a large size difference between the variants.
#[allow(clippy::large_enum_variant)]
pub enum WorkflowError {
/// The workflow machines associated with `run_id` could not be found.
#[error("Workflow machines associated with `{run_id}` not found")]
MissingMachine { run_id: String },
/// Wraps a [`WFMachinesError`] produced by the underlying state machines.
#[error("Underlying error in state machines: {0:?}")]
UnderlyingMachinesError(#[from] WFMachinesError),
/// Wraps a [`HistoryInfoError`] raised while processing a workflow's history.
#[error("There was an error in the history associated with the workflow: {0:?}")]
HistoryError(#[from] HistoryInfoError),
/// Sending a batch of workflow commands over the mpsc channel failed; the wrapped
/// [`SendError`] carries the commands that could not be delivered.
#[error("Internal error buffering workflow commands")]
CommandBufferingError(#[from] SendError<Vec<WFCommand>>),
/// A machine was created for `run_id` without any activations.
// NOTE(review): this variant is not constructed in the visible part of the module —
// confirm where it is raised.
#[error("Machine created with no activations for run_id {run_id}")]
MachineWasCreatedWithNoActivations { run_id: String },
}
/// Owns the state machines for a single workflow run together with the bookkeeping needed to
/// replay the most recently received history one workflow task at a time.
pub(crate) struct WorkflowManager {
/// The state machines that history events are applied to and commands are read from.
pub machines: WorkflowMachines,
/// Sending half of the channel returned by `WorkflowBridge::new()`; `push_commands` sends
/// command batches through it.
command_sink: Sender<Vec<WFCommand>>,
/// The history most recently supplied to the manager (initially the one given to `new`).
last_history_from_server: History,
/// Workflow task count reported by `last_history_from_server`.
last_history_task_count: usize,
/// Starts at 1 and is incremented every time history has been applied to the machines.
/// It is passed to `HistoryInfo::new_from_history` to select how much history to use —
/// presumably "up to and including this workflow task"; confirm against `HistoryInfo`.
current_wf_task_num: usize,
}
impl WorkflowManager {
    /// Creates a manager for the run identified by `workflow_execution`, seeded with `history`.
    ///
    /// A fresh [`WorkflowBridge`] is created and handed to the new [`WorkflowMachines`]; the
    /// matching command sender is kept so that `push_commands` can feed it later. No history
    /// is applied to the machines here — the manager starts at workflow task number 1.
    ///
    /// # Errors
    ///
    /// Returns a [`HistoryInfoError`] if the workflow task count of `history` cannot be
    /// determined.
    pub fn new(
        history: History,
        workflow_execution: WorkflowExecution,
    ) -> Result<Self, HistoryInfoError> {
        let (bridge, command_sink) = WorkflowBridge::new();
        let machines = WorkflowMachines::new(
            workflow_execution.workflow_id,
            workflow_execution.run_id,
            Box::new(bridge).into(),
        );
        // Counted after the machines are built, and before `history` is moved into the struct.
        let last_history_task_count = history.get_workflow_task_count(None)?;

        Ok(Self {
            machines,
            command_sink,
            last_history_from_server: history,
            last_history_task_count,
            current_wf_task_num: 1,
        })
    }
}
/// An activation produced by the workflow machines that still needs its task token filled in
/// (see `finalize`), plus a flag telling the caller whether to ask for another one.
#[derive(Debug)]
pub(crate) struct NextWfActivation {
/// The activation itself; kept private so it can only be obtained through `finalize`.
activation: WfActivation,
/// `true` when the stored history contains more workflow tasks than have been applied to
/// the machines so far.
pub more_activations_needed: bool,
}
impl NextWfActivation {
pub(crate) fn finalize(self, task_token: Vec<u8>) -> WfActivation {
let mut a = self.activation;
a.task_token = task_token;
a
}
}
/// Outcome of `WorkflowManager::push_commands`.
#[derive(Debug)]
pub(crate) struct PushCommandsResult {
/// The commands read from the state machines (`get_commands`) after they were iterated.
pub server_commands: Vec<ProtoCommand>,
/// The value returned by iterating the state machines — presumably whether that produced
/// new jobs for lang to handle; confirm against `WorkflowMachines::iterate_machines`.
pub has_new_lang_jobs: bool,
}
impl WorkflowManager {
pub fn feed_history_from_server(&mut self, hist: History) -> Result<Option<NextWfActivation>> {
let task_hist = HistoryInfo::new_from_history(&hist, Some(self.current_wf_task_num))?;
let task_ct = hist.get_workflow_task_count(None)?;
self.last_history_task_count = task_ct;
self.last_history_from_server = hist;
self.machines.apply_history_events(&task_hist)?;
let activation = self.machines.get_wf_activation();
let more_activations_needed = task_ct > self.current_wf_task_num;
if more_activations_needed {
debug!("More activations needed");
}
self.current_wf_task_num += 1;
Ok(activation.map(|activation| NextWfActivation {
activation,
more_activations_needed,
}))
}
pub fn get_next_activation(&mut self) -> Result<Option<NextWfActivation>> {
let hist = &self.last_history_from_server;
let task_hist = HistoryInfo::new_from_history(hist, Some(self.current_wf_task_num))?;
self.machines.apply_history_events(&task_hist)?;
let activation = self.machines.get_wf_activation();
self.current_wf_task_num += 1;
let more_activations_needed = self.current_wf_task_num <= self.last_history_task_count;
if more_activations_needed {
debug!("More activations needed");
}
Ok(activation.map(|activation| NextWfActivation {
activation,
more_activations_needed,
}))
}
pub fn push_commands(&mut self, cmds: Vec<WFCommand>) -> Result<PushCommandsResult> {
self.command_sink.send(cmds)?;
let has_new_lang_jobs = self.machines.iterate_machines()?;
Ok(PushCommandsResult {
server_commands: self.machines.get_commands(),
has_new_lang_jobs,
})
}
}