use crate::common::{CoreWfStarter, activity_functions::StdActivities};
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering::Relaxed},
},
time::Duration,
};
use temporalio_client::{
WorkflowFetchHistoryOptions, WorkflowStartOptions, WorkflowTerminateOptions,
};
use temporalio_common::{
UntypedWorkflow,
protos::temporal::api::{
enums::v1::{EventType, WorkflowTaskFailedCause::GrpcMessageTooLarge},
history::v1::history_event::Attributes::{
WorkflowExecutionTerminatedEventAttributes, WorkflowTaskFailedEventAttributes,
},
},
worker::WorkerTaskTypes,
};
use temporalio_macros::{workflow, workflow_methods};
use temporalio_sdk::{ActivityOptions, WorkflowContext, WorkflowResult, WorkflowTermination};
pub(crate) mod priority;
#[workflow]
struct OversizeGrpcMessageWf {
run_flag: Arc<AtomicBool>,
}
#[workflow_methods(factory_only)]
impl OversizeGrpcMessageWf {
#[run]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<Vec<u8>> {
if ctx.state(|wf| wf.run_flag.load(Relaxed)) {
Ok(vec![])
} else {
ctx.state(|wf| wf.run_flag.store(true, Relaxed));
let result: Vec<u8> = vec![0; 5000000];
Ok(result)
}
}
}
pub(crate) async fn grpc_message_too_large() {
let run_flag = Arc::new(AtomicBool::new(false));
let run_flag_clone = run_flag.clone();
let wf_name = "oversize_grpc_message";
let mut starter = CoreWfStarter::new_cloud_or_local(wf_name, "")
.await
.unwrap();
starter.sdk_config.task_types = WorkerTaskTypes::workflow_only();
starter
.sdk_config
.register_workflow_with_factory(move || OversizeGrpcMessageWf {
run_flag: run_flag_clone.clone(),
});
let mut sdk = starter.worker().await;
sdk.submit_workflow(
OversizeGrpcMessageWf::run,
(),
starter.workflow_options.clone(),
)
.await
.unwrap();
sdk.run_until_done().await.unwrap();
let events = starter.get_history().await.events;
assert!(
events.iter().any(is_oversize_grpc_event),
"Expected workflow task failure or termination b/c grpc message too large: {events:?}",
);
}
pub(crate) fn is_oversize_grpc_event(
e: &temporalio_common::protos::temporal::api::history::v1::HistoryEvent,
) -> bool {
e.event_type == EventType::WorkflowTaskFailed as i32
&& if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
attr.cause == GrpcMessageTooLarge as i32
&& attr.failure.as_ref().unwrap().message == "GRPC Message too large"
} else {
false
}
||
e.event_type == EventType::WorkflowExecutionTerminated as i32
&& if let WorkflowExecutionTerminatedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
attr.reason == "GrpcMessageTooLarge"
} else {
false
}
}
#[workflow]
#[derive(Default)]
struct ShutdownTimerActivityLoopWf;
#[workflow_methods]
impl ShutdownTimerActivityLoopWf {
#[run]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
loop {
ctx.timer(Duration::from_millis(10)).await;
ctx.start_activity(
StdActivities::no_op,
(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(10)),
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
}
}
}
pub(crate) async fn shutdown_during_active_timer_activity_workflows() {
let wf_name = "shutdown_during_active_timer_activity_workflows";
let num_workflows = 10;
let mut starter =
if let Some(wfs) = CoreWfStarter::new_cloud_or_local(wf_name, ">=1.6.3-serverless").await {
wfs
} else {
return;
};
starter.sdk_config.register_activities(StdActivities);
let mut worker = starter.worker().await;
worker.register_workflow::<ShutdownTimerActivityLoopWf>();
let core = worker.core_worker();
core.validate().await.unwrap();
assert!(
core.get_namespace_capabilities().graceful_poll_shutdown(),
"Server must support graceful poll shutdown for this test"
);
let task_queue = starter.get_task_queue().to_owned();
let mut wf_ids = Vec::with_capacity(num_workflows);
for i in 0..num_workflows {
let wf_id = format!("{task_queue}-{i}");
worker
.submit_workflow(
ShutdownTimerActivityLoopWf::run,
(),
WorkflowStartOptions::new(task_queue.clone(), wf_id.clone()).build(),
)
.await
.unwrap();
wf_ids.push(wf_id);
}
worker.fetch_results = false;
let shutdown_handle = worker.inner_mut().shutdown_handle();
let run_fut = async { worker.run_until_done().await.unwrap() };
let shutdown_fut = async {
tokio::time::sleep(Duration::from_secs(2)).await;
shutdown_handle();
};
let shutdown_start = std::time::Instant::now();
tokio::join!(run_fut, shutdown_fut);
let shutdown_elapsed = shutdown_start.elapsed();
assert!(
shutdown_elapsed < Duration::from_secs(5),
"Worker shutdown took {shutdown_elapsed:?}, expected < 5s"
);
let client = starter.get_client().await;
for wf_id in &wf_ids {
client
.get_workflow_handle::<UntypedWorkflow>(wf_id)
.terminate(WorkflowTerminateOptions::default())
.await
.unwrap();
let history = client
.get_workflow_handle::<UntypedWorkflow>(wf_id)
.fetch_history(WorkflowFetchHistoryOptions::default())
.await
.unwrap();
let bad_events: Vec<_> = history
.events()
.iter()
.filter(|e| {
e.event_type() == EventType::WorkflowTaskFailed
|| e.event_type() == EventType::WorkflowTaskTimedOut
})
.collect();
assert!(
bad_events.is_empty(),
"Workflow {wf_id} had unexpected WFT failures/timeouts: {bad_events:?}"
);
}
}