1#![warn(missing_docs)] #![allow(clippy::upper_case_acronyms)]
3
4#[cfg(test)]
8#[macro_use]
9pub extern crate assert_matches;
10#[macro_use]
11extern crate tracing;
12
13pub mod protos;
14
15mod activity;
16pub(crate) mod core_tracing;
17mod errors;
18mod machines;
19mod pending_activations;
20mod pollers;
21mod protosext;
22mod workflow;
23
24#[cfg(test)]
25mod test_help;
26
27pub use crate::errors::{
28 CompleteActivityError, CompleteWfError, CoreInitError, PollActivityError, PollWfError,
29};
30pub use core_tracing::tracing_init;
31pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions};
32pub use url::Url;
33
34use crate::{
35 activity::{ActivityHeartbeatManager, ActivityHeartbeatManagerHandle},
36 errors::{ActivityHeartbeatError, ShutdownErr, WorkflowUpdateError},
37 machines::{EmptyWorkflowCommandErr, WFCommand},
38 pending_activations::PendingActivations,
39 pollers::PollWorkflowTaskBuffer,
40 protos::{
41 coresdk::{
42 activity_result::{self as ar, activity_result},
43 activity_task::ActivityTask,
44 workflow_activation::{create_evict_activation, WfActivation},
45 workflow_completion::{self, wf_activation_completion, WfActivationCompletion},
46 ActivityHeartbeat, ActivityTaskCompletion,
47 },
48 temporal::api::{
49 enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse,
50 },
51 },
52 protosext::fmt_task_token,
53 workflow::{
54 NextWfActivation, PushCommandsResult, WorkflowConcurrencyManager, WorkflowError,
55 WorkflowManager,
56 },
57};
58use dashmap::{DashMap, DashSet};
59use futures::TryFutureExt;
60use std::{
61 convert::TryInto,
62 fmt::Debug,
63 future::Future,
64 sync::{
65 atomic::{AtomicBool, Ordering},
66 Arc,
67 },
68};
69use tokio::sync::Notify;
70use tracing::Span;
71
/// Operations for polling and completing workflow and activity tasks.
///
/// Implementors must be `Send + Sync` so one instance can be shared across tasks and threads.
#[async_trait::async_trait]
pub trait Core: Send + Sync {
    /// Waits for and returns the next workflow activation to process, or a [`PollWfError`]
    /// if none can be produced (for example [`PollWfError::ShutDown`]).
    async fn poll_workflow_task(&self) -> Result<WfActivation, PollWfError>;

    /// Waits for and returns the next activity task to run, or a [`PollActivityError`]
    /// if none can be produced (for example [`PollActivityError::ShutDown`]).
    async fn poll_activity_task(&self) -> Result<ActivityTask, PollActivityError>;

    /// Reports the outcome of processing a workflow activation. The activation is identified
    /// by the task token carried in `completion`.
    async fn complete_workflow_task(
        &self,
        completion: WfActivationCompletion,
    ) -> Result<(), CompleteWfError>;

    /// Reports the outcome (completed, failed, or canceled) of an activity task. The task is
    /// identified by the task token carried in `completion`.
    async fn complete_activity_task(
        &self,
        completion: ActivityTaskCompletion,
    ) -> Result<(), CompleteActivityError>;

    /// Records a heartbeat for an activity described by `details`.
    async fn record_activity_heartbeat(
        &self,
        details: ActivityHeartbeat,
    ) -> Result<(), ActivityHeartbeatError>;

    /// Returns a shared handle to the server gateway this core instance talks to.
    fn server_gateway(&self) -> Arc<dyn ServerGatewayApis>;

    /// Requests shutdown of this core instance and waits for the shutdown steps to finish.
    async fn shutdown(&self);
}
130
/// Options consumed by [`init`] when constructing a core instance.
pub struct CoreInitOptions {
    /// Options used to connect to the server gateway.
    pub gateway_opts: ServerGatewayOptions,
    /// When `true`, a workflow run is evicted after a workflow task completion leaves it with
    /// no pending activations.
    pub evict_after_pending_cleared: bool,
    /// Workflow polling waits for a completion instead of polling the server once this many
    /// workflow tasks are outstanding.
    pub max_outstanding_workflow_tasks: usize,
    /// Activity polling waits for a completion instead of polling the server once this many
    /// activity tasks are outstanding.
    pub max_outstanding_activities: usize,
}
146
147pub async fn init(opts: CoreInitOptions) -> Result<impl Core, CoreInitError> {
155 let work_provider = opts.gateway_opts.connect().await?;
157
158 Ok(CoreSDK::new(work_provider, opts))
159}
160
/// Concrete [`Core`] implementation, generic over the server gateway type `WP`.
struct CoreSDK<WP> {
    /// Options supplied at construction time.
    init_options: CoreInitOptions,
    /// Gateway used for activity polling and for reporting task results to the server.
    server_gateway: Arc<WP>,
    /// Per-run workflow state, created/updated from poll responses and accessed by run id.
    workflow_machines: WorkflowConcurrencyManager,
    /// Maps a workflow task token to the run id it belongs to.
    workflow_task_tokens: DashMap<Vec<u8>, String>,
    /// Run ids for which a workflow task failure has been reported to the server and no
    /// successful server completion has happened since.
    workflows_last_task_failed: DashSet<String>,
    /// Task tokens of workflow activations handed out by polling and not yet fully completed.
    outstanding_workflow_tasks: DashSet<Vec<u8>>,

    /// Source of workflow task poll responses.
    wf_task_poll_buffer: PollWorkflowTaskBuffer,

    /// Activations queued locally; these are returned by workflow polling before the server
    /// is asked for more work.
    pending_activations: PendingActivations,

