#![allow(clippy::large_enum_variant)]
use crate::{
machines::{
workflow_machines::{MachineResponse, WFMachinesError},
Cancellable, NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter,
},
protos::{
coresdk::{
workflow_activation::FireTimer,
workflow_commands::{CancelTimer, StartTimer},
HistoryEventId,
},
temporal::api::{
command::v1::Command,
enums::v1::{CommandType, EventType},
history::v1::{history_event, HistoryEvent, TimerFiredEventAttributes},
},
},
};
use rustfsm::{fsm, MachineError, StateMachine, TransitionResult};
use std::convert::TryFrom;
fsm! {
pub(super) name TimerMachine;
command TimerMachineCommand;
error WFMachinesError;
shared_state SharedState;
Created --(Schedule, on_schedule) --> StartCommandCreated;
StartCommandCreated --(CommandStartTimer) --> StartCommandCreated;
StartCommandCreated --(TimerStarted(HistoryEventId), on_timer_started) --> StartCommandRecorded;
StartCommandCreated --(Cancel, shared on_cancel) --> Canceled;
StartCommandRecorded --(TimerFired(TimerFiredEventAttributes), shared on_timer_fired) --> Fired;
StartCommandRecorded --(Cancel, shared on_cancel) --> CancelTimerCommandCreated;
CancelTimerCommandCreated --(Cancel) --> CancelTimerCommandCreated;
CancelTimerCommandCreated
--(CommandCancelTimer, on_command_cancel_timer) --> CancelTimerCommandSent;
CancelTimerCommandSent --(TimerCanceled) --> Canceled;
}
#[derive(Debug, derive_more::Display)]
pub(super) enum TimerMachineCommand {
Complete,
Canceled,
IssueCancelCmd(Command),
}
#[derive(Default, Clone)]
pub(super) struct SharedState {
attrs: StartTimer,
cancelled_before_sent: bool,
}
pub(super) fn new_timer(attribs: StartTimer) -> NewMachineWithCommand<TimerMachine> {
let (timer, add_cmd) = TimerMachine::new_scheduled(attribs);
NewMachineWithCommand {
command: add_cmd,
machine: timer,
}
}
impl TimerMachine {
pub(crate) fn new_scheduled(attribs: StartTimer) -> (Self, Command) {
let mut s = Self::new(attribs);
OnEventWrapper::on_event_mut(&mut s, TimerMachineEvents::Schedule)
.expect("Scheduling timers doesn't fail");
let cmd = Command {
command_type: CommandType::StartTimer as i32,
attributes: Some(s.shared_state().attrs.clone().into()),
};
(s, cmd)
}
fn new(attribs: StartTimer) -> Self {
Self {
state: Created {}.into(),
shared_state: SharedState {
attrs: attribs,
cancelled_before_sent: false,
},
}
}
}
impl TryFrom<HistoryEvent> for TimerMachineEvents {
type Error = WFMachinesError;
fn try_from(e: HistoryEvent) -> Result<Self, Self::Error> {
Ok(match EventType::from_i32(e.event_type) {
Some(EventType::TimerStarted) => Self::TimerStarted(e.event_id),
Some(EventType::TimerCanceled) => Self::TimerCanceled,
Some(EventType::TimerFired) => {
if let Some(history_event::Attributes::TimerFiredEventAttributes(attrs)) =
e.attributes
{
Self::TimerFired(attrs)
} else {
return Err(WFMachinesError::MalformedEvent(
e,
"Timer fired attribs were unset".to_string(),
));
}
}
_ => {
return Err(WFMachinesError::UnexpectedEvent(
e,
"Timer machine does not handle this event",
))
}
})
}
}
impl TryFrom<CommandType> for TimerMachineEvents {
type Error = ();
fn try_from(c: CommandType) -> Result<Self, Self::Error> {
Ok(match c {
CommandType::StartTimer => Self::CommandStartTimer,
CommandType::CancelTimer => Self::CommandCancelTimer,
_ => return Err(()),
})
}
}
#[derive(Default, Clone)]
pub(super) struct Created {}
impl Created {
pub(super) fn on_schedule(self) -> TimerMachineTransition<StartCommandCreated> {
TransitionResult::default()
}
}
#[derive(Default, Clone)]
pub(super) struct CancelTimerCommandCreated {}
impl CancelTimerCommandCreated {
pub(super) fn on_command_cancel_timer(self) -> TimerMachineTransition<CancelTimerCommandSent> {
TransitionResult::ok(
vec![TimerMachineCommand::Canceled],
CancelTimerCommandSent::default(),
)
}
}
#[derive(Default, Clone)]
pub(super) struct CancelTimerCommandSent {}
#[derive(Default, Clone)]
pub(super) struct Canceled {}
impl From<CancelTimerCommandSent> for Canceled {
fn from(_: CancelTimerCommandSent) -> Self {
Self::default()
}
}
#[derive(Default, Clone)]
pub(super) struct Fired {}
#[derive(Default, Clone)]
pub(super) struct StartCommandCreated {}
impl StartCommandCreated {
pub(super) fn on_timer_started(
self,
_id: HistoryEventId,
) -> TimerMachineTransition<StartCommandRecorded> {
TransitionResult::default()
}
pub(super) fn on_cancel(self, dat: SharedState) -> TimerMachineTransition<Canceled> {
TransitionResult::ok_shared(
vec![TimerMachineCommand::Canceled],
Canceled::default(),
SharedState {
cancelled_before_sent: true,
..dat
},
)
}
}
#[derive(Default, Clone)]
pub(super) struct StartCommandRecorded {}
impl StartCommandRecorded {
pub(super) fn on_timer_fired(
self,
dat: SharedState,
attrs: TimerFiredEventAttributes,
) -> TimerMachineTransition<Fired> {
if dat.attrs.timer_id != attrs.timer_id {
TransitionResult::Err(WFMachinesError::MalformedEventDetail(format!(
"Timer fired event did not have expected timer id {}!",
dat.attrs.timer_id
)))
} else {
TransitionResult::ok(vec![TimerMachineCommand::Complete], Fired::default())
}
}
pub(super) fn on_cancel(
self,
dat: SharedState,
) -> TimerMachineTransition<CancelTimerCommandCreated> {
let cmd = Command {
command_type: CommandType::CancelTimer as i32,
attributes: Some(
CancelTimer {
timer_id: dat.attrs.timer_id,
}
.into(),
),
};
TransitionResult::ok(
vec![TimerMachineCommand::IssueCancelCmd(cmd)],
CancelTimerCommandCreated::default(),
)
}
}
impl WFMachinesAdapter for TimerMachine {
fn adapt_response(
&self,
_event: &HistoryEvent,
_has_next_event: bool,
my_command: TimerMachineCommand,
) -> Result<Vec<MachineResponse>, WFMachinesError> {
Ok(match my_command {
TimerMachineCommand::Complete => vec![FireTimer {
timer_id: self.shared_state.attrs.timer_id.clone(),
}
.into()],
TimerMachineCommand::Canceled => vec![],
TimerMachineCommand::IssueCancelCmd(c) => vec![MachineResponse::IssueNewCommand(c)],
})
}
}
impl Cancellable for TimerMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
Ok(
match OnEventWrapper::on_event_mut(self, TimerMachineEvents::Cancel)?.pop() {
Some(TimerMachineCommand::IssueCancelCmd(cmd)) => {
vec![MachineResponse::IssueNewCommand(cmd)]
}
Some(TimerMachineCommand::Canceled) => vec![],
x => panic!("Invalid cancel event response {:?}", x),
},
)
}
fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state().cancelled_before_sent
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
machines::{
test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver},
workflow_machines::WorkflowMachines,
},
protos::coresdk::workflow_commands::CompleteWorkflowExecution,
test_help::canned_histories,
};
use rstest::{fixture, rstest};
use std::time::Duration;
#[fixture]
fn fire_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) {
let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
let timer = StartTimer {
timer_id: "timer1".to_string(),
start_to_fire_timeout: Some(Duration::from_secs(5).into()),
};
command_sink.timer(timer).await;
let complete = CompleteWorkflowExecution::default();
command_sink.send(complete.into());
});
let t = canned_histories::single_timer("timer1");
let state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);
assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap());
(t, state_machines)
}
#[rstest]
fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
let (t, mut state_machines) = fire_happy_hist;
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, Some(1))
.unwrap();
state_machines.get_wf_activation();
assert_eq!(commands.len(), 1);
assert_eq!(commands[0].command_type, CommandType::StartTimer as i32);
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, Some(2))
.unwrap();
state_machines.get_wf_activation();
assert_eq!(commands.len(), 1);
assert_eq!(
commands[0].command_type,
CommandType::CompleteWorkflowExecution as i32
);
}
#[rstest]
fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
let (t, mut state_machines) = fire_happy_hist;
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, None)
.unwrap();
assert_eq!(commands.len(), 1);
assert_eq!(
commands[0].command_type,
CommandType::CompleteWorkflowExecution as i32
);
}
#[test]
fn mismatched_timer_ids_errors() {
let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
let timer = StartTimer {
timer_id: "realid".to_string(),
start_to_fire_timeout: Some(Duration::from_secs(5).into()),
};
command_sink.timer(timer).await;
});
let t = canned_histories::single_timer("badid");
let mut state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);
assert!(t
.handle_workflow_task_take_cmds(&mut state_machines, None)
.unwrap_err()
.to_string()
.contains("Timer fired event did not have expected timer id realid!"))
}
#[fixture]
fn cancellation_setup() -> (TestHistoryBuilder, WorkflowMachines) {
let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
let cancel_timer_fut = cmd_sink.timer(StartTimer {
timer_id: "cancel_timer".to_string(),
start_to_fire_timeout: Some(Duration::from_secs(500).into()),
});
cmd_sink
.timer(StartTimer {
timer_id: "wait_timer".to_string(),
start_to_fire_timeout: Some(Duration::from_secs(5).into()),
})
.await;
cmd_sink.cancel_timer("cancel_timer");
cancel_timer_fut.await;
let complete = CompleteWorkflowExecution::default();
cmd_sink.send(complete.into());
});
let t = canned_histories::cancel_timer("wait_timer", "cancel_timer");
let state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);
(t, state_machines)
}
#[rstest]
fn incremental_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
let (t, mut state_machines) = cancellation_setup;
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, Some(1))
.unwrap();
assert_eq!(commands.len(), 2);
assert_eq!(commands[0].command_type, CommandType::StartTimer as i32);
assert_eq!(commands[1].command_type, CommandType::StartTimer as i32);
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, Some(2))
.unwrap();
assert_eq!(commands.len(), 2);
assert_eq!(commands[0].command_type, CommandType::CancelTimer as i32);
assert_eq!(
commands[1].command_type,
CommandType::CompleteWorkflowExecution as i32
);
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, None)
.unwrap();
assert_eq!(commands.len(), 0);
}
#[rstest]
fn full_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
let (t, mut state_machines) = cancellation_setup;
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, None)
.unwrap();
assert_eq!(commands.len(), 0);
}
#[test]
fn cancel_before_sent_to_server() {
let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
let cancel_timer_fut = cmd_sink.timer(StartTimer {
timer_id: "cancel_timer".to_string(),
start_to_fire_timeout: Some(Duration::from_secs(500).into()),
});
cmd_sink.cancel_timer("cancel_timer");
cancel_timer_fut.await;
let complete = CompleteWorkflowExecution::default();
cmd_sink.send(complete.into());
});
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_workflow_execution_completed();
let mut state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, None)
.unwrap();
assert_eq!(commands.len(), 0);
}
}