use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant;
use crate::protos::coresdk::PayloadsToPayloadError;
use crate::{
core_tracing::VecDisplayer,
machines::{
activity_state_machine::new_activity, complete_workflow_state_machine::complete_workflow,
fail_workflow_state_machine::fail_workflow, timer_state_machine::new_timer,
workflow_task_state_machine::WorkflowTaskMachine, NewMachineWithCommand, ProtoCommand,
TemporalStateMachine, WFCommand,
},
protos::{
coresdk::{
workflow_activation::{
wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation,
},
PayloadsExt,
},
temporal::api::{
enums::v1::{CommandType, EventType},
history::v1::{history_event, HistoryEvent},
},
},
protosext::HistoryInfo,
workflow::{DrivenWorkflow, WorkflowFetcher},
};
use slotmap::SlotMap;
use std::{
borrow::{Borrow, BorrowMut},
collections::{hash_map::DefaultHasher, HashMap, VecDeque},
hash::{Hash, Hasher},
time::SystemTime,
};
use tracing::Level;
/// Result type used throughout this module; the error defaults to [`WFMachinesError`].
type Result<T, E = WFMachinesError> = std::result::Result<T, E>;
/// Holds every state machine for a single workflow run. Routes history events and
/// lang-produced commands between those machines and the driven workflow (`drive_me`).
pub(crate) struct WorkflowMachines {
/// Passed to each newly created `WorkflowTaskMachine`; set by `set_started_ids`.
workflow_task_started_event_id: i64,
/// Event id most recently recorded by `task_started`; `0` until then.
current_started_event_id: i64,
/// Previous started event id from the history being applied. Replay is turned off once
/// `current_started_event_id` reaches it (on any event other than WorkflowTaskCompleted).
previous_started_event_id: i64,
/// Whether history is currently being replayed.
replaying: bool,
pub workflow_id: String,
/// Run id; overwritten with the original execution run id on WorkflowExecutionStarted.
pub run_id: String,
/// Current workflow time; `set_current_time` never moves it backwards.
current_wf_time: Option<SystemTime>,
/// Owns every machine created for this run.
all_machines: SlotMap<MachineKey, Box<dyn TemporalStateMachine + 'static>>,
/// Machines still expecting events. The key is the id of the history event that initiated
/// them (a command event, or a WorkflowTaskScheduled event). Events are routed here via their
/// initial command event id.
machines_by_event_id: HashMap<i64, MachineKey>,
/// Lookup from lang-facing timer/activity ids to machines, used for cancellation.
id_to_machine: HashMap<CommandID, MachineKey>,
/// Prepared commands, matched in FIFO order against command events in history.
commands: VecDeque<CommandAndMachine>,
/// Commands produced during the current workflow task that have not been prepared yet.
current_wf_task_commands: VecDeque<CommandAndMachine>,
/// The workflow being driven: receives jobs via `send_job` and yields commands via
/// `fetch_workflow_iteration_output`.
drive_me: DrivenWorkflow,
}
/// Lang-facing identifier of a command that can later be cancelled. It wraps the timer id or
/// activity id supplied by lang.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum CommandID {
/// Identified by the timer's `timer_id`.
Timer(String),
/// Identified by the activity's `activity_id`.
Activity(String),
}
// Key type for `WorkflowMachines::all_machines`; copied freely into the lookup maps and queues.
slotmap::new_key_type! { struct MachineKey; }
/// A protocol command paired with the key of the machine responsible for it.
/// Displays only the command.
#[derive(Debug, derive_more::Display)]
#[display(fmt = "Cmd&Machine({})", "command")]
struct CommandAndMachine {
/// The command that will be sent to the server.
command: ProtoCommand,
/// Key into `WorkflowMachines::all_machines`.
machine: MachineKey,
}
/// A response produced by a state machine that `WorkflowMachines` must act on.
#[derive(Debug, derive_more::From, derive_more::Display)]
#[must_use]
// NOTE(review): presumably allowed because some variants wrap large protobuf messages — confirm.
#[allow(clippy::large_enum_variant)]
pub enum MachineResponse {
/// A job to send to the driven workflow (lang).
#[display(fmt = "PushWFJob")]
PushWFJob(#[from(forward)] wf_activation_job::Variant),
/// A new command to queue. Accepted in response to a cancellation; not expected while a
/// machine is handling an event.
IssueNewCommand(ProtoCommand),
/// Run `task_started` for the given workflow task started event id and time.
#[display(fmt = "TriggerWFTaskStarted")]
TriggerWFTaskStarted {
task_started_event_id: i64,
time: SystemTime,
},
/// The workflow was reset to a new run; a new randomness seed derived from `run_id` is sent
/// to lang.
#[display(fmt = "UpdateRunIdOnWorkflowReset({})", run_id)]
UpdateRunIdOnWorkflowReset {
run_id: String,
},
}
/// Errors produced while applying history or lang commands to workflow state machines.
#[derive(thiserror::Error, Debug)]
pub enum WFMachinesError {
/// The event could not be handled in the current context (e.g. unknown event type).
#[error("Event {0:?} was not expected: {1}")]
UnexpectedEvent(HistoryEvent, &'static str),
/// The event caused a transition the machine does not allow.
// NOTE(review): this shares UnexpectedEvent's message text; consider a distinct message.
#[error("Event {0:?} was not expected: {1}")]
InvalidTransitionDuringEvent(HistoryEvent, String),
/// The event was missing required data (e.g. its attributes).
#[error("Event {0:?} was malformed: {1}")]
MalformedEvent(HistoryEvent, String),
/// Like `MalformedEvent` but without the event; `submachine_handle_event` converts it
/// into `MalformedEvent` carrying the event being handled.
#[error("{0}")]
MalformedEventDetail(String),
/// A command of this type was not expected.
#[error("Command type {0:?} was not expected")]
UnexpectedCommand(CommandType),
/// A command carried a command-type integer that does not map to a known `CommandType`.
#[error("Command type {0} is not known")]
UnknownCommandType(i32),
/// A command event arrived but no non-cancelled pending command was available to match it.
#[error("No command was scheduled for event {0:?}")]
NoCommandScheduledForEvent(HistoryEvent),
/// A machine produced a response that is invalid in the current context.
#[error("Machine response {0:?} was not expected: {1}")]
UnexpectedMachineResponse(MachineResponse, String),
/// No machine is registered for the given command id.
#[error("Command was missing its associated machine: {0}")]
MissingAssociatedMachine(String),
/// A payload conversion did not yield exactly one payload.
#[error("There was {0} when we expected exactly one payload while applying event: {1:?}")]
NotExactlyOnePayload(PayloadsToPayloadError, HistoryEvent),
/// A machine attempted a transition it does not allow.
#[error("Machine encountered an invalid transition: {0}")]
InvalidTransition(&'static str),
/// A cancellation type integer did not map to a known cancellation type.
#[error("Invalid cancelation type: {0}")]
InvalidCancelationType(i32),
}
impl WorkflowMachines {
    /// Creates an empty set of machines for the workflow run identified by `workflow_id` and
    /// `run_id`, driving `driven_wf`.
    ///
    /// All event-id bookkeeping starts at `0`, no workflow time is known yet, and replay is off
    /// until [`Self::set_started_ids`] says otherwise.
    pub(crate) fn new(workflow_id: String, run_id: String, driven_wf: DrivenWorkflow) -> Self {
        Self {
            workflow_id,
            run_id,
            drive_me: driven_wf,
            workflow_task_started_event_id: 0,
            current_started_event_id: 0,
            previous_started_event_id: 0,
            replaying: false,
            current_wf_time: None,
            all_machines: Default::default(),
            machines_by_event_id: Default::default(),
            id_to_machine: Default::default(),
            commands: Default::default(),
            current_wf_task_commands: Default::default(),
        }
    }

    /// Returns the event id last recorded by [`Self::task_started`], or `0` if none has been.
    pub(crate) fn get_last_started_event_id(&self) -> i64 {
        self.current_started_event_id
    }

    /// Applies a single history event to the state machines.
    ///
    /// * Command events are matched against the queue of prepared commands
    ///   (see `handle_command_event`).
    /// * Events carrying an initial command event id are routed to the machine registered
    ///   under that id. The machine stays registered unless it reaches a final state.
    /// * Everything else goes to `handle_non_stateful_event`.
    ///
    /// `has_next_event` is forwarded to whichever machine handles the event.
    ///
    /// # Errors
    /// [`WFMachinesError::UnexpectedEvent`] if the event type is unknown, plus any error raised
    /// while a machine handles the event.
    #[instrument(level = "debug", skip(self), fields(run_id = % self.run_id))]
    pub(crate) fn handle_event(
        &mut self,
        event: &HistoryEvent,
        has_next_event: bool,
    ) -> Result<()> {
        if event.is_command_event() {
            return self.handle_command_event(event);
        }
        let event_type = EventType::from_i32(event.event_type).ok_or_else(|| {
            WFMachinesError::UnexpectedEvent(event.clone(), "The event type is unknown")
        })?;

        // Replay is over once we have caught up with the previously started workflow task.
        // A WorkflowTaskCompleted event never ends replay.
        if self.replaying
            && self.current_started_event_id >= self.previous_started_event_id
            && event_type != EventType::WorkflowTaskCompleted
        {
            self.replaying = false;
        }

        match event.get_initial_command_event_id() {
            Some(initial_cmd_id) => {
                // Take the machine out of the index while it handles the event. Put it back only
                // if it still expects more events.
                if let Some(sm) = self.machines_by_event_id.remove(&initial_cmd_id) {
                    self.submachine_handle_event(sm, event, has_next_event)?;
                    if !self.machine(sm).is_final_state() {
                        self.machines_by_event_id.insert(initial_cmd_id, sm);
                    }
                } else {
                    error!(
                        event=?event,
                        "During event handling, this event had an initial command ID but we could \
                         not find a matching state machine!"
                    );
                }
            }
            None => self.handle_non_stateful_event(event, has_next_event)?,
        }
        Ok(())
    }

    /// Handles reaching a workflow task started event.
    ///
    /// Records `task_started_event_id` as the current started event id, then advances workflow
    /// time to `time` (never backwards). It then iterates the machines so the driven
    /// workflow's output is turned into commands.
    pub(super) fn task_started(
        &mut self,
        task_started_event_id: i64,
        time: SystemTime,
    ) -> Result<()> {
        let s = span!(Level::DEBUG, "Task started trigger");
        let _enter = s.enter();
        self.current_started_event_id = task_started_event_id;
        self.set_current_time(time);
        self.iterate_machines()?;
        Ok(())
    }

    /// Matches a command event from history against the oldest prepared command.
    ///
    /// Commands whose machine was cancelled before being sent to the server are dropped from
    /// the queue without seeing the event. The first remaining command's machine handles
    /// `event`. It is then indexed under `event.event_id` unless it is already in a final state.
    ///
    /// # Errors
    /// [`WFMachinesError::NoCommandScheduledForEvent`] if the queue empties before a
    /// non-cancelled command is found, plus any error raised while the machine handles the event.
    fn handle_command_event(&mut self, event: &HistoryEvent) -> Result<()> {
        debug!(current_commands = ?self.commands, "handling command event");
        let consumed_cmd = loop {
            let command = match self.commands.pop_front() {
                Some(c) => c,
                None => return Err(WFMachinesError::NoCommandScheduledForEvent(event.clone())),
            };
            // Cancelled-before-sent commands are skipped rather than matched to this event.
            if self
                .machine(command.machine)
                .was_cancelled_before_sent_to_server()
            {
                continue;
            }
            self.submachine_handle_event(command.machine, event, true)?;
            break command;
        };
        if !self.machine(consumed_cmd.machine).is_final_state() {
            self.machines_by_event_id
                .insert(event.event_id, consumed_cmd.machine);
        }
        Ok(())
    }

    /// Handles events that are not routed to an existing command-driven machine.
    ///
    /// * `WorkflowExecutionStarted`:
    ///   - sets `run_id` to the original execution run id;
    ///   - sends a `StartWorkflow` job, seeded from that run id;
    ///   - starts the driven workflow.
    /// * `WorkflowTaskScheduled`:
    ///   - creates a `WorkflowTaskMachine` and lets it handle the event;
    ///   - indexes it under the event id.
    /// * `WorkflowExecutionSignaled`: forwards the signal to the driven workflow.
    /// * `WorkflowExecutionCancelRequested`: currently does nothing.
    ///
    /// # Errors
    /// * [`WFMachinesError::MalformedEvent`] if a started or signaled event lacks its
    ///   attributes.
    /// * [`WFMachinesError::UnexpectedEvent`] for any other event type.
    fn handle_non_stateful_event(
        &mut self,
        event: &HistoryEvent,
        has_next_event: bool,
    ) -> Result<()> {
        match EventType::from_i32(event.event_type) {
            Some(EventType::WorkflowExecutionStarted) => {
                if let Some(history_event::Attributes::WorkflowExecutionStartedEventAttributes(
                    attrs,
                )) = &event.attributes
                {
                    self.run_id = attrs.original_execution_run_id.clone();
                    self.drive_me.send_job(
                        StartWorkflow {
                            workflow_type: attrs
                                .workflow_type
                                .as_ref()
                                .map(|wt| wt.name.clone())
                                .unwrap_or_default(),
                            workflow_id: self.workflow_id.clone(),
                            arguments: Vec::from_payloads(attrs.input.clone()),
                            randomness_seed: str_to_randomness_seed(
                                &attrs.original_execution_run_id,
                            ),
                        }
                        .into(),
                    );
                    self.drive_me.start(attrs.clone());
                } else {
                    return Err(WFMachinesError::MalformedEvent(
                        event.clone(),
                        "WorkflowExecutionStarted event did not have appropriate attributes"
                            .to_string(),
                    ));
                }
            }
            Some(EventType::WorkflowTaskScheduled) => {
                let wf_task_sm = WorkflowTaskMachine::new(self.workflow_task_started_event_id);
                let key = self.all_machines.insert(Box::new(wf_task_sm));
                self.submachine_handle_event(key, event, has_next_event)?;
                self.machines_by_event_id.insert(event.event_id, key);
            }
            Some(EventType::WorkflowExecutionSignaled) => {
                if let Some(history_event::Attributes::WorkflowExecutionSignaledEventAttributes(
                    attrs,
                )) = &event.attributes
                {
                    self.drive_me.signal(attrs.clone().into());
                } else {
                    // A signal without attributes cannot be delivered. Report it the same way
                    // as a malformed WorkflowExecutionStarted event instead of dropping it.
                    return Err(WFMachinesError::MalformedEvent(
                        event.clone(),
                        "WorkflowExecutionSignaled event did not have appropriate attributes"
                            .to_string(),
                    ));
                }
            }
            Some(EventType::WorkflowExecutionCancelRequested) => {
                // Intentionally empty.
                // NOTE(review): presumably a placeholder until workflow cancellation is handled.
            }
            _ => {
                return Err(WFMachinesError::UnexpectedEvent(
                    event.clone(),
                    "The event is not a non-stateful event, but we tried to handle it as one",
                ));
            }
        }
        Ok(())
    }

    /// Returns a copy of every prepared command whose machine is not yet in a final state,
    /// in queue order.
    pub(crate) fn get_commands(&self) -> Vec<ProtoCommand> {
        self.commands
            .iter()
            .filter(|c| !self.machine(c.machine).is_final_state())
            .map(|c| c.command.clone())
            .collect()
    }

    /// Drains the driven workflow's pending jobs into an activation.
    ///
    /// Returns `None` when there are no jobs. Otherwise returns an activation stamped with the
    /// current workflow time and run id.
    /// NOTE(review): `task_token` is left empty here; presumably the caller fills it in.
    pub(crate) fn get_wf_activation(&mut self) -> Option<WfActivation> {
        let jobs = self.drive_me.drain_jobs();
        if jobs.is_empty() {
            None
        } else {
            Some(WfActivation {
                timestamp: self.current_wf_time.map(Into::into),
                run_id: self.run_id.clone(),
                jobs,
                task_token: vec![],
            })
        }
    }

    /// Records the started event ids from the history being applied. Replay is enabled exactly
    /// when `previous_started_event_id` is positive.
    pub(crate) fn set_started_ids(
        &mut self,
        previous_started_event_id: i64,
        workflow_task_started_event_id: i64,
    ) {
        self.previous_started_event_id = previous_started_event_id;
        self.workflow_task_started_event_id = workflow_task_started_event_id;
        self.replaying = previous_started_event_id > 0;
    }

    /// Advances the workflow's current time to `time`, never moving it backwards, and returns
    /// the resulting current time.
    fn set_current_time(&mut self, time: SystemTime) -> SystemTime {
        match self.current_wf_time {
            // Already at or past `time`: keep what we have.
            Some(current) if current >= time => current,
            _ => {
                self.current_wf_time = Some(time);
                time
            }
        }
    }

    /// Pulls the driven workflow's latest output and turns it into machines and commands.
    ///
    /// Any jobs produced (e.g. by cancellations) are sent back to the driven workflow. Returns
    /// `true` if there were such jobs.
    pub(crate) fn iterate_machines(&mut self) -> Result<bool> {
        let results = self.drive_me.fetch_workflow_iteration_output();
        let jobs = self.handle_driven_results(results)?;
        let has_new_lang_jobs = !jobs.is_empty();
        for job in jobs {
            self.drive_me.send_job(job);
        }
        self.prepare_commands()?;
        Ok(has_new_lang_jobs)
    }

    /// Applies the not-yet-processed part of `history_info` to the machines.
    ///
    /// The first `get_last_started_event_id()` events are skipped. This assumes event ids are
    /// contiguous and 1-based, so those are the events already processed. Processing stops
    /// after a trailing WorkflowTaskStarted event, which is handled with
    /// `has_next_event = false`, or after a trailing final workflow execution event.
    ///
    /// # Panics
    /// If the last started event id is larger than the number of events in `history_info`.
    ///
    /// # Errors
    /// Any error from handling an event. [`WFMachinesError::UnexpectedEvent`] if the history
    /// ends on an event that is neither WorkflowTaskStarted nor a final workflow execution event.
    pub(crate) fn apply_history_events(&mut self, history_info: &HistoryInfo) -> Result<()> {
        let (_, events) = history_info
            .events()
            .split_at(self.get_last_started_event_id() as usize);
        let mut history = events.iter().peekable();
        self.set_started_ids(
            history_info.previous_started_event_id,
            history_info.workflow_task_started_event_id,
        );
        while let Some(event) = history.next() {
            let has_next_event = history.peek().is_some();
            if event.event_type == EventType::WorkflowTaskStarted as i32 && !has_next_event {
                self.handle_event(event, false)?;
                return Ok(());
            }
            self.handle_event(event, has_next_event)?;
            if !has_next_event {
                if event.is_final_wf_execution_event() {
                    return Ok(());
                }
                // History that ends anywhere else comes from outside this process. Report it
                // instead of panicking.
                return Err(WFMachinesError::UnexpectedEvent(
                    event.clone(),
                    "History ended on an event that was neither WorkflowTaskStarted nor a final \
                     workflow execution event",
                ));
            }
        }
        Ok(())
    }

    /// Lets machine `sm` handle `event`, then acts on each response it produces:
    ///
    /// * jobs are sent to the driven workflow;
    /// * `TriggerWFTaskStarted` runs [`Self::task_started`];
    /// * a run-id reset sends a new randomness seed derived from the new run id.
    ///
    /// # Panics
    /// If `sm` is not a key of `all_machines`.
    ///
    /// # Errors
    /// * Errors from the machine; `MalformedEventDetail` is upgraded to `MalformedEvent`
    ///   carrying `event`.
    /// * Errors from `task_started`.
    /// * [`WFMachinesError::UnexpectedMachineResponse`] if the machine asks to issue a new
    ///   command, which is not valid while handling an event.
    fn submachine_handle_event(
        &mut self,
        sm: MachineKey,
        event: &HistoryEvent,
        has_next_event: bool,
    ) -> Result<()> {
        let machine = self.all_machines.get_mut(sm).expect("Machine must exist");
        let machine_responses = machine
            .handle_event(event, has_next_event)
            .map_err(|e| match e {
                WFMachinesError::MalformedEventDetail(s) => {
                    WFMachinesError::MalformedEvent(event.clone(), s)
                }
                other => other,
            })?;
        if !machine_responses.is_empty() {
            debug!(responses = %machine_responses.display(),
                   "Machine produced responses");
        }
        for response in machine_responses {
            match response {
                MachineResponse::PushWFJob(a) => {
                    self.drive_me.send_job(a);
                }
                MachineResponse::TriggerWFTaskStarted {
                    task_started_event_id,
                    time,
                } => {
                    self.task_started(task_started_event_id, time)?;
                }
                MachineResponse::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => {
                    // NOTE(review): only the randomness seed is updated; `self.run_id` is left
                    // as is. Confirm this is intended.
                    self.drive_me
                        .send_job(wf_activation_job::Variant::UpdateRandomSeed(
                            UpdateRandomSeed {
                                randomness_seed: str_to_randomness_seed(&new_run_id),
                            },
                        ));
                }
                v @ MachineResponse::IssueNewCommand(_) => {
                    return Err(WFMachinesError::UnexpectedMachineResponse(
                        v,
                        "Issue new command machine response not expected here".to_string(),
                    ));
                }
            }
        }
        Ok(())
    }

    /// Turns the driven workflow's commands into machines.
    ///
    /// Timers, activities, workflow completion and workflow failure each create a machine
    /// whose initial command is queued for the current workflow task. Timers and activities
    /// are also registered by id so they can be cancelled later. Cancellation requests go to
    /// the target machine. Any jobs produced along the way are returned for delivery to lang.
    fn handle_driven_results(
        &mut self,
        results: Vec<WFCommand>,
    ) -> Result<Vec<wf_activation_job::Variant>> {
        let mut jobs = vec![];
        for cmd in results {
            match cmd {
                WFCommand::AddTimer(attrs) => {
                    let tid = attrs.timer_id.clone();
                    let timer = self.add_new_machine(new_timer(attrs));
                    self.id_to_machine
                        .insert(CommandID::Timer(tid), timer.machine);
                    self.current_wf_task_commands.push_back(timer);
                }
                WFCommand::CancelTimer(attrs) => self.process_cancellation(
                    &CommandID::Timer(attrs.timer_id.to_owned()),
                    &mut jobs,
                )?,
                WFCommand::AddActivity(attrs) => {
                    let aid = attrs.activity_id.clone();
                    let activity = self.add_new_machine(new_activity(attrs));
                    self.id_to_machine
                        .insert(CommandID::Activity(aid), activity.machine);
                    self.current_wf_task_commands.push_back(activity);
                }
                WFCommand::RequestCancelActivity(attrs) => self.process_cancellation(
                    &CommandID::Activity(attrs.activity_id.to_owned()),
                    &mut jobs,
                )?,
                WFCommand::CompleteWorkflow(attrs) => {
                    let cwfm = self.add_new_machine(complete_workflow(attrs));
                    self.current_wf_task_commands.push_back(cwfm);
                }
                WFCommand::FailWorkflow(attrs) => {
                    let cwfm = self.add_new_machine(fail_workflow(attrs));
                    self.current_wf_task_commands.push_back(cwfm);
                }
                WFCommand::NoCommandsFromLang => (),
            }
        }
        Ok(jobs)
    }

    /// Asks the machine registered under `id` to cancel itself.
    ///
    /// Commands it issues are queued for the current workflow task, and jobs it pushes are
    /// appended to `jobs`.
    ///
    /// # Errors
    /// * [`WFMachinesError::MissingAssociatedMachine`] if no machine is registered for `id`.
    /// * Errors from the machine's `cancel`.
    /// * [`WFMachinesError::UnexpectedMachineResponse`] for any other kind of response.
    fn process_cancellation(&mut self, id: &CommandID, jobs: &mut Vec<Variant>) -> Result<()> {
        let m_key = self.get_machine_key(id)?;
        let res = self.machine_mut(m_key).cancel()?;
        debug!(machine_responses = ?res, cmd_id = ?id, "Req cancel responses");
        for r in res {
            match r {
                MachineResponse::IssueNewCommand(c) => {
                    self.current_wf_task_commands.push_back(CommandAndMachine {
                        command: c,
                        machine: m_key,
                    })
                }
                MachineResponse::PushWFJob(j) => {
                    jobs.push(j);
                }
                v => {
                    return Err(WFMachinesError::UnexpectedMachineResponse(
                        v,
                        format!("When cancelling {:?}", id),
                    ));
                }
            }
        }
        Ok(())
    }

    /// Looks up the machine registered for `id`.
    ///
    /// # Errors
    /// [`WFMachinesError::MissingAssociatedMachine`] if there is none.
    fn get_machine_key(&self, id: &CommandID) -> Result<MachineKey> {
        self.id_to_machine.get(id).copied().ok_or_else(|| {
            WFMachinesError::MissingAssociatedMachine(format!(
                "Missing associated machine for {:?}",
                id
            ))
        })
    }

    /// Moves every command from the current workflow task into the prepared `commands` queue.
    ///
    /// Before each command is moved, its machine handles the command's type, unless it was
    /// cancelled before being sent to the server.
    ///
    /// # Errors
    /// * [`WFMachinesError::UnknownCommandType`] if a command's type integer is unknown.
    /// * Errors from the machine's `handle_command`.
    #[instrument(level = "debug", skip(self))]
    fn prepare_commands(&mut self) -> Result<()> {
        while let Some(c) = self.current_wf_task_commands.pop_front() {
            let cmd_type = CommandType::from_i32(c.command.command_type)
                .ok_or(WFMachinesError::UnknownCommandType(c.command.command_type))?;
            if !self
                .machine(c.machine)
                .was_cancelled_before_sent_to_server()
            {
                self.machine_mut(c.machine).handle_command(cmd_type)?;
            }
            self.commands.push_back(c);
        }
        debug!(commands = %self.commands.display(), "prepared commands");
        Ok(())
    }

    /// Stores `machine`'s state machine in `all_machines` and pairs its initial command with
    /// the new key. The command is not queued here.
    fn add_new_machine<T: TemporalStateMachine + 'static>(
        &mut self,
        machine: NewMachineWithCommand<T>,
    ) -> CommandAndMachine {
        let k = self.all_machines.insert(Box::new(machine.machine));
        CommandAndMachine {
            command: machine.command,
            machine: k,
        }
    }

    /// Borrows the machine stored under `m`.
    ///
    /// # Panics
    /// If `m` is not a key of `all_machines`.
    fn machine(&self, m: MachineKey) -> &dyn TemporalStateMachine {
        self.all_machines
            .get(m)
            .expect("Machine must exist")
            .borrow()
    }

    /// Mutably borrows the machine stored under `m`.
    ///
    /// # Panics
    /// If `m` is not a key of `all_machines`.
    fn machine_mut(&mut self, m: MachineKey) -> &mut (dyn TemporalStateMachine + 'static) {
        self.all_machines
            .get_mut(m)
            .expect("Machine must exist")
            .borrow_mut()
    }
}
/// Derives a 64-bit randomness seed from a run id by hashing it with `DefaultHasher`.
///
/// The same input always yields the same seed within a given build. Note that std does not
/// promise `DefaultHasher`'s algorithm stays the same across Rust releases.
fn str_to_randomness_seed(run_id: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    <str as Hash>::hash(run_id, &mut hasher);
    hasher.finish()
}