    /// Handle used to record activity heartbeats and to shut the heartbeat manager down.
    activity_heartbeat_manager_handle: ActivityHeartbeatManagerHandle,
    /// Task tokens of activity tasks handed out by polling and not yet completed.
    outstanding_activity_tasks: DashSet<Vec<u8>>,
    /// Set once `shutdown` has been called.
    shutdown_requested: AtomicBool,
    /// Notified by `shutdown` to wake futures wrapped with `shutdownable_fut`.
    shutdown_notify: Notify,
    /// Notified when a workflow task completes or a new activation may have been enqueued.
    workflow_task_complete_notify: Notify,
    /// Notified when an activity task completes.
    activity_task_complete_notify: Notify,
}
198
199#[async_trait::async_trait]
200impl<WP> Core for CoreSDK<WP>
201where
202 WP: ServerGatewayApis + Send + Sync + 'static,
203{
    /// Returns the next workflow activation to process.
    ///
    /// Each loop iteration, in order: hands out a queued pending activation if there is one;
    /// fails with [`PollWfError::ShutDown`] if shutdown was requested; waits for a workflow
    /// task completion if the outstanding count has reached `max_outstanding_workflow_tasks`;
    /// otherwise polls the server (racing the poll against task-completion notifications).
    ///
    /// # Panics
    /// Panics via `unimplemented!` if a poll response has a non-empty `next_page_token`.
    #[instrument(skip(self))]
    async fn poll_workflow_task(&self) -> Result<WfActivation, PollWfError> {
        loop {
            // Locally queued activations take priority over everything else, including the
            // shutdown check below.
            if let Some(pa) = self.pending_activations.pop() {
                return Ok(pa);
            }

            if self.shutdown_requested.load(Ordering::SeqCst) {
                return Err(PollWfError::ShutDown);
            }

            // At capacity: wait for a completion, then re-run all checks from the top.
            if self.outstanding_workflow_tasks.len()
                >= self.init_options.max_outstanding_workflow_tasks
            {
                self.workflow_task_complete_notify.notified().await;
                continue;
            }

            let task_complete_fut = self.workflow_task_complete_notify.notified();
            let poll_result_future = self.shutdownable_fut(
                self.wf_task_poll_buffer
                    .poll_workflow_task()
                    .map_err(Into::into),
            );

            debug!("Polling server");

            let selected_f = tokio::select! {
                _ = task_complete_fut => {
                    // A completion may have queued a new pending activation; go check.
                    continue;
                }
                r = poll_result_future => r
            };

            match selected_f {
                Ok(work) => {
                    if !work.next_page_token.is_empty() {
                        unimplemented!("History pagination not yet implemented");
                    }
                    if let Some(activation) = self.prepare_new_activation(work)? {
                        self.outstanding_workflow_tasks
                            .insert(activation.task_token.clone());
                        return Ok(activation);
                    }
                    // No activation was produced from this response: loop and poll again.
                }
                // Loop back so pending activations are drained first; the shutdown check at
                // the top of the loop then returns the error.
                Err(PollWfError::ShutDown) => continue,
                Err(e) => return Err(e),
            }
        }
    }
264
265 #[instrument(skip(self))]
266 async fn poll_activity_task(&self) -> Result<ActivityTask, PollActivityError> {
267 if self.shutdown_requested.load(Ordering::SeqCst) {
268 return Err(PollActivityError::ShutDown);
269 }
270
271 while self.outstanding_activity_tasks.len() >= self.init_options.max_outstanding_activities
272 {
273 self.activity_task_complete_notify.notified().await
274 }
275
276 match self
277 .shutdownable_fut(self.server_gateway.poll_activity_task().map_err(Into::into))
278 .await
279 {
280 Ok(work) => {
281 let task_token = work.task_token.clone();
282 self.outstanding_activity_tasks.insert(task_token.clone());
283 Ok(ActivityTask::start_from_poll_resp(work, task_token))
284 }
285 Err(e) => Err(e),
286 }
287 }
288
289 #[instrument(skip(self))]
290 async fn complete_workflow_task(
291 &self,
292 completion: WfActivationCompletion,
293 ) -> Result<(), CompleteWfError> {
294 let task_token = completion.task_token;
295 let wfstatus = completion.status;
296 let run_id = self
297 .workflow_task_tokens
298 .get(&task_token)
299 .map(|x| x.value().clone())
300 .ok_or_else(|| CompleteWfError::MalformedWorkflowCompletion {
301 reason: format!(
302 "Task token {} had no workflow run associated with it",
303 fmt_task_token(&task_token)
304 ),
305 completion: None,
306 })?;
307 let res = match wfstatus {
308 Some(wf_activation_completion::Status::Successful(success)) => {
309 self.wf_activation_success(task_token.clone(), &run_id, success)
310 .await
311 }
312 Some(wf_activation_completion::Status::Failed(failure)) => {
313 self.wf_activation_failed(task_token.clone(), &run_id, failure)
314 .await
315 }
316 None => Err(CompleteWfError::MalformedWorkflowCompletion {
317 reason: "Workflow completion had empty status field".to_owned(),
318 completion: None,
319 }),
320 };
321
322 if !self.pending_activations.has_pending(&run_id) {
325 self.outstanding_workflow_tasks.remove(&task_token);
326
327 if self.init_options.evict_after_pending_cleared {
329 self.evict_run(&task_token);
330 }
331 }
332 self.workflow_task_complete_notify.notify_one();
333 res
334 }
335
336 #[instrument(skip(self))]
337 async fn complete_activity_task(
338 &self,
339 completion: ActivityTaskCompletion,
340 ) -> Result<(), CompleteActivityError> {
341 let task_token = completion.task_token;
342 let status = if let Some(s) = completion.result.and_then(|r| r.status) {
343 s
344 } else {
345 return Err(CompleteActivityError::MalformedActivityCompletion {
346 reason: "Activity completion had empty result/status field".to_owned(),
347 completion: None,
348 });
349 };
350 let tt = task_token.clone();
351 match status {
352 activity_result::Status::Completed(ar::Success { result }) => {
353 self.server_gateway
354 .complete_activity_task(task_token, result.map(Into::into))
355 .await?;
356 }
357 activity_result::Status::Failed(ar::Failure { failure }) => {
358 self.server_gateway
359 .fail_activity_task(task_token, failure.map(Into::into))
360 .await?;
361 }
362 activity_result::Status::Canceled(ar::Cancelation { details }) => {
363 self.server_gateway
364 .cancel_activity_task(task_token, details.map(Into::into))
365 .await?;
366 }
367 }
368 self.outstanding_activity_tasks.remove(&tt);
369 self.activity_task_complete_notify.notify_waiters();
370 Ok(())
371 }
372
    /// Hands the heartbeat `details` to the activity heartbeat manager and returns its result.
    async fn record_activity_heartbeat(
        &self,
        details: ActivityHeartbeat,
    ) -> Result<(), ActivityHeartbeatError> {
        self.activity_heartbeat_manager_handle.record(details)
    }
379
380 fn server_gateway(&self) -> Arc<dyn ServerGatewayApis> {
381 self.server_gateway.clone()
382 }
383
    /// Requests shutdown: sets the shutdown flag, signals the shutdown notifier, shuts down
    /// the workflow machines, and then awaits shutdown of the activity heartbeat manager.
    async fn shutdown(&self) {
        // Set the flag before notifying: `shutdownable_fut` re-checks it after being woken.
        self.shutdown_requested.store(true, Ordering::SeqCst);
        self.shutdown_notify.notify_one();
        self.workflow_machines.shutdown();
        self.activity_heartbeat_manager_handle.shutdown().await;
    }
390}
391
392impl<WP: ServerGatewayApis + Send + Sync + 'static> CoreSDK<WP> {
393 pub(crate) fn new(wp: WP, init_options: CoreInitOptions) -> Self {
394 let sg = Arc::new(wp);
395 Self {
396 init_options,
397 server_gateway: sg.clone(),
398 workflow_machines: WorkflowConcurrencyManager::new(),
399 workflow_task_tokens: Default::default(),
400 workflows_last_task_failed: Default::default(),
401 outstanding_workflow_tasks: Default::default(),
402 wf_task_poll_buffer: PollWorkflowTaskBuffer::new(sg.clone()),
403 pending_activations: Default::default(),
404 outstanding_activity_tasks: Default::default(),
405 shutdown_requested: AtomicBool::new(false),
406 shutdown_notify: Notify::new(),
407 workflow_task_complete_notify: Notify::new(),
408 activity_task_complete_notify: Notify::new(),
409 activity_heartbeat_manager_handle: ActivityHeartbeatManager::new(sg),
410 }
411 }
412
413 pub(crate) fn evict_run(&self, task_token: &[u8]) {
417 if let Some((_, run_id)) = self.workflow_task_tokens.remove(task_token) {
418 self.outstanding_workflow_tasks.remove(task_token);
419 self.workflow_machines.evict(&run_id);
420 self.pending_activations.remove_all_with_run_id(&run_id);
421 self.pending_activations
423 .push(create_evict_activation(task_token.to_owned(), run_id))
424 }
425 }
426
    /// Turns a [`NextWfActivation`] into a [`WfActivation`] by finalizing it with `task_token`.
    fn finalize_next_activation(
        &self,
        next_a: NextWfActivation,
        task_token: Vec<u8>,
    ) -> WfActivation {
        next_a.finalize(task_token)
    }
