use crate::machines::test_help::TestHistoryBuilder;
use crate::protos::temporal::api::common::v1::Payload;
use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause};
use crate::protos::temporal::api::failure::v1::Failure;
use crate::protos::temporal::api::history::v1::{
history_event, ActivityTaskCancelRequestedEventAttributes, ActivityTaskCanceledEventAttributes,
ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes,
ActivityTaskScheduledEventAttributes, ActivityTaskStartedEventAttributes,
ActivityTaskTimedOutEventAttributes, TimerCanceledEventAttributes, TimerFiredEventAttributes,
};
/// Builds a canned history in which a single timer is started and then fires.
///
/// Events are appended in this order: `WorkflowExecutionStarted`, a full workflow task
/// (`add_full_wf_task`), `TimerStarted`, a `TimerFired` carrying `timer_id` and referring back to
/// the started event's id, and finally `add_workflow_task_scheduled_and_started`.
pub fn single_timer(timer_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let started_id = history.add_get_event_id(EventType::TimerStarted, None);
    let fired = TimerFiredEventAttributes {
        started_event_id: started_id,
        timer_id: timer_id.to_owned(),
    };
    history.add(
        EventType::TimerFired,
        history_event::Attributes::TimerFiredEventAttributes(fired),
    );

    history.add_workflow_task_scheduled_and_started();
    history
}
/// Builds a canned history with two timers: one that fires and one that is cancelled afterwards.
///
/// Two `TimerStarted` events are added back to back — the first belongs to the timer that ends up
/// cancelled, the second to the timer that fires. A `TimerFired` (with `wait_timer_id`) follows,
/// then a full workflow task, a `TimerCanceled` (with `cancel_timer_id`) referring to the first
/// started event, and finally a workflow-execution-completed event.
pub fn cancel_timer(wait_timer_id: &str, cancel_timer_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    // The to-be-cancelled timer is started before the one we wait on.
    let cancelled_started_id = history.add_get_event_id(EventType::TimerStarted, None);
    let waited_started_id = history.add_get_event_id(EventType::TimerStarted, None);

    let fired = TimerFiredEventAttributes {
        started_event_id: waited_started_id,
        timer_id: wait_timer_id.to_owned(),
    };
    history.add(
        EventType::TimerFired,
        history_event::Attributes::TimerFiredEventAttributes(fired),
    );
    history.add_full_wf_task();

    let cancelled = TimerCanceledEventAttributes {
        started_event_id: cancelled_started_id,
        timer_id: cancel_timer_id.to_owned(),
        ..Default::default()
    };
    history.add(
        EventType::TimerCanceled,
        history_event::Attributes::TimerCanceledEventAttributes(cancelled),
    );

    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history in which two timers are started together and both fire.
///
/// After `WorkflowExecutionStarted` and a full workflow task, two `TimerStarted` events are added,
/// followed by a `TimerFired` for `timer1` and then one for `timer2`, each referring to its own
/// started event. The history ends with `add_workflow_task_scheduled_and_started`.
pub fn parallel_timer(timer1: &str, timer2: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let first_started_id = history.add_get_event_id(EventType::TimerStarted, None);
    let second_started_id = history.add_get_event_id(EventType::TimerStarted, None);

    let first_fired = TimerFiredEventAttributes {
        started_event_id: first_started_id,
        timer_id: timer1.to_owned(),
    };
    history.add(
        EventType::TimerFired,
        history_event::Attributes::TimerFiredEventAttributes(first_fired),
    );

    let second_fired = TimerFiredEventAttributes {
        started_event_id: second_started_id,
        timer_id: timer2.to_owned(),
    };
    history.add(
        EventType::TimerFired,
        history_event::Attributes::TimerFiredEventAttributes(second_fired),
    );

    history.add_workflow_task_scheduled_and_started();
    history
}
/// Extends the [`single_timer`] history with a workflow task failure caused by a workflow reset.
///
/// The failure is added through `add_workflow_task_failed_new_id` with the `ResetWorkflow` cause
/// and `original_run_id`, after which another workflow task is scheduled and started.
pub fn workflow_fails_with_reset_after_timer(
    timer_id: &str,
    original_run_id: &str,
) -> TestHistoryBuilder {
    let mut history = single_timer(timer_id);
    let cause = WorkflowTaskFailedCause::ResetWorkflow;
    history.add_workflow_task_failed_new_id(cause, original_run_id);
    history.add_workflow_task_scheduled_and_started();
    history
}
/// Extends the [`single_timer`] history with a workflow task failure whose message is `"boom"`.
///
/// The failure uses the `WorkflowWorkerUnhandledFailure` cause; every `Failure` field other than
/// `message` is left at its default. Another workflow task is scheduled and started afterwards.
pub fn workflow_fails_with_failure_after_timer(timer_id: &str) -> TestHistoryBuilder {
    let mut history = single_timer(timer_id);

    let failure = Failure {
        message: "boom".to_owned(),
        ..Default::default()
    };
    history.add_workflow_task_failed_with_failure(
        WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure,
        failure,
    );

    history.add_workflow_task_scheduled_and_started();
    history
}
/// Builds a history in which the workflow task fails at two separate points.
///
/// Starting from the [`single_timer`] history for `timer_1`, a workflow task failure with message
/// `"boom 1"` is added, followed by a full workflow task. A second timer (`timer_2`) is then
/// started and fired, a workflow task is scheduled and started, and that task fails with message
/// `"boom 2"`. The history ends with one more workflow task scheduled and started. Both failures
/// use the `WorkflowWorkerUnhandledFailure` cause.
pub fn workflow_fails_with_failure_two_different_points(
    timer_1: &str,
    timer_2: &str,
) -> TestHistoryBuilder {
    let mut history = single_timer(timer_1);

    let first_failure = Failure {
        message: "boom 1".to_owned(),
        ..Default::default()
    };
    history.add_workflow_task_failed_with_failure(
        WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure,
        first_failure,
    );
    history.add_full_wf_task();

    let second_started_id = history.add_get_event_id(EventType::TimerStarted, None);
    let second_fired = TimerFiredEventAttributes {
        started_event_id: second_started_id,
        timer_id: timer_2.to_owned(),
    };
    history.add(
        EventType::TimerFired,
        history_event::Attributes::TimerFiredEventAttributes(second_fired),
    );
    history.add_workflow_task_scheduled_and_started();

    let second_failure = Failure {
        message: "boom 2".to_owned(),
        ..Default::default()
    };
    history.add_workflow_task_failed_with_failure(
        WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure,
        second_failure,
    );

    history.add_workflow_task_scheduled_and_started();
    history
}
/// Builds a canned history for one activity that is scheduled, started and completed.
///
/// After `WorkflowExecutionStarted` and a full workflow task, the history receives
/// `ActivityTaskScheduled` (carrying `activity_id`), `ActivityTaskStarted` (referring to the
/// scheduled event) and `ActivityTaskCompleted` (referring to both the scheduled and the started
/// event). It ends with `add_workflow_task_scheduled_and_started`.
pub fn single_activity(activity_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let started = ActivityTaskStartedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    let start_id = history.add_get_event_id(
        EventType::ActivityTaskStarted,
        Some(history_event::Attributes::ActivityTaskStartedEventAttributes(started)),
    );

    let completed = ActivityTaskCompletedEventAttributes {
        scheduled_event_id: sched_id,
        started_event_id: start_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCompleted,
        history_event::Attributes::ActivityTaskCompletedEventAttributes(completed),
    );

    history.add_workflow_task_scheduled_and_started();
    history
}
/// Builds a canned history for one activity that is scheduled, started and then fails.
///
/// After `WorkflowExecutionStarted` and a full workflow task, the history receives
/// `ActivityTaskScheduled` (carrying `activity_id`), `ActivityTaskStarted` (referring to the
/// scheduled event) and `ActivityTaskFailed` (referring to both the scheduled and the started
/// event; all other failure attributes are defaults). It ends with
/// `add_workflow_task_scheduled_and_started`.
pub fn single_failed_activity(activity_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let started = ActivityTaskStartedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    let start_id = history.add_get_event_id(
        EventType::ActivityTaskStarted,
        Some(history_event::Attributes::ActivityTaskStartedEventAttributes(started)),
    );

    let failed = ActivityTaskFailedEventAttributes {
        scheduled_event_id: sched_id,
        started_event_id: start_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskFailed,
        history_event::Attributes::ActivityTaskFailedEventAttributes(failed),
    );

    history.add_workflow_task_scheduled_and_started();
    history
}
/// Builds a canned history where a scheduled (never started) activity has cancellation requested
/// after the workflow is signaled.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), a workflow-execution-signaled event for `signal_id` with a single `"hello "`
/// payload, another full workflow task, `ActivityTaskCancelRequested` referring to the scheduled
/// event, and a workflow-execution-completed event.
pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, signal_payloads);
    history.add_full_wf_task();

    let request = ActivityTaskCancelRequestedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCancelRequested,
        history_event::Attributes::ActivityTaskCancelRequestedEventAttributes(request),
    );

    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where an activity times out after being scheduled, with no started
