use crate::integ_tests::{
create_workflow, get_integ_core, get_integ_server_options,
simple_wf_tests::schedule_activity_and_timer_cmds,
};
use assert_matches::assert_matches;
use crossbeam::channel::{unbounded, RecvTimeoutError};
use rand::Rng;
use std::{sync::Arc, time::Duration};
use temporal_sdk_core::protos::coresdk::{
activity_task::activity_task as act_task,
workflow_activation::{wf_activation_job, FireTimer, WfActivationJob},
workflow_commands::{
ActivityCancellationType, CompleteWorkflowExecution, RequestCancelActivity,
},
workflow_completion::WfActivationCompletion,
};
use temporal_sdk_core::{tracing_init, Core, CoreInitOptions};
use tokio::time::timeout;
#[tokio::test]
async fn out_of_order_completion_doesnt_hang() {
tracing_init();
let mut rng = rand::thread_rng();
let task_q_salt: u32 = rng.gen();
let task_q = format!("activity_cancelled_workflow_{}", task_q_salt.to_string());
let core = Arc::new(get_integ_core(&task_q).await);
let workflow_id: u32 = rng.gen();
create_workflow(&*core, &task_q, &workflow_id.to_string(), None).await;
let activity_id: String = rng.gen::<u32>().to_string();
let timer_id: String = rng.gen::<u32>().to_string();
let task = core.poll_workflow_task().await.unwrap();
core.complete_workflow_task(schedule_activity_and_timer_cmds(
&task_q,
&activity_id,
&timer_id,
ActivityCancellationType::TryCancel,
task,
Duration::from_secs(60),
Duration::from_millis(50),
))
.await
.unwrap();
let activity_task = core.poll_activity_task().await.unwrap();
assert_matches!(
activity_task.variant,
Some(act_task::Variant::Start(start_activity)) => {
assert_eq!(start_activity.activity_type, "test_activity".to_string())
}
);
let task = core.poll_workflow_task().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[
WfActivationJob {
variant: Some(wf_activation_job::Variant::FireTimer(
FireTimer { timer_id: t_id }
)),
},
] => {
assert_eq!(t_id, &timer_id);
}
);
let cc = core.clone();
let jh = tokio::spawn(async move {
let task = timeout(Duration::from_secs(1), cc.poll_workflow_task())
.await
.expect("Poll should come back right away")
.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WfActivationJob {
variant: Some(wf_activation_job::Variant::ResolveActivity(_)),
}]
);
cc.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
vec![CompleteWorkflowExecution { result: None }.into()],
task.task_token,
))
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;
core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
vec![RequestCancelActivity {
activity_id,
..Default::default()
}
.into()],
task.task_token,
))
.await
.unwrap();
jh.await.unwrap();
}
/// Checks that a workflow task poll on an idle task queue is still pending after the
/// configured long-poll timeout has elapsed.
///
/// The long-poll timeout is set to 3 seconds and the test waits 4 seconds for the poll to
/// return. The poll must neither return nor fail in that window; per the test's name, this is
/// expected because the long poll is retried rather than surfaced to the caller.
#[tokio::test]
async fn long_poll_timeout_is_retried() {
    let mut gateway_opts = get_integ_server_options("some_task_q");
    gateway_opts.long_poll_timeout = Duration::from_secs(3);
    let core = temporal_sdk_core::init(CoreInitOptions {
        gateway_opts,
        evict_after_pending_cleared: false,
        max_outstanding_workflow_tasks: 1,
        max_outstanding_activities: 1,
    })
    .await
    .unwrap();

    // The poll task signals on the channel only if the poll returns successfully. If the poll
    // fails, the `unwrap` panics and `tx` is dropped, which the receiver observes as a
    // disconnect instead of a timeout.
    let (tx, rx) = unbounded();
    tokio::spawn(async move {
        core.poll_workflow_task().await.unwrap();
        tx.send(())
    });

    // `recv_timeout` blocks the calling thread. `#[tokio::test]` uses a current-thread runtime
    // by default, so calling it directly here would prevent the spawned poll task from ever
    // being driven and the timeout would be reported unconditionally. Run the blocking receive
    // on the blocking pool and await it so the runtime keeps driving the poll meanwhile.
    let err = tokio::task::spawn_blocking(move || rx.recv_timeout(Duration::from_secs(4)))
        .await
        .expect("blocking receive task should not panic")
        .unwrap_err();
    assert_matches!(err, RecvTimeoutError::Timeout);
}