436
437 fn prepare_new_activation(
439 &self,
440 work: PollWorkflowTaskQueueResponse,
441 ) -> Result<Option<WfActivation>, PollWfError> {
442 if work == PollWorkflowTaskQueueResponse::default() {
443 return Ok(None);
445 }
446 let task_token = work.task_token.clone();
447 debug!(
448 task_token = %fmt_task_token(&task_token),
449 "Received workflow task from server"
450 );
451
452 let next_activation = self.instantiate_or_update_workflow(work)?;
453
454 if let Some(na) = next_activation {
455 return Ok(Some(self.finalize_next_activation(na, task_token)));
456 }
457 Ok(None)
458 }
459
460 async fn wf_activation_success(
462 &self,
463 task_token: Vec<u8>,
464 run_id: &str,
465 success: workflow_completion::Success,
466 ) -> Result<(), CompleteWfError> {
467 let cmds = success
469 .commands
470 .into_iter()
471 .map(|c| c.try_into())
472 .collect::<Result<Vec<_>, EmptyWorkflowCommandErr>>()
473 .map_err(|_| CompleteWfError::MalformedWorkflowCompletion {
474 reason: "At least one workflow command in the completion \
475 contained an empty variant"
476 .to_owned(),
477 completion: None,
478 })?;
479 let push_result = self.push_lang_commands(run_id, cmds)?;
480 self.enqueue_next_activation_if_needed(run_id, task_token.clone())?;
481 if !self.pending_activations.has_pending(run_id) {
485 self.workflows_last_task_failed.remove(run_id);
488 self.server_gateway
489 .complete_workflow_task(task_token, push_result.server_commands)
490 .await
491 .map_err(|ts| {
492 if ts.code() == tonic::Code::InvalidArgument
493 && ts.message() == "UnhandledCommand"
494 {
495 CompleteWfError::UnhandledCommandWhenCompleting
496 } else {
497 ts.into()
498 }
499 })?;
500 }
501 Ok(())
502 }
503
504 async fn wf_activation_failed(
506 &self,
507 task_token: Vec<u8>,
508 run_id: &str,
509 failure: workflow_completion::Failure,
510 ) -> Result<(), CompleteWfError> {
511 self.evict_run(&task_token);
513
514 if !self.workflows_last_task_failed.contains(run_id) {
515 self.server_gateway
516 .fail_workflow_task(
517 task_token,
518 WorkflowTaskFailedCause::Unspecified,
519 failure.failure.map(Into::into),
520 )
521 .await?;
522 self.workflows_last_task_failed.insert(run_id.to_owned());
523 }
524
525 Ok(())
526 }
527
528 fn instantiate_or_update_workflow(
536 &self,
537 poll_wf_resp: PollWorkflowTaskQueueResponse,
538 ) -> Result<Option<NextWfActivation>, PollWfError> {
539 match poll_wf_resp {
540 PollWorkflowTaskQueueResponse {
541 task_token,
542 workflow_execution: Some(workflow_execution),
543 history: Some(history),
544 ..
545 } => {
546 let run_id = workflow_execution.run_id.clone();
547 self.workflow_task_tokens.insert(task_token, run_id.clone());
549
550 match self
551 .workflow_machines
552 .create_or_update(&run_id, history, workflow_execution)
553 {
554 Ok(activation) => Ok(activation),
555 Err(source) => Err(PollWfError::WorkflowUpdateError { source, run_id }),
556 }
557 }
558 p => Err(PollWfError::BadPollResponseFromServer(p)),
559 }
560 }
561
    /// Pushes `cmds` into the workflow manager for `run_id` and returns the push result.
    fn push_lang_commands(
        &self,
        run_id: &str,
        cmds: Vec<WFCommand>,
    ) -> Result<PushCommandsResult, WorkflowUpdateError> {
        self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
    }