/// event in between.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), `ActivityTaskTimedOut` referring only to the scheduled event, another full
/// workflow task, and a workflow-execution-completed event.
pub fn scheduled_activity_timeout(activity_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let timed_out = ActivityTaskTimedOutEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskTimedOut,
        history_event::Attributes::ActivityTaskTimedOutEventAttributes(timed_out),
    );

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where a scheduled activity has cancellation requested and then times
/// out.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), a workflow-execution-signaled event for `signal_id` with a single `"hello "`
/// payload, a full workflow task, `ActivityTaskCancelRequested`, `ActivityTaskTimedOut` (both
/// referring to the scheduled event), another full workflow task, and a
/// workflow-execution-completed event.
pub fn scheduled_cancelled_activity_timeout(
    activity_id: &str,
    signal_id: &str,
) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, signal_payloads);
    history.add_full_wf_task();

    let request = ActivityTaskCancelRequestedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCancelRequested,
        history_event::Attributes::ActivityTaskCancelRequestedEventAttributes(request),
    );

    let timed_out = ActivityTaskTimedOutEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskTimedOut,
        history_event::Attributes::ActivityTaskTimedOutEventAttributes(timed_out),
    );

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where an activity times out after it has been started.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), `ActivityTaskStarted`, `ActivityTaskTimedOut` referring to both the scheduled
/// and the started event, another full workflow task, and a workflow-execution-completed event.
pub fn started_activity_timeout(activity_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let started = ActivityTaskStartedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    let start_id = history.add_get_event_id(
        EventType::ActivityTaskStarted,
        Some(history_event::Attributes::ActivityTaskStartedEventAttributes(started)),
    );

    let timed_out = ActivityTaskTimedOutEventAttributes {
        scheduled_event_id: sched_id,
        started_event_id: start_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskTimedOut,
        history_event::Attributes::ActivityTaskTimedOutEventAttributes(timed_out),
    );

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where a scheduled activity is followed by a signal and workflow
/// completion, with no cancel-requested or activity-resolution event recorded.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`; its event id is not used afterwards), a workflow-execution-signaled event for
/// `signal_id` with a single `"hello "` payload, another full workflow task, and a
/// workflow-execution-completed event.
pub fn cancel_scheduled_activity_abandon(activity_id: &str, signal_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    // No later event refers to the scheduled event, so the returned id is dropped.
    history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, signal_payloads);

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where a started activity is followed by a signal and workflow
/// completion, with no cancel-requested or activity-resolution event recorded.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), `ActivityTaskStarted` (its event id is not used afterwards), a
/// workflow-execution-signaled event for `signal_id` with a single `"hello "` payload, another
/// full workflow task, and a workflow-execution-completed event.
pub fn cancel_started_activity_abandon(activity_id: &str, signal_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let started = ActivityTaskStartedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    // No later event refers to the started event, so the returned id is dropped.
    history.add_get_event_id(
        EventType::ActivityTaskStarted,
        Some(history_event::Attributes::ActivityTaskStartedEventAttributes(started)),
    );

    let signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, signal_payloads);

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where a scheduled activity has cancellation requested, the workflow is
/// signaled a second time, and the activity is then reported as canceled.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), a signal, a full workflow task, `ActivityTaskCancelRequested`, a second signal,
/// a full workflow task, `ActivityTaskCanceled`, a full workflow task, and a
/// workflow-execution-completed event. Both signals use the same `signal_id` and the same single
/// `"hello "` payload; both activity events refer to the scheduled event.
pub fn cancel_scheduled_activity_with_signal_and_activity_task_cancel(
    activity_id: &str,
    signal_id: &str,
) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let first_signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, first_signal_payloads);
    history.add_full_wf_task();

    let request = ActivityTaskCancelRequestedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCancelRequested,
        history_event::Attributes::ActivityTaskCancelRequestedEventAttributes(request),
    );

    let second_signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, second_signal_payloads);
    history.add_full_wf_task();

    let canceled = ActivityTaskCanceledEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCanceled,
        history_event::Attributes::ActivityTaskCanceledEventAttributes(canceled),
    );

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where a started activity has cancellation requested, the workflow is
/// signaled a second time, and the activity is then reported as canceled.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), `ActivityTaskStarted`, a signal, a full workflow task,
/// `ActivityTaskCancelRequested`, a second signal, a full workflow task, `ActivityTaskCanceled`,
/// a full workflow task, and a workflow-execution-completed event. Both signals use the same
/// `signal_id` and the same single `"hello "` payload. The cancel-requested and canceled events
/// only set `scheduled_event_id`; the started event's id is not used.
pub fn cancel_started_activity_with_signal_and_activity_task_cancel(
    activity_id: &str,
    signal_id: &str,
) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let started = ActivityTaskStartedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add_get_event_id(
        EventType::ActivityTaskStarted,
        Some(history_event::Attributes::ActivityTaskStartedEventAttributes(started)),
    );

    let first_signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, first_signal_payloads);
    history.add_full_wf_task();

    let request = ActivityTaskCancelRequestedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCancelRequested,
        history_event::Attributes::ActivityTaskCancelRequestedEventAttributes(request),
    );

    let second_signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, second_signal_payloads);
    history.add_full_wf_task();

    let canceled = ActivityTaskCanceledEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCanceled,
        history_event::Attributes::ActivityTaskCanceledEventAttributes(canceled),
    );

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where a scheduled activity has cancellation requested and is reported
/// as canceled immediately afterwards, with no workflow task in between.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), a workflow-execution-signaled event for `signal_id` with a single `"hello "`
/// payload, a full workflow task, `ActivityTaskCancelRequested`, `ActivityTaskCanceled` (both
/// referring to the scheduled event), another full workflow task, and a
/// workflow-execution-completed event.
pub fn cancel_scheduled_activity_with_activity_task_cancel(
    activity_id: &str,
    signal_id: &str,
) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, signal_payloads);
    history.add_full_wf_task();

    let request = ActivityTaskCancelRequestedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCancelRequested,
        history_event::Attributes::ActivityTaskCancelRequestedEventAttributes(request),
    );

    let canceled = ActivityTaskCanceledEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCanceled,
        history_event::Attributes::ActivityTaskCanceledEventAttributes(canceled),
    );

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history where a started activity has cancellation requested and is reported as
/// canceled immediately afterwards, with no workflow task in between.
///
/// Sequence: `WorkflowExecutionStarted`, a full workflow task, `ActivityTaskScheduled` (with
/// `activity_id`), `ActivityTaskStarted`, a workflow-execution-signaled event for `signal_id` with
/// a single `"hello "` payload, a full workflow task, `ActivityTaskCancelRequested`,
/// `ActivityTaskCanceled`, another full workflow task, and a workflow-execution-completed event.
/// The cancel-requested and canceled events only set `scheduled_event_id`; the started event's id
/// is not used.
pub fn cancel_started_activity_with_activity_task_cancel(
    activity_id: &str,
    signal_id: &str,
) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let scheduled = ActivityTaskScheduledEventAttributes {
        activity_id: activity_id.to_owned(),
        ..Default::default()
    };
    let sched_id = history.add_get_event_id(
        EventType::ActivityTaskScheduled,
        Some(history_event::Attributes::ActivityTaskScheduledEventAttributes(scheduled)),
    );

    let started = ActivityTaskStartedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add_get_event_id(
        EventType::ActivityTaskStarted,
        Some(history_event::Attributes::ActivityTaskStartedEventAttributes(started)),
    );

    let signal_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(signal_id, signal_payloads);
    history.add_full_wf_task();

    let request = ActivityTaskCancelRequestedEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCancelRequested,
        history_event::Attributes::ActivityTaskCancelRequestedEventAttributes(request),
    );

    let canceled = ActivityTaskCanceledEventAttributes {
        scheduled_event_id: sched_id,
        ..Default::default()
    };
    history.add(
        EventType::ActivityTaskCanceled,
        history_event::Attributes::ActivityTaskCanceledEventAttributes(canceled),
    );

    history.add_full_wf_task();
    history.add_workflow_execution_completed();
    history
}
/// Builds a canned history in which the workflow receives two signals back to back.
///
/// After `WorkflowExecutionStarted` and a full workflow task, a signaled event for `sig_1_id`
/// (single payload `"hello "`) and one for `sig_2_id` (single payload `"world"`) are added. The
/// history ends with `add_workflow_task_scheduled_and_started`.
pub fn two_signals(sig_1_id: &str, sig_2_id: &str) -> TestHistoryBuilder {
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    let first_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"hello ".to_vec(),
    }];
    history.add_we_signaled(sig_1_id, first_payloads);

    let second_payloads = vec![Payload {
        metadata: Default::default(),
        data: b"world".to_vec(),
    }];
    history.add_we_signaled(sig_2_id, second_payloads);

    history.add_workflow_task_scheduled_and_started();
    history
}
/// Builds an arbitrarily long history of timers that start and fire one after another.
///
/// After `WorkflowExecutionStarted` and a full workflow task, the block
/// `TimerStarted`, `TimerFired`, full workflow task is repeated for every `i` in
/// `1..num_tasks` (that is, `num_tasks - 1` times), with timer ids `timer-1`, `timer-2`, and so
/// on. The history ends with a workflow-execution-completed event.
///
/// # Panics
///
/// Panics if `num_tasks` is not greater than 1.
pub fn long_sequential_timers(num_tasks: usize) -> TestHistoryBuilder {
    assert!(num_tasks > 1);

    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();

    (1..num_tasks).for_each(|timer_number| {
        let started_id = history.add_get_event_id(EventType::TimerStarted, None);
        let fired = TimerFiredEventAttributes {
            started_event_id: started_id,
            timer_id: format!("timer-{}", timer_number),
        };
        history.add(
            EventType::TimerFired,
            history_event::Attributes::TimerFiredEventAttributes(fired),
        );
        history.add_full_wf_task();
    });

    history.add_workflow_execution_completed();
    history
}