use futures_util::{StreamExt, stream::FuturesUnordered};
use std::{collections::HashSet, future::Future};
use temporalio_common::protos::coresdk::{
workflow_activation::workflow_activation_job,
workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion},
};
pub async fn fanout_tasks<FutureMaker, Fut>(num: usize, fm: FutureMaker)
where
FutureMaker: Fn(usize) -> Fut,
Fut: Future,
{
let mut tasks = FuturesUnordered::new();
for i in 0..num {
tasks.push(fm(i));
}
while tasks.next().await.is_some() {}
}
#[macro_export]
macro_rules! job_assert {
($($pat:pat),+) => {
|res| {
assert_matches!(
res.jobs.as_slice(),
[$(WorkflowActivationJob {
variant: Some($pat),
}),+]
);
}
};
}
type AsserterWithReply<'a> = (
&'a dyn Fn(&temporalio_common::protos::coresdk::workflow_activation::WorkflowActivation),
workflow_activation_completion::Status,
);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum WorkflowCachingPolicy {
NonSticky,
AfterEveryReply,
}
pub(crate) async fn poll_and_reply<'a>(
worker: &'a crate::Worker,
eviction_mode: WorkflowCachingPolicy,
expect_and_reply: &'a [AsserterWithReply<'a>],
) {
poll_and_reply_clears_outstanding_evicts(worker, None, eviction_mode, expect_and_reply).await;
}
use crate::{pollers::MockPoller, test_help::OutstandingWFTMap};
pub(crate) async fn poll_and_reply_clears_outstanding_evicts<'a>(
worker: &'a crate::Worker,
outstanding_map: Option<OutstandingWFTMap>,
eviction_mode: WorkflowCachingPolicy,
expect_and_reply: &'a [AsserterWithReply<'a>],
) {
let mut evictions = 0;
let expected_evictions = expect_and_reply.len() - 1;
let mut executed_failures = HashSet::new();
let expected_fail_count = expect_and_reply
.iter()
.filter(|(_, reply)| !reply.is_success())
.count();
'outer: loop {
let expect_iter = expect_and_reply.iter();
for (i, interaction) in expect_iter.enumerate() {
let (asserter, reply) = interaction;
let complete_is_failure = !reply.is_success();
if executed_failures.contains(&i) {
continue;
}
let mut res = worker.poll_workflow_activation().await.unwrap();
if res.jobs.iter().any(|j| {
matches!(
j.variant,
Some(workflow_activation_job::Variant::RemoveFromCache(_))
)
}) && res.jobs.len() > 1
{
panic!("Saw an activation with an eviction & other work! {res:?}");
}
let is_eviction = res.is_only_eviction();
let mut do_release = false;
if is_eviction {
res.jobs.clear();
do_release = true;
}
if !res.jobs.is_empty() {
asserter(&res);
}
let reply = if res.jobs.is_empty() {
WorkflowActivationCompletion::empty(res.run_id.clone())
} else {
WorkflowActivationCompletion {
run_id: res.run_id.clone(),
status: Some(reply.clone()),
}
};
let ends_execution = reply.has_execution_ending();
worker.complete_workflow_activation(reply).await.unwrap();
if do_release && let Some(omap) = outstanding_map.as_ref() {
omap.release_run(&res.run_id);
}
if is_eviction && !ends_execution {
continue 'outer;
}
if complete_is_failure {
executed_failures.insert(i);
}
match eviction_mode {
WorkflowCachingPolicy::NonSticky => (),
WorkflowCachingPolicy::AfterEveryReply => {
if evictions < expected_evictions {
worker.request_workflow_eviction(&res.run_id);
evictions += 1;
}
}
}
}
break;
}
assert_eq!(expected_fail_count, executed_failures.len());
assert_eq!(worker.outstanding_workflow_tasks().await, 0);
}
pub(crate) fn gen_assert_and_reply(
asserter: &dyn Fn(&temporalio_common::protos::coresdk::workflow_activation::WorkflowActivation),
reply_commands: Vec<
temporalio_common::protos::coresdk::workflow_commands::workflow_command::Variant,
>,
) -> AsserterWithReply<'_> {
(
asserter,
temporalio_common::protos::coresdk::workflow_completion::Success::from_variants(
reply_commands,
)
.into(),
)
}
pub(crate) fn gen_assert_and_fail(
asserter: &dyn Fn(&temporalio_common::protos::coresdk::workflow_activation::WorkflowActivation),
) -> AsserterWithReply<'_> {
(
asserter,
temporalio_common::protos::coresdk::workflow_completion::Failure {
failure: Some(
temporalio_common::protos::temporal::api::failure::v1::Failure {
message: "Intentional test failure".to_string(),
..Default::default()
},
),
..Default::default()
}
.into(),
)
}
pub(crate) fn mock_poller<T>() -> MockPoller<T>
where
T: Send + Sync + 'static,
{
let mut mock_poller = MockPoller::new();
mock_poller.expect_shutdown_box().return_const(());
mock_poller.expect_notify_shutdown().return_const(());
mock_poller
}