571
572 fn access_wf_machine<F, Fout>(
575 &self,
576 run_id: &str,
577 mutator: F,
578 ) -> Result<Fout, WorkflowUpdateError>
579 where
580 F: FnOnce(&mut WorkflowManager) -> Result<Fout, WorkflowError> + Send + 'static,
581 Fout: Send + Debug + 'static,
582 {
583 let curspan = Span::current();
584 let mutator = move |wfm: &mut WorkflowManager| {
585 let _e = curspan.enter();
586 mutator(wfm)
587 };
588 self.workflow_machines
589 .access(run_id, mutator)
590 .map_err(|source| WorkflowUpdateError {
591 source,
592 run_id: run_id.to_owned(),
593 })
594 }
595
596 async fn shutdownable_fut<FOut, FErr>(
599 &self,
600 wrap_this: impl Future<Output = Result<FOut, FErr>>,
601 ) -> Result<FOut, FErr>
602 where
603 FErr: From<ShutdownErr>,
604 {
605 let shutdownfut = async {
606 loop {
607 self.shutdown_notify.notified().await;
608 if self.shutdown_requested.load(Ordering::SeqCst) {
609 break;
610 }
611 }
612 };
613 tokio::select! {
614 _ = shutdownfut => {
615 Err(ShutdownErr.into())
616 }
617 r = wrap_this => r
618 }
619 }
620
621 fn enqueue_next_activation_if_needed(
623 &self,
624 run_id: &str,
625 task_token: Vec<u8>,
626 ) -> Result<(), CompleteWfError> {
627 if let Some(next_activation) =
628 self.access_wf_machine(run_id, move |mgr| mgr.get_next_activation())?
629 {
630 self.pending_activations
631 .push(self.finalize_next_activation(next_activation, task_token));
632 }
633 self.workflow_task_complete_notify.notify_one();
634 Ok(())
635 }
636}
637
638#[cfg(test)]
639mod test {
640 use super::*;
641 use crate::machines::test_help::fake_sg_opts;
642 use crate::protos::temporal::api::workflowservice::v1::{
643 PollActivityTaskQueueResponse, RespondActivityTaskCompletedResponse,
644 };
645 use crate::{
646 machines::test_help::{
647 build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply,
648 EvictionMode, FakeCore, TestHistoryBuilder,
649 },
650 machines::test_help::{build_mock_sg, fake_core_from_mock_sg, hist_to_poll_resp},
651 pollers::MockServerGatewayApis,
652 protos::{
653 coresdk::{
654 activity_result::ActivityResult,
655 common::UserCodeFailure,
656 workflow_activation::{
657 wf_activation_job, FireTimer, ResolveActivity, StartWorkflow, UpdateRandomSeed,
658 WfActivationJob,
659 },
660 workflow_commands::{
661 ActivityCancellationType, CancelTimer, CompleteWorkflowExecution,
662 FailWorkflowExecution, RequestCancelActivity, ScheduleActivity, StartTimer,
663 },
664 },
665 temporal::api::{
666 enums::v1::EventType,
667 workflowservice::v1::{
668 RespondWorkflowTaskCompletedResponse, RespondWorkflowTaskFailedResponse,
669 },
670 },
671 },
672 test_help::canned_histories,
673 };
674 use rstest::{fixture, rstest};
675 use std::{collections::VecDeque, sync::atomic::AtomicU64, sync::atomic::AtomicUsize};
676
677 #[fixture(hist_batches = &[])]
678 fn single_timer_setup(hist_batches: &[usize]) -> FakeCore {
679 let wfid = "fake_wf_id";
680
681 let mut t = canned_histories::single_timer("fake_timer");
682 build_fake_core(wfid, &mut t, hist_batches)
683 }
684
685 #[fixture(hist_batches = &[])]
686 fn single_activity_setup(hist_batches: &[usize]) -> FakeCore {
687 let wfid = "fake_wf_id";
688
689 let mut t = canned_histories::single_activity("fake_activity");
690 build_fake_core(wfid, &mut t, hist_batches)
691 }
692
693 #[fixture(hist_batches = &[])]
694 fn single_activity_failure_setup(hist_batches: &[usize]) -> FakeCore {
695 let wfid = "fake_wf_id";
696
697 let mut t = canned_histories::single_failed_activity("fake_activity");
698 build_fake_core(wfid, &mut t, hist_batches)
699 }
700
701 #[rstest]
702 #[case::incremental(single_timer_setup(&[1, 2]), EvictionMode::NotSticky)]
703 #[case::replay(single_timer_setup(&[2]), EvictionMode::NotSticky)]
704 #[case::incremental_evict(single_timer_setup(&[1, 2]), EvictionMode::AfterEveryReply)]
705 #[case::replay_evict(single_timer_setup(&[2, 2]), EvictionMode::AfterEveryReply)]
706 #[tokio::test]
707 async fn single_timer_test_across_wf_bridge(
708 #[case] core: FakeCore,
709 #[case] evict: EvictionMode,
710 ) {
711 poll_and_reply(
712 &core,
713 evict,
714 &[
715 gen_assert_and_reply(
716 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
717 vec![StartTimer {
718 timer_id: "fake_timer".to_string(),
719 ..Default::default()
720 }
721 .into()],
722 ),
723 gen_assert_and_reply(
724 &job_assert!(wf_activation_job::Variant::FireTimer(_)),
725 vec![CompleteWorkflowExecution { result: None }.into()],
726 ),
727 ],
728 )
729 .await;
730 }
731
732 #[rstest(core,
733 case::incremental(single_activity_setup(&[1, 2])),
734 case::incremental_activity_failure(single_activity_failure_setup(&[1, 2])),
735 case::replay(single_activity_setup(&[2])),
736 case::replay_activity_failure(single_activity_failure_setup(&[2]))
737 )]
738 #[tokio::test]
739 async fn single_activity_completion(core: FakeCore) {
740 poll_and_reply(
741 &core,
742 EvictionMode::NotSticky,
743 &[
744 gen_assert_and_reply(
745 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
746 vec![ScheduleActivity {
747 activity_id: "fake_activity".to_string(),
748 ..Default::default()
749 }
750 .into()],
751 ),
752 gen_assert_and_reply(
753 &job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
754 vec![CompleteWorkflowExecution { result: None }.into()],
755 ),
756 ],
757 )
758 .await;
759 }
760
761 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
762 #[tokio::test]
763 async fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) {
764 let wfid = "fake_wf_id";
765 let timer_1_id = "timer1";
766 let timer_2_id = "timer2";
767
768 let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id);
769 let core = build_fake_core(wfid, &mut t, hist_batches);
770
771 poll_and_reply(
772 &core,
773 EvictionMode::NotSticky,
774 &[
775 gen_assert_and_reply(
776 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
777 vec![
778 StartTimer {
779 timer_id: timer_1_id.to_string(),
780 ..Default::default()
781 }
782 .into(),
783 StartTimer {
784 timer_id: timer_2_id.to_string(),
785 ..Default::default()
786 }
787 .into(),
788 ],
789 ),
790 gen_assert_and_reply(
791 &|res| {
792 assert_matches!(
793 res.jobs.as_slice(),
794 [
795 WfActivationJob {
796 variant: Some(wf_activation_job::Variant::FireTimer(
797 FireTimer { timer_id: t1_id }
798 )),
799 },
800 WfActivationJob {
801 variant: Some(wf_activation_job::Variant::FireTimer(
802 FireTimer { timer_id: t2_id }
803 )),
804 }
805 ] => {
806 assert_eq!(t1_id, &timer_1_id);
807 assert_eq!(t2_id, &timer_2_id);
808 }
809 );
810 },
811 vec![CompleteWorkflowExecution { result: None }.into()],
812 ),
813 ],
814 )
815 .await;
816 }
817
818 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
819 #[tokio::test]
820 async fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) {
821 let wfid = "fake_wf_id";
822 let timer_id = "wait_timer";
823 let cancel_timer_id = "cancel_timer";
824
825 let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id);
826 let core = build_fake_core(wfid, &mut t, hist_batches);
827
828 poll_and_reply(
829 &core,
830 EvictionMode::NotSticky,
831 &[
832 gen_assert_and_reply(
833 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
834 vec![
835 StartTimer {
836 timer_id: cancel_timer_id.to_string(),
837 ..Default::default()
838 }
839 .into(),
840 StartTimer {
841 timer_id: timer_id.to_string(),
842 ..Default::default()
843 }
844 .into(),
845 ],
846 ),
847 gen_assert_and_reply(
848 &job_assert!(wf_activation_job::Variant::FireTimer(_)),
849 vec![
850 CancelTimer {
851 timer_id: cancel_timer_id.to_string(),
852 }
853 .into(),
854 CompleteWorkflowExecution { result: None }.into(),
855 ],
856 ),
857 ],
858 )
859 .await;
860 }
861
862 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
863 #[tokio::test]
864 async fn scheduled_activity_cancellation_try_cancel(hist_batches: &[usize]) {
865 let wfid = "fake_wf_id";
866 let activity_id = "fake_activity";
867 let signal_id = "signal";
868
869 let mut t = canned_histories::cancel_scheduled_activity(activity_id, signal_id);
870 let core = build_fake_core(wfid, &mut t, hist_batches);
871
872 poll_and_reply(
873 &core,
874 EvictionMode::NotSticky,
875 &[
876 gen_assert_and_reply(
877 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
878 vec![ScheduleActivity {
879 activity_id: activity_id.to_string(),
880 cancellation_type: ActivityCancellationType::TryCancel as i32,
881 ..Default::default()
882 }
883 .into()],
884 ),
885 gen_assert_and_reply(
886 &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
887 vec![RequestCancelActivity {
888 activity_id: activity_id.to_string(),
889 ..Default::default()
890 }
891 .into()],
892 ),
893 gen_assert_and_reply(
895 &job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
896 vec![CompleteWorkflowExecution { result: None }.into()],
897 ),
898 ],
899 )
900 .await;
901 }
902
903 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
904 #[tokio::test]
905 async fn scheduled_activity_timeout(hist_batches: &[usize]) {
906 let wfid = "fake_wf_id";
907 let activity_id = "fake_activity";
908
909 let mut t = canned_histories::scheduled_activity_timeout(activity_id);
910 let core = build_fake_core(wfid, &mut t, hist_batches);
911 poll_and_reply(
912 &core,
913 EvictionMode::NotSticky,
914 &[
915 gen_assert_and_reply(
916 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
917 vec![ScheduleActivity {
918 activity_id: activity_id.to_string(),
919 ..Default::default()
920 }
921 .into()],
922 ),
923 gen_assert_and_reply(
925 &|res| {
926 assert_matches!(
927 res.jobs.as_slice(),
928 [
929 WfActivationJob {
930 variant: Some(wf_activation_job::Variant::ResolveActivity(
931 ResolveActivity {
932 activity_id: aid,
933 result: Some(ActivityResult {
934 status: Some(activity_result::Status::Failed(ar::Failure {
935 failure: Some(failure)
936 })),
937 })
938 }
939 )),
940 }
941 ] => {
942 assert_eq!(failure.message, "Activity task timed out".to_string());
943 assert_eq!(aid, &activity_id.to_string());
944 }
945 );
946 },
947 vec![CompleteWorkflowExecution { result: None }.into()],
948 ),
949 ],
950 )
951 .await;
952 }
953
954 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
955 #[tokio::test]
956 async fn started_activity_timeout(hist_batches: &[usize]) {
957 let wfid = "fake_wf_id";
958 let activity_id = "fake_activity";
959
960 let mut t = canned_histories::started_activity_timeout(activity_id);
961 let core = build_fake_core(wfid, &mut t, hist_batches);
962
963 poll_and_reply(
964 &core,
965 EvictionMode::NotSticky,
966 &[
967 gen_assert_and_reply(
968 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
969 vec![ScheduleActivity {
970 activity_id: activity_id.to_string(),
971 ..Default::default()
972 }
973 .into()],
974 ),
975 gen_assert_and_reply(
977 &|res| {
978 assert_matches!(
979 res.jobs.as_slice(),
980 [
981 WfActivationJob {
982 variant: Some(wf_activation_job::Variant::ResolveActivity(
983 ResolveActivity {
984 activity_id: aid,
985 result: Some(ActivityResult {
986 status: Some(activity_result::Status::Failed(ar::Failure {
987 failure: Some(failure)
988 })),
989 })
990 }
991 )),
992 }
993 ] => {
994 assert_eq!(failure.message, "Activity task timed out".to_string());
995 assert_eq!(aid, &activity_id.to_string());
996 }
997 );
998 },
999 vec![CompleteWorkflowExecution { result: None }.into()],
1000 ),
1001 ],
1002 )
1003 .await;
1004 }
1005
1006 #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))]
1007 #[tokio::test]
1008 async fn cancelled_activity_timeout(hist_batches: &[usize]) {
1009 let wfid = "fake_wf_id";
1010 let activity_id = "fake_activity";
1011 let signal_id = "signal";
1012
1013 let mut t = canned_histories::scheduled_cancelled_activity_timeout(activity_id, signal_id);
1014 let core = build_fake_core(wfid, &mut t, hist_batches);
1015
1016 poll_and_reply(
1017 &core,
1018 EvictionMode::NotSticky,
1019 &[
1020 gen_assert_and_reply(
1021 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1022 vec![ScheduleActivity {
1023 activity_id: activity_id.to_string(),
1024 ..Default::default()
1025 }
1026 .into()],
1027 ),
1028 gen_assert_and_reply(
1029 &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1030 vec![RequestCancelActivity {
1031 activity_id: activity_id.to_string(),
1032 ..Default::default()
1033 }
1034 .into()],
1035 ),
1036 gen_assert_and_reply(
1038 &job_assert!(wf_activation_job::Variant::ResolveActivity(
1039 ResolveActivity {
1040 activity_id: _,
1041 result: Some(ActivityResult {
1042 status: Some(activity_result::Status::Canceled(..)),
1043 })
1044 }
1045 )),
1046 vec![CompleteWorkflowExecution { result: None }.into()],
1047 ),
1048 ],
1049 )
1050 .await;
1051 }
1052
1053 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1054 #[tokio::test]
1055 async fn scheduled_activity_cancellation_abandon(hist_batches: &[usize]) {
1056 let wfid = "fake_wf_id";
1057 let activity_id = "fake_activity";
1058 let signal_id = "signal";
1059
1060 let mut t = canned_histories::cancel_scheduled_activity_abandon(activity_id, signal_id);
1061 let core = build_fake_core(wfid, &mut t, hist_batches);
1062
1063 verify_activity_cancellation_abandon(&activity_id, &core).await;
1064 }
1065
1066 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1067 #[tokio::test]
1068 async fn started_activity_cancellation_abandon(hist_batches: &[usize]) {
1069 let wfid = "fake_wf_id";
1070 let activity_id = "fake_activity";
1071 let signal_id = "signal";
1072
1073 let mut t = canned_histories::cancel_started_activity_abandon(activity_id, signal_id);
1074 let core = build_fake_core(wfid, &mut t, hist_batches);
1075
1076 verify_activity_cancellation_abandon(&activity_id, &core).await;
1077 }
1078
1079 async fn verify_activity_cancellation_abandon(activity_id: &&str, core: &FakeCore) {
1080 poll_and_reply(
1081 &core,
1082 EvictionMode::NotSticky,
1083 &[
1084 gen_assert_and_reply(
1085 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1086 vec![ScheduleActivity {
1087 activity_id: activity_id.to_string(),
1088 cancellation_type: ActivityCancellationType::Abandon as i32,
1089 ..Default::default()
1090 }
1091 .into()],
1092 ),
1093 gen_assert_and_reply(
1094 &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1095 vec![RequestCancelActivity {
1096 activity_id: activity_id.to_string(),
1097 ..Default::default()
1098 }
1099 .into()],
1100 ),
1101 gen_assert_and_reply(
1103 &job_assert!(wf_activation_job::Variant::ResolveActivity(
1104 ResolveActivity {
1105 activity_id: _,
1106 result: Some(ActivityResult {
1107 status: Some(activity_result::Status::Canceled(..)),
1108 })
1109 }
1110 )),
1111 vec![CompleteWorkflowExecution { result: None }.into()],
1112 ),
1113 ],
1114 )
1115 .await;
1116 }
1117
1118 #[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))]
1119 #[tokio::test]
1120 async fn scheduled_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) {
1121 let wfid = "fake_wf_id";
1122 let activity_id = "fake_activity";
1123 let signal_id = "signal";
1124
1125 let mut t =
1126 canned_histories::cancel_scheduled_activity_with_signal_and_activity_task_cancel(
1127 activity_id,
1128 signal_id,
1129 );
1130 let core = build_fake_core(wfid, &mut t, hist_batches);
1131
1132 verify_activity_cancellation_wait_for_cancellation(activity_id, &core).await;
1133 }
1134
1135 #[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))]
1136 #[tokio::test]
1137 async fn started_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) {
1138 let wfid = "fake_wf_id";
1139 let activity_id = "fake_activity";
1140 let signal_id = "signal";
1141
1142 let mut t = canned_histories::cancel_started_activity_with_signal_and_activity_task_cancel(
1143 activity_id,
1144 signal_id,
1145 );
1146 let core = build_fake_core(wfid, &mut t, hist_batches);
1147
1148 verify_activity_cancellation_wait_for_cancellation(activity_id, &core).await;
1149 }
1150
1151 async fn verify_activity_cancellation_wait_for_cancellation(
1152 activity_id: &str,
1153 core: &FakeCore,
1154 ) {
1155 poll_and_reply(
1156 &core,
1157 EvictionMode::NotSticky,
1158 &[
1159 gen_assert_and_reply(
1160 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1161 vec![ScheduleActivity {
1162 activity_id: activity_id.to_string(),
1163 cancellation_type: ActivityCancellationType::WaitCancellationCompleted
1164 as i32,
1165 ..Default::default()
1166 }
1167 .into()],
1168 ),
1169 gen_assert_and_reply(
1170 &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1171 vec![RequestCancelActivity {
1172 activity_id: activity_id.to_string(),
1173 ..Default::default()
1174 }
1175 .into()],
1176 ),
1177 gen_assert_and_reply(
1179 &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1180 vec![],
1181 ),
1182 gen_assert_and_reply(
1184 &job_assert!(wf_activation_job::Variant::ResolveActivity(
1185 ResolveActivity {
1186 activity_id: _,
1187 result: Some(ActivityResult {
1188 status: Some(activity_result::Status::Canceled(..)),
1189 })
1190 }
1191 )),
1192 vec![CompleteWorkflowExecution { result: None }.into()],
1193 ),
1194 ],
1195 )
1196 .await;
1197 }
1198
1199 #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))]
1200 #[tokio::test]
1201 async fn scheduled_activity_cancellation_try_cancel_task_canceled(hist_batches: &[usize]) {
1202 let wfid = "fake_wf_id";
1203 let activity_id = "fake_activity";
1204 let signal_id = "signal";
1205
1206 let mut t = canned_histories::cancel_scheduled_activity_with_activity_task_cancel(
1207 activity_id,
1208 signal_id,
1209 );
1210 let core = build_fake_core(wfid, &mut t, hist_batches);
1211
1212 verify_activity_cancellation_try_cancel_task_canceled(&activity_id, &core).await;
1213 }
1214
1215 #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))]
1216 #[tokio::test]
1217 async fn started_activity_cancellation_try_cancel_task_canceled(hist_batches: &[usize]) {
1218 let wfid = "fake_wf_id";
1219 let activity_id = "fake_activity";
1220 let signal_id = "signal";
1221
1222 let mut t = canned_histories::cancel_started_activity_with_activity_task_cancel(
1223 activity_id,
1224 signal_id,
1225 );
1226 let core = build_fake_core(wfid, &mut t, hist_batches);
1227
1228 verify_activity_cancellation_try_cancel_task_canceled(&activity_id, &core).await;
1229 }
1230
1231 async fn verify_activity_cancellation_try_cancel_task_canceled(
1232 activity_id: &&str,
1233 core: &FakeCore,
1234 ) {
1235 poll_and_reply(
1236 &core,
1237 EvictionMode::NotSticky,
1238 &[
1239 gen_assert_and_reply(
1240 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1241 vec![ScheduleActivity {
1242 activity_id: activity_id.to_string(),
1243 cancellation_type: ActivityCancellationType::TryCancel as i32,
1244 ..Default::default()
1245 }
1246 .into()],
1247 ),
1248 gen_assert_and_reply(
1249 &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1250 vec![RequestCancelActivity {
1251 activity_id: activity_id.to_string(),
1252 ..Default::default()
1253 }
1254 .into()],
1255 ),
1256 gen_assert_and_reply(
1258 &job_assert!(wf_activation_job::Variant::ResolveActivity(
1259 ResolveActivity {
1260 activity_id: _,
1261 result: Some(ActivityResult {
1262 status: Some(activity_result::Status::Canceled(..)),
1263 })
1264 }
1265 )),
1266 vec![CompleteWorkflowExecution { result: None }.into()],
1267 ),
1268 ],
1269 )
1270 .await;
1271 }
1272
1273 #[rstest(single_timer_setup(&[1]))]
1274 #[tokio::test]
1275 async fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) {
1276 let res = single_timer_setup.inner.poll_workflow_task().await.unwrap();
1277 assert_eq!(res.jobs.len(), 1);
1278
1279 single_timer_setup.inner.shutdown().await;
1280 assert_matches!(
1281 single_timer_setup
1282 .inner
1283 .poll_workflow_task()
1284 .await
1285 .unwrap_err(),
1286 PollWfError::ShutDown
1287 );
1288 }
1289
1290 #[tokio::test]
1291 async fn workflow_update_random_seed_on_workflow_reset() {
1292 let wfid = "fake_wf_id";
1293 let new_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
1294 let timer_1_id = "timer1";
1295 let randomness_seed_from_start = AtomicU64::new(0);
1296
1297 let mut t = canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, new_run_id);
1298 let core = build_fake_core(wfid, &mut t, &[2]);
1299
1300 poll_and_reply(
1301 &core,
1302 EvictionMode::NotSticky,
1303 &[
1304 gen_assert_and_reply(
1305 &|res| {
1306 assert_matches!(
1307 res.jobs.as_slice(),
1308 [WfActivationJob {
1309 variant: Some(wf_activation_job::Variant::StartWorkflow(
1310 StartWorkflow{randomness_seed, ..}
1311 )),
1312 }] => {
1313 randomness_seed_from_start.store(*randomness_seed, Ordering::SeqCst);
1314 }
1315 );
1316 },
1317 vec![StartTimer {
1318 timer_id: timer_1_id.to_string(),
1319 ..Default::default()
1320 }
1321 .into()],
1322 ),
1323 gen_assert_and_reply(
1324 &|res| {
1325 assert_matches!(
1326 res.jobs.as_slice(),
1327 [WfActivationJob {
1328 variant: Some(wf_activation_job::Variant::FireTimer(_),),
1329 },
1330 WfActivationJob {
1331 variant: Some(wf_activation_job::Variant::UpdateRandomSeed(
1332 UpdateRandomSeed{randomness_seed})),
1333 }] => {
1334 assert_ne!(randomness_seed_from_start.load(Ordering::SeqCst),
1335 *randomness_seed)
1336 }
1337 )
1338 },
1339 vec![CompleteWorkflowExecution { result: None }.into()],
1340 ),
1341 ],
1342 )
1343 .await;
1344 }
1345
1346 #[tokio::test]
1347 async fn cancel_timer_before_sent_wf_bridge() {
1348 let wfid = "fake_wf_id";
1349 let cancel_timer_id = "cancel_timer";
1350
1351 let mut t = TestHistoryBuilder::default();
1352 t.add_by_type(EventType::WorkflowExecutionStarted);
1353 t.add_full_wf_task();
1354 t.add_workflow_execution_completed();
1355
1356 let core = build_fake_core(wfid, &mut t, &[1]);
1357
1358 poll_and_reply(
1359 &core,
1360 EvictionMode::NotSticky,
1361 &[gen_assert_and_reply(
1362 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1363 vec![
1364 StartTimer {
1365 timer_id: cancel_timer_id.to_string(),
1366 ..Default::default()
1367 }
1368 .into(),
1369 CancelTimer {
1370 timer_id: cancel_timer_id.to_string(),
1371 }
1372 .into(),
1373 CompleteWorkflowExecution { result: None }.into(),
1374 ],
1375 )],
1376 )
1377 .await;
1378 }
1379
1380 #[rstest]
1381 #[case::no_evict_inc(&[1, 2, 2], EvictionMode::NotSticky)]
1382 #[case::no_evict(&[2, 2], EvictionMode::NotSticky)]
1383 #[case::evict(&[1, 2, 2, 2], EvictionMode::AfterEveryReply)]
1384 #[tokio::test]
1385 async fn complete_activation_with_failure(
1386 #[case] batches: &[usize],
1387 #[case] evict: EvictionMode,
1388 ) {
1389 let wfid = "fake_wf_id";
1390 let timer_id = "timer";
1391
1392 let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id);
1393 let mut mock_sg = build_mock_sg(wfid, &mut t, batches);
1394 mock_sg
1396 .expect_fail_workflow_task()
1397 .times(1)
1398 .returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {}));
1399 let core = fake_core_from_mock_sg(mock_sg, batches);
1400
1401 poll_and_reply(
1402 &core,
1403 evict,
1404 &[
1405 gen_assert_and_reply(
1406 &|_| {},
1407 vec![StartTimer {
1408 timer_id: timer_id.to_owned(),
1409 ..Default::default()
1410 }
1411 .into()],
1412 ),
1413 gen_assert_and_fail(&|_| {}),
1414 gen_assert_and_reply(
1415 &job_assert!(wf_activation_job::Variant::FireTimer(_)),
1416 vec![CompleteWorkflowExecution { result: None }.into()],
1417 ),
1418 ],
1419 )
1420 .await;
1421 }
1422
1423 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1424 #[tokio::test]
1425 async fn simple_timer_fail_wf_execution(hist_batches: &[usize]) {
1426 let wfid = "fake_wf_id";
1427 let timer_id = "timer1";
1428
1429 let mut t = canned_histories::single_timer(timer_id);
1430 let core = build_fake_core(wfid, &mut t, hist_batches);
1431
1432 poll_and_reply(
1433 &core,
1434 EvictionMode::NotSticky,
1435 &[
1436 gen_assert_and_reply(
1437 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1438 vec![StartTimer {
1439 timer_id: timer_id.to_string(),
1440 ..Default::default()
1441 }
1442 .into()],
1443 ),
1444 gen_assert_and_reply(
1445 &job_assert!(wf_activation_job::Variant::FireTimer(_)),
1446 vec![FailWorkflowExecution {
1447 failure: Some(UserCodeFailure {
1448 message: "I'm ded".to_string(),
1449 ..Default::default()
1450 }),
1451 }
1452 .into()],
1453 ),
1454 ],
1455 )
1456 .await;
1457 }
1458
1459 #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1460 #[tokio::test]
1461 async fn two_signals(hist_batches: &[usize]) {
1462 let wfid = "fake_wf_id";
1463
1464 let mut t = canned_histories::two_signals("sig1", "sig2");
1465 let core = build_fake_core(wfid, &mut t, hist_batches);
1466
1467 poll_and_reply(
1468 &core,
1469 EvictionMode::NotSticky,
1470 &[
1471 gen_assert_and_reply(
1472 &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1473 vec![],
1475 ),
1476 gen_assert_and_reply(
1477 &job_assert!(
1478 wf_activation_job::Variant::SignalWorkflow(_),
1479 wf_activation_job::Variant::SignalWorkflow(_)
1480 ),
1481 vec![],
1482 ),
1483 ],
1484 )
1485 .await;
1486 }
1487
1488 #[tokio::test]
1489 async fn workflow_failures_only_reported_once() {
1490 let wfid = "fake_wf_id";
1491 let timer_1 = "timer1";
1492 let timer_2 = "timer2";
1493
1494 let mut t =
1495 canned_histories::workflow_fails_with_failure_two_different_points(timer_1, timer_2);
1496 let batches = &[
1497 1, 2, 2, 2, 2, 3, 3, 3,
1501 ];
1502 let mut mock_sg = build_mock_sg(wfid, &mut t, batches);
1503 mock_sg
1504 .expect_fail_workflow_task()
1505 .times(2)
1507 .returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {}));
1508 let core = fake_core_from_mock_sg(mock_sg, batches);
1509
1510 poll_and_reply(
1511 &core,
1512 EvictionMode::NotSticky,
1513 &[
1514 gen_assert_and_reply(
1515 &|_| {},
1516 vec![StartTimer {
1517 timer_id: timer_1.to_owned(),
1518 ..Default::default()
1519 }
1520 .into()],
1521 ),
1522 gen_assert_and_fail(&|_| {}),
1524 gen_assert_and_fail(&|_| {}),
1525 gen_assert_and_fail(&|_| {}),
1526 gen_assert_and_reply(
1527 &job_assert!(wf_activation_job::Variant::FireTimer(_)),
1528 vec![StartTimer {
1529 timer_id: timer_2.to_string(),
1530 ..Default::default()
1531 }
1532 .into()],
1533 ),
1534 gen_assert_and_fail(&|_| {}),
1536 gen_assert_and_fail(&|_| {}),
1537 gen_assert_and_reply(
1538 &job_assert!(wf_activation_job::Variant::FireTimer(_)),
1539 vec![CompleteWorkflowExecution { result: None }.into()],
1540 ),
1541 ],
1542 )
1543 .await;
1544 }
1545
1546 #[tokio::test]
1547 async fn max_concurrent_wft_respected() {
1548 let mut t1 = canned_histories::long_sequential_timers(20);
1550 let mut t2 = canned_histories::long_sequential_timers(20);
1551 let mut tasks = VecDeque::from(vec![
1552 hist_to_poll_resp(&mut t1, "wf1", 100),
1553 hist_to_poll_resp(&mut t2, "wf2", 100),
1554 ]);
1555 let mut mock_gateway = MockServerGatewayApis::new();
1558 mock_gateway
1559 .expect_poll_workflow_task()
1560 .times(2)
1561 .returning(move || Ok(tasks.pop_front().unwrap()));
1562 mock_gateway
1564 .expect_complete_workflow_task()
1565 .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default()));
1566
1567 let core = CoreSDK::new(
1568 mock_gateway,
1569 CoreInitOptions {
1570 gateway_opts: fake_sg_opts(),
1571 evict_after_pending_cleared: true,
1572 max_outstanding_workflow_tasks: 2,
1573 max_outstanding_activities: 1,
1574 },
1575 );
1576
1577 let r1 = core.poll_workflow_task().await.unwrap();
1579 let _r2 = core.poll_workflow_task().await.unwrap();
1580 let last_finisher = AtomicUsize::new(0);
1583 let (_, mut r1) = tokio::join! {
1584 async {
1585 core.complete_workflow_task(WfActivationCompletion::from_status(
1586 r1.task_token,
1587 workflow_completion::Success::from_cmds(vec![StartTimer {
1588 timer_id: "timer-1".to_string(),
1589 ..Default::default()
1590 }
1591 .into()]).into()
1592 )).await.unwrap();
1593 last_finisher.store(1, Ordering::SeqCst);
1594 },
1595 async {
1596 let r = core.poll_workflow_task().await.unwrap();
1597 last_finisher.store(2, Ordering::SeqCst);
1598 r
1599 }
1600 };
1601 assert_eq!(last_finisher.load(Ordering::Acquire), 2);
1603
1604 for i in 2..19 {
1606 core.complete_workflow_task(WfActivationCompletion::from_status(
1607 r1.task_token,
1608 workflow_completion::Success::from_cmds(vec![StartTimer {
1609 timer_id: format!("timer-{}", i),
1610 ..Default::default()
1611 }
1612 .into()])
1613 .into(),
1614 ))
1615 .await
1616 .unwrap();
1617 r1 = core.poll_workflow_task().await.unwrap();
1618 }
1619 }
1620
1621 #[tokio::test]
1622 async fn max_activites_respected() {
1623 let mut tasks = VecDeque::from(vec![
1624 PollActivityTaskQueueResponse {
1625 task_token: vec![1],
1626 activity_id: "act1".to_string(),
1627 ..Default::default()
1628 },
1629 PollActivityTaskQueueResponse {
1630 task_token: vec![2],
1631 activity_id: "act2".to_string(),
1632 ..Default::default()
1633 },
1634 PollActivityTaskQueueResponse {
1635 task_token: vec![3],
1636 activity_id: "act3".to_string(),
1637 ..Default::default()
1638 },
1639 ]);
1640 let mut mock_gateway = MockServerGatewayApis::new();
1641 mock_gateway
1642 .expect_poll_activity_task()
1643 .times(3)
1644 .returning(move || Ok(tasks.pop_front().unwrap()));
1645 mock_gateway
1646 .expect_complete_activity_task()
1647 .returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default()));
1648
1649 let core = CoreSDK::new(
1650 mock_gateway,
1651 CoreInitOptions {
1652 gateway_opts: fake_sg_opts(),
1653 evict_after_pending_cleared: true,
1654 max_outstanding_workflow_tasks: 1,
1655 max_outstanding_activities: 2,
1656 },
1657 );
1658
1659 let r1 = core.poll_activity_task().await.unwrap();
1661 let _r2 = core.poll_activity_task().await.unwrap();
1662 let last_finisher = AtomicUsize::new(0);
1664 tokio::join! {
1665 async {
1666 core.complete_activity_task(ActivityTaskCompletion {
1667 task_token: r1.task_token,
1668 result: Some(ActivityResult::ok(vec![1].into()))
1669 }).await.unwrap();
1670 last_finisher.store(1, Ordering::SeqCst);
1671 },
1672 async {
1673 core.poll_activity_task().await.unwrap();
1674 last_finisher.store(2, Ordering::SeqCst);
1675 }
1676 };
1677 assert_eq!(last_finisher.load(Ordering::Acquire), 2);
1679 }
1680}