mod workflow_machines;
#[allow(unused)]
mod activity_state_machine;
#[allow(unused)]
mod cancel_external_state_machine;
#[allow(unused)]
mod cancel_workflow_state_machine;
#[allow(unused)]
mod child_workflow_state_machine;
mod complete_workflow_state_machine;
#[allow(unused)]
mod continue_as_new_workflow_state_machine;
mod fail_workflow_state_machine;
#[allow(unused)]
mod local_activity_state_machine;
#[allow(unused)]
mod mutable_side_effect_state_machine;
#[allow(unused)]
mod side_effect_state_machine;
#[allow(unused)]
mod signal_external_state_machine;
mod timer_state_machine;
#[allow(unused)]
mod upsert_search_attributes_state_machine;
#[allow(unused)]
mod version_state_machine;
mod workflow_task_state_machine;
#[cfg(test)]
#[macro_use]
pub(crate) mod test_help;
pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines};
use crate::{
core_tracing::VecDisplayer,
machines::workflow_machines::MachineResponse,
protos::{
coresdk::workflow_commands::{
workflow_command, CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution,
RequestCancelActivity, ScheduleActivity, StartTimer, WorkflowCommand,
},
temporal::api::{command::v1::Command, enums::v1::CommandType, history::v1::HistoryEvent},
},
};
use prost::alloc::fmt::Formatter;
use rustfsm::{MachineError, StateMachine};
use std::{
convert::{TryFrom, TryInto},
fmt::{Debug, Display},
};
#[cfg(test)]
use crate::machines::test_help::add_coverage;
/// Crate-local alias for the `Command` type imported above from
/// `temporal::api::command::v1`.
pub(crate) type ProtoCommand = Command;
/// Workflow command as represented inside this module.
///
/// Values are produced from the `WorkflowCommand` proto through the `TryFrom` impl in this
/// file; `derive_more::From` additionally generates `From` conversions for the payload types.
#[derive(Debug, derive_more::From)]
// NOTE(review): the lint is silenced rather than boxing the larger payloads -- presumably to
// keep construction and pattern matching simple; confirm this is still the desired trade-off.
#[allow(clippy::large_enum_variant)]
pub enum WFCommand {
/// Carries no payload and is never produced by the `TryFrom<WorkflowCommand>` impl.
/// Presumably constructed directly when lang has nothing to send -- confirm at call sites.
NoCommandsFromLang,
/// Built from the `ScheduleActivity` proto variant.
AddActivity(ScheduleActivity),
/// Built from the `RequestCancelActivity` proto variant.
RequestCancelActivity(RequestCancelActivity),
/// Built from the `StartTimer` proto variant.
AddTimer(StartTimer),
/// Built from the `CancelTimer` proto variant.
CancelTimer(CancelTimer),
/// Built from the `CompleteWorkflowExecution` proto variant.
CompleteWorkflow(CompleteWorkflowExecution),
/// Built from the `FailWorkflowExecution` proto variant.
FailWorkflow(FailWorkflowExecution),
}
/// Error produced when converting a `WorkflowCommand` whose `variant` field is `None`
/// (see the `TryFrom<WorkflowCommand>` impl for `WFCommand`).
#[derive(thiserror::Error, Debug, derive_more::From)]
#[error("Lang provided workflow command with empty variant")]
pub struct EmptyWorkflowCommandErr;
impl TryFrom<WorkflowCommand> for WFCommand {
type Error = EmptyWorkflowCommandErr;
fn try_from(c: WorkflowCommand) -> Result<Self, Self::Error> {
match c.variant.ok_or(EmptyWorkflowCommandErr)? {
workflow_command::Variant::StartTimer(s) => Ok(WFCommand::AddTimer(s)),
workflow_command::Variant::CancelTimer(s) => Ok(WFCommand::CancelTimer(s)),
workflow_command::Variant::ScheduleActivity(s) => Ok(WFCommand::AddActivity(s)),
workflow_command::Variant::RequestCancelActivity(s) => {
Ok(WFCommand::RequestCancelActivity(s))
}
workflow_command::Variant::CompleteWorkflowExecution(c) => {
Ok(WFCommand::CompleteWorkflow(c))
}
workflow_command::Variant::FailWorkflowExecution(s) => Ok(WFCommand::FailWorkflow(s)),
_ => unimplemented!(),
}
}
}
/// Common interface over the workflow state machines in this module.
///
/// A blanket impl later in this file provides it for every type meeting the listed
/// `StateMachine`-related bounds; `Debug` is implemented for `dyn TemporalStateMachine`.
trait TemporalStateMachine: CheckStateMachineInFinal + Send {
/// Name of the machine (the blanket impl forwards to `StateMachine::name`).
fn name(&self) -> &str;
/// Applies a command of the given type to the machine.
///
/// The blanket impl reports `WFMachinesError::UnexpectedCommand` when the command type cannot
/// be converted into a machine event or the machine rejects the transition.
fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError>;
/// Applies a history event to the machine and returns the responses it produced.
///
/// `has_next_event` is passed through unchanged to `WFMachinesAdapter::adapt_response` by the
/// blanket impl; presumably it indicates whether more events follow -- confirm with callers.
fn handle_event(
&mut self,
event: &HistoryEvent,
has_next_event: bool,
) -> Result<Vec<MachineResponse>, WFMachinesError>;
/// Attempts to cancel the machine, returning any responses produced.
fn cancel(&mut self) -> Result<Vec<MachineResponse>, WFMachinesError>;
/// Presumably reports whether the machine was cancelled before its command reached the
/// server -- the meaning is defined by each machine's `Cancellable` impl; confirm there.
fn was_cancelled_before_sent_to_server(&self) -> bool;
}
impl<SM> TemporalStateMachine for SM
where
SM: StateMachine
+ CheckStateMachineInFinal
+ WFMachinesAdapter
+ Cancellable
+ OnEventWrapper
+ Clone
+ Send,
<SM as StateMachine>::Event: TryFrom<HistoryEvent>,
<SM as StateMachine>::Event: TryFrom<CommandType>,
<SM as StateMachine>::Event: Display,
WFMachinesError: From<<<SM as StateMachine>::Event as TryFrom<HistoryEvent>>::Error>,
<SM as StateMachine>::Command: Debug + Display,
<SM as StateMachine>::State: Display,
<SM as StateMachine>::Error: Into<WFMachinesError> + 'static + Send + Sync,
{
fn name(&self) -> &str {
<Self as StateMachine>::name(self)
}
fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError> {
debug!(
command_type = ?command_type,
machine_name = %self.name(),
state = %self.state(),
"handling command"
);
if let Ok(converted_command) = command_type.try_into() {
match OnEventWrapper::on_event_mut(self, converted_command) {
Ok(_c) => Ok(()),
Err(MachineError::InvalidTransition) => {
Err(WFMachinesError::UnexpectedCommand(command_type))
}
Err(MachineError::Underlying(e)) => Err(e.into()),
}
} else {
Err(WFMachinesError::UnexpectedCommand(command_type))
}
}
fn handle_event(
&mut self,
event: &HistoryEvent,
has_next_event: bool,
) -> Result<Vec<MachineResponse>, WFMachinesError> {
debug!(
event = %event,
machine_name = %self.name(),
state = %self.state(),
"handling event"
);
let converted_event: <Self as StateMachine>::Event = event.clone().try_into()?;
match OnEventWrapper::on_event_mut(self, converted_event) {
Ok(c) => {
if !c.is_empty() {
debug!(commands = %c.display(), state = %self.state(),
"Machine produced commands");
}
let mut machine_responses = vec![];
for cmd in c {
machine_responses.extend(self.adapt_response(event, has_next_event, cmd)?);
}
Ok(machine_responses)
}
Err(MachineError::InvalidTransition) => {
Err(WFMachinesError::InvalidTransitionDuringEvent(
event.clone(),
format!(
"{} in state {} says the transition is invalid",
self.name(),
self.state()
),
))
}
Err(MachineError::Underlying(e)) => Err(e.into()),
}
}
fn cancel(&mut self) -> Result<Vec<MachineResponse>, WFMachinesError> {
let res = self.cancel();
res.map_err(|e| match e {
MachineError::InvalidTransition => {
WFMachinesError::InvalidTransition("while attempting to cancel")
}
MachineError::Underlying(e) => e.into(),
})
}
fn was_cancelled_before_sent_to_server(&self) -> bool {
self.was_cancelled_before_sent_to_server()
}
}
/// Query for whether a machine has reached a final state.
///
/// Required as a supertrait of `TemporalStateMachine`; a blanket impl in this file provides it
/// for every `StateMachine`.
trait CheckStateMachineInFinal {
/// Returns true when the machine is in a final state.
fn is_final_state(&self) -> bool;
}
/// Every [`StateMachine`] answers the final-state query through its own `on_final_state`.
impl<SM> CheckStateMachineInFinal for SM
where
    SM: StateMachine,
{
    /// Returns whatever the underlying machine's `on_final_state` reports.
    fn is_final_state(&self) -> bool {
        <SM as StateMachine>::on_final_state(self)
    }
}
/// Translates a state machine's own command type into `MachineResponse`s.
///
/// The blanket `TemporalStateMachine` impl calls `adapt_response` once for every command
/// produced while handling a history event.
trait WFMachinesAdapter: StateMachine {
/// Converts one machine command into zero or more responses.
///
/// `event` is the history event being handled when `my_command` was produced, and
/// `has_next_event` is the flag given to `TemporalStateMachine::handle_event`.
fn adapt_response(
&self,
event: &HistoryEvent,
has_next_event: bool,
my_command: Self::Command,
) -> Result<Vec<MachineResponse>, WFMachinesError>;
}
/// Cancellation hooks for a state machine.
///
/// The defaults describe a machine that does not support cancellation; machines that do are
/// expected to override both methods.
trait Cancellable: StateMachine {
    /// Attempts to cancel the machine.
    ///
    /// # Panics
    /// The default implementation always panics, naming the machine.
    fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
        let machine_name = self.name();
        panic!("Machine {} cannot be cancelled", machine_name)
    }

    /// Defaults to `false`.
    fn was_cancelled_before_sent_to_server(&self) -> bool {
        false
    }
}
/// Thin wrapper around [`StateMachine::on_event_mut`] that, in test builds only, records each
/// successful transition through `add_coverage`.
pub(crate) trait OnEventWrapper: StateMachine
where
    <Self as StateMachine>::State: Display,
    <Self as StateMachine>::Event: Display,
    Self: Clone,
{
    /// Applies `event` to the machine and returns the machine's result unchanged.
    ///
    /// Outside of test builds this is a plain pass-through to the underlying machine.
    fn on_event_mut(
        &mut self,
        event: Self::Event,
    ) -> Result<Vec<Self::Command>, MachineError<Self::Error>> {
        // Both strings must be captured up front: the state changes and the event is consumed
        // by the transition below.
        #[cfg(test)]
        let (state_before, event_label) = (self.state().to_string(), event.to_string());

        let outcome = StateMachine::on_event_mut(self, event);

        #[cfg(test)]
        {
            if outcome.is_ok() {
                add_coverage(
                    self.name().to_owned(),
                    state_before,
                    self.state().to_string(),
                    event_label,
                );
            }
        }

        outcome
    }
}
/// Blanket impl: every cloneable `StateMachine` whose state and event types implement `Display`
/// gets `OnEventWrapper` with its default `on_event_mut`.
impl<SM> OnEventWrapper for SM
where
SM: StateMachine,
<Self as StateMachine>::State: Display,
<Self as StateMachine>::Event: Display,
Self: Clone,
{
}
/// Pairs a state machine with a proto command.
///
/// NOTE(review): presumably returned when a machine is created together with the command that
/// should be sent for it -- confirm against the constructors in the machine modules.
#[derive(Debug)]
struct NewMachineWithCommand<T: TemporalStateMachine> {
/// The proto command associated with `machine`.
command: ProtoCommand,
/// The state machine itself.
machine: T,
}
/// Debug output for a type-erased machine is just the machine's name.
impl Debug for dyn TemporalStateMachine {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let machine_name = self.name();
        Formatter::write_str(f, machine_name)
    }
}