temporal_sdk_core/
lib.rs

#![warn(missing_docs)] // warn if there are missing docs
2#![allow(clippy::upper_case_acronyms)]
3
//! This crate provides a basis for creating new Temporal SDKs without completely starting from
//! scratch.
6
7#[cfg(test)]
8#[macro_use]
9pub extern crate assert_matches;
10#[macro_use]
11extern crate tracing;
12
13pub mod protos;
14
15mod activity;
16pub(crate) mod core_tracing;
17mod errors;
18mod machines;
19mod pending_activations;
20mod pollers;
21mod protosext;
22mod workflow;
23
24#[cfg(test)]
25mod test_help;
26
27pub use crate::errors::{
28    CompleteActivityError, CompleteWfError, CoreInitError, PollActivityError, PollWfError,
29};
30pub use core_tracing::tracing_init;
31pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions};
32pub use url::Url;
33
34use crate::{
35    activity::{ActivityHeartbeatManager, ActivityHeartbeatManagerHandle},
36    errors::{ActivityHeartbeatError, ShutdownErr, WorkflowUpdateError},
37    machines::{EmptyWorkflowCommandErr, WFCommand},
38    pending_activations::PendingActivations,
39    pollers::PollWorkflowTaskBuffer,
40    protos::{
41        coresdk::{
42            activity_result::{self as ar, activity_result},
43            activity_task::ActivityTask,
44            workflow_activation::{create_evict_activation, WfActivation},
45            workflow_completion::{self, wf_activation_completion, WfActivationCompletion},
46            ActivityHeartbeat, ActivityTaskCompletion,
47        },
48        temporal::api::{
49            enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse,
50        },
51    },
52    protosext::fmt_task_token,
53    workflow::{
54        NextWfActivation, PushCommandsResult, WorkflowConcurrencyManager, WorkflowError,
55        WorkflowManager,
56    },
57};
58use dashmap::{DashMap, DashSet};
59use futures::TryFutureExt;
60use std::{
61    convert::TryInto,
62    fmt::Debug,
63    future::Future,
64    sync::{
65        atomic::{AtomicBool, Ordering},
66        Arc,
67    },
68};
69use tokio::sync::Notify;
70use tracing::Span;
71
72/// This trait is the primary way by which language specific SDKs interact with the core SDK. It is
73/// expected that only one instance of an implementation will exist for the lifetime of the
74/// worker(s) using it.
75#[async_trait::async_trait]
76pub trait Core: Send + Sync {
77    /// Ask the core for some work, returning a [WfActivation]. It is then the language SDK's
78    /// responsibility to call the appropriate workflow code with the provided inputs. Blocks
79    /// indefinitely until such work is available or [Core::shutdown] is called.
80    ///
81    /// TODO: Examples
82    async fn poll_workflow_task(&self) -> Result<WfActivation, PollWfError>;
83
84    /// Ask the core for some work, returning an [ActivityTask]. It is then the language SDK's
85    /// responsibility to call the appropriate activity code with the provided inputs. Blocks
86    /// indefinitely until such work is available or [Core::shutdown] is called.
87    ///
88    /// TODO: Examples
89    async fn poll_activity_task(&self) -> Result<ActivityTask, PollActivityError>;
90
91    /// Tell the core that a workflow activation has completed
92    async fn complete_workflow_task(
93        &self,
94        completion: WfActivationCompletion,
95    ) -> Result<(), CompleteWfError>;
96
97    /// Tell the core that an activity has finished executing
98    async fn complete_activity_task(
99        &self,
100        completion: ActivityTaskCompletion,
101    ) -> Result<(), CompleteActivityError>;
102
103    /// Notify workflow that activity is still alive. Long running activities that take longer than
104    /// `activity_heartbeat_timeout` to finish must call this function in order to report progress,
105    /// otherwise activity will timeout and new attempt will be scheduled.
106    /// `result` contains latest known activity cancelation status.
107    /// Note that heartbeat requests are getting batched and are sent to the server periodically,
108    /// this function is going to return immediately and request will be queued in the core.
109    /// Unlike java/go SDKs we are not going to return cancellation status as part of heartbeat response
110    /// and instead will send it as a separate activity task to the lang, decoupling heartbeat and
111    /// cancellation processing.
112    /// For now activity still needs to heartbeat if it wants to receive cancellation requests.
113    /// In the future we are going to change this and will dispatch cancellations more proactively.
114    async fn record_activity_heartbeat(
115        &self,
116        details: ActivityHeartbeat,
117    ) -> Result<(), ActivityHeartbeatError>;
118
119    /// Returns core's instance of the [ServerGatewayApis] implementor it is using.
120    fn server_gateway(&self) -> Arc<dyn ServerGatewayApis>;
121
122    /// Initiates async shutdown procedure, eventually ceases all polling of the server.
123    /// [Core::poll_workflow_task] should be called until it returns [PollWfError::ShutDown]
124    /// to ensure that any workflows which are still undergoing replay have an opportunity to finish.
125    /// This means that the lang sdk will need to call [Core::complete_workflow_task] for those
126    /// workflows until they are done. At that point, the lang SDK can end the process,
127    /// or drop the [Core] instance, which will close the connection.
128    async fn shutdown(&self);
129}
130
131/// Holds various configuration information required to call [init]
132pub struct CoreInitOptions {
133    /// Options for the connection to the temporal server
134    pub gateway_opts: ServerGatewayOptions,
135    /// If set to true (which should be the default choice until sticky task queues are implemented)
136    /// workflows are evicted after they no longer have any pending activations. IE: After they
137    /// have sent new commands to the server.
138    pub evict_after_pending_cleared: bool,
139    /// The maximum allowed number of workflow tasks that will ever be given to lang at one
140    /// time. Note that one workflow task may require multiple activations - so the WFT counts as
141    /// "outstanding" until all activations it requires have been completed.
142    pub max_outstanding_workflow_tasks: usize,
143    /// The maximum allowed number of activity tasks that will ever be given to lang at one time.
144    pub max_outstanding_activities: usize,
145}
146
147/// Initializes an instance of the core sdk and establishes a connection to the temporal server.
148///
149/// Note: Also creates a tokio runtime that will be used for all client-server interactions.  
150///
151/// # Panics
152/// * Will panic if called from within an async context, as it will construct a runtime and you
153///   cannot construct a runtime from within a runtime.
154pub async fn init(opts: CoreInitOptions) -> Result<impl Core, CoreInitError> {
155    // Initialize server client
156    let work_provider = opts.gateway_opts.connect().await?;
157
158    Ok(CoreSDK::new(work_provider, opts))
159}
160
161struct CoreSDK<WP> {
162    /// Options provided at initialization time
163    init_options: CoreInitOptions,
164    /// Provides work in the form of responses the server would send from polling task Qs
165    server_gateway: Arc<WP>,
166    /// Key is run id
167    workflow_machines: WorkflowConcurrencyManager,
168    // TODO: Probably move all workflow stuff inside wf manager?
169    /// Maps task tokens to workflow run ids
170    workflow_task_tokens: DashMap<Vec<u8>, String>,
171    /// Workflows (by run id) for which the last task completion we sent was a failure
172    workflows_last_task_failed: DashSet<String>,
173    /// Distinguished from `workflow_task_tokens` by the fact that when we are caching workflows
174    /// in sticky mode, we need to know if there are any outstanding workflow tasks since they
175    /// must all be handled first before we poll the server again.
176    outstanding_workflow_tasks: DashSet<Vec<u8>>,
177
178    /// Buffers workflow task polling in the event we need to return a pending activation while
179    /// a poll is ongoing
180    wf_task_poll_buffer: PollWorkflowTaskBuffer,
181
182    /// Workflows may generate new activations immediately upon completion (ex: while replaying,
183    /// or when cancelling an activity in try-cancel/abandon mode). They queue here.
184    pending_activations: PendingActivations,
185
186    activity_heartbeat_manager_handle: ActivityHeartbeatManagerHandle,
187    /// Activities that have been issued to lang but not yet completed
188    outstanding_activity_tasks: DashSet<Vec<u8>>,
189    /// Has shutdown been called?
190    shutdown_requested: AtomicBool,
191    /// Used to wake up future which checks shutdown state
192    shutdown_notify: Notify,
193    /// Used to wake blocked workflow task polling when tasks complete
194    workflow_task_complete_notify: Notify,
195    /// Used to wake blocked activity task polling when tasks complete
196    activity_task_complete_notify: Notify,
197}
198
199#[async_trait::async_trait]
200impl<WP> Core for CoreSDK<WP>
201where
202    WP: ServerGatewayApis + Send + Sync + 'static,
203{
204    #[instrument(skip(self))]
205    async fn poll_workflow_task(&self) -> Result<WfActivation, PollWfError> {
206        // The poll needs to be in a loop because we can't guarantee tail call optimization in Rust
207        // (simply) and we really, really need that for long-poll retries.
208        loop {
209            // We must first check if there are pending workflow activations for workflows that are
210            // currently replaying or otherwise need immediate jobs, and issue those before
211            // bothering the server.
212            if let Some(pa) = self.pending_activations.pop() {
213                return Ok(pa);
214            }
215
216            if self.shutdown_requested.load(Ordering::SeqCst) {
217                return Err(PollWfError::ShutDown);
218            }
219
220            // Do not proceed to poll unless we are below the outstanding WFT limit
221            if self.outstanding_workflow_tasks.len()
222                >= self.init_options.max_outstanding_workflow_tasks
223            {
224                self.workflow_task_complete_notify.notified().await;
225                continue;
226            }
227
228            let task_complete_fut = self.workflow_task_complete_notify.notified();
229            let poll_result_future = self.shutdownable_fut(
230                self.wf_task_poll_buffer
231                    .poll_workflow_task()
232                    .map_err(Into::into),
233            );
234
235            debug!("Polling server");
236
237            let selected_f = tokio::select! {
238                // If a task is completed while we are waiting on polling, we need to restart the
239                // loop right away to provide any potential new pending activation
240                _ = task_complete_fut => {
241                    continue;
242                }
243                r = poll_result_future => r
244            };
245
246            match selected_f {
247                Ok(work) => {
248                    if !work.next_page_token.is_empty() {
249                        // TODO: Support history pagination
250                        unimplemented!("History pagination not yet implemented");
251                    }
252                    if let Some(activation) = self.prepare_new_activation(work)? {
253                        self.outstanding_workflow_tasks
254                            .insert(activation.task_token.clone());
255                        return Ok(activation);
256                    }
257                }
258                // Drain pending activations in case of shutdown.
259                Err(PollWfError::ShutDown) => continue,
260                Err(e) => return Err(e),
261            }
262        }
263    }
264
265    #[instrument(skip(self))]
266    async fn poll_activity_task(&self) -> Result<ActivityTask, PollActivityError> {
267        if self.shutdown_requested.load(Ordering::SeqCst) {
268            return Err(PollActivityError::ShutDown);
269        }
270
271        while self.outstanding_activity_tasks.len() >= self.init_options.max_outstanding_activities
272        {
273            self.activity_task_complete_notify.notified().await
274        }
275
276        match self
277            .shutdownable_fut(self.server_gateway.poll_activity_task().map_err(Into::into))
278            .await
279        {
280            Ok(work) => {
281                let task_token = work.task_token.clone();
282                self.outstanding_activity_tasks.insert(task_token.clone());
283                Ok(ActivityTask::start_from_poll_resp(work, task_token))
284            }
285            Err(e) => Err(e),
286        }
287    }
288
289    #[instrument(skip(self))]
290    async fn complete_workflow_task(
291        &self,
292        completion: WfActivationCompletion,
293    ) -> Result<(), CompleteWfError> {
294        let task_token = completion.task_token;
295        let wfstatus = completion.status;
296        let run_id = self
297            .workflow_task_tokens
298            .get(&task_token)
299            .map(|x| x.value().clone())
300            .ok_or_else(|| CompleteWfError::MalformedWorkflowCompletion {
301                reason: format!(
302                    "Task token {} had no workflow run associated with it",
303                    fmt_task_token(&task_token)
304                ),
305                completion: None,
306            })?;
307        let res = match wfstatus {
308            Some(wf_activation_completion::Status::Successful(success)) => {
309                self.wf_activation_success(task_token.clone(), &run_id, success)
310                    .await
311            }
312            Some(wf_activation_completion::Status::Failed(failure)) => {
313                self.wf_activation_failed(task_token.clone(), &run_id, failure)
314                    .await
315            }
316            None => Err(CompleteWfError::MalformedWorkflowCompletion {
317                reason: "Workflow completion had empty status field".to_owned(),
318                completion: None,
319            }),
320        };
321
322        // Workflows with no more pending activations (IE: They have completed a WFT) must be
323        // removed from the outstanding tasks map
324        if !self.pending_activations.has_pending(&run_id) {
325            self.outstanding_workflow_tasks.remove(&task_token);
326
327            // Blow them up if we're in non-sticky mode as well
328            if self.init_options.evict_after_pending_cleared {
329                self.evict_run(&task_token);
330            }
331        }
332        self.workflow_task_complete_notify.notify_one();
333        res
334    }
335
336    #[instrument(skip(self))]
337    async fn complete_activity_task(
338        &self,
339        completion: ActivityTaskCompletion,
340    ) -> Result<(), CompleteActivityError> {
341        let task_token = completion.task_token;
342        let status = if let Some(s) = completion.result.and_then(|r| r.status) {
343            s
344        } else {
345            return Err(CompleteActivityError::MalformedActivityCompletion {
346                reason: "Activity completion had empty result/status field".to_owned(),
347                completion: None,
348            });
349        };
350        let tt = task_token.clone();
351        match status {
352            activity_result::Status::Completed(ar::Success { result }) => {
353                self.server_gateway
354                    .complete_activity_task(task_token, result.map(Into::into))
355                    .await?;
356            }
357            activity_result::Status::Failed(ar::Failure { failure }) => {
358                self.server_gateway
359                    .fail_activity_task(task_token, failure.map(Into::into))
360                    .await?;
361            }
362            activity_result::Status::Canceled(ar::Cancelation { details }) => {
363                self.server_gateway
364                    .cancel_activity_task(task_token, details.map(Into::into))
365                    .await?;
366            }
367        }
368        self.outstanding_activity_tasks.remove(&tt);
369        self.activity_task_complete_notify.notify_waiters();
370        Ok(())
371    }
372
373    async fn record_activity_heartbeat(
374        &self,
375        details: ActivityHeartbeat,
376    ) -> Result<(), ActivityHeartbeatError> {
377        self.activity_heartbeat_manager_handle.record(details)
378    }
379
380    fn server_gateway(&self) -> Arc<dyn ServerGatewayApis> {
381        self.server_gateway.clone()
382    }
383
384    async fn shutdown(&self) {
385        self.shutdown_requested.store(true, Ordering::SeqCst);
386        self.shutdown_notify.notify_one();
387        self.workflow_machines.shutdown();
388        self.activity_heartbeat_manager_handle.shutdown().await;
389    }
390}
391
392impl<WP: ServerGatewayApis + Send + Sync + 'static> CoreSDK<WP> {
393    pub(crate) fn new(wp: WP, init_options: CoreInitOptions) -> Self {
394        let sg = Arc::new(wp);
395        Self {
396            init_options,
397            server_gateway: sg.clone(),
398            workflow_machines: WorkflowConcurrencyManager::new(),
399            workflow_task_tokens: Default::default(),
400            workflows_last_task_failed: Default::default(),
401            outstanding_workflow_tasks: Default::default(),
402            wf_task_poll_buffer: PollWorkflowTaskBuffer::new(sg.clone()),
403            pending_activations: Default::default(),
404            outstanding_activity_tasks: Default::default(),
405            shutdown_requested: AtomicBool::new(false),
406            shutdown_notify: Notify::new(),
407            workflow_task_complete_notify: Notify::new(),
408            activity_task_complete_notify: Notify::new(),
409            activity_heartbeat_manager_handle: ActivityHeartbeatManager::new(sg),
410        }
411    }
412
413    /// Evict a workflow from the cache by it's run id
414    ///
415    /// TODO: Very likely needs to be in Core public api
416    pub(crate) fn evict_run(&self, task_token: &[u8]) {
417        if let Some((_, run_id)) = self.workflow_task_tokens.remove(task_token) {
418            self.outstanding_workflow_tasks.remove(task_token);
419            self.workflow_machines.evict(&run_id);
420            self.pending_activations.remove_all_with_run_id(&run_id);
421            // Queue up an eviction activation
422            self.pending_activations
423                .push(create_evict_activation(task_token.to_owned(), run_id))
424        }
425    }
426
427    /// Prepare an activation we've just pulled out of a workflow machines instance to be shipped
428    /// to the lang sdk
429    fn finalize_next_activation(
430        &self,
431        next_a: NextWfActivation,
432        task_token: Vec<u8>,
433    ) -> WfActivation {
434        next_a.finalize(task_token)
435    }
436
437    /// Given a wf task from the server, prepare an activation (if there is one) to be sent to lang
438    fn prepare_new_activation(
439        &self,
440        work: PollWorkflowTaskQueueResponse,
441    ) -> Result<Option<WfActivation>, PollWfError> {
442        if work == PollWorkflowTaskQueueResponse::default() {
443            // We get the default proto in the event that the long poll times out.
444            return Ok(None);
445        }
446        let task_token = work.task_token.clone();
447        debug!(
448            task_token = %fmt_task_token(&task_token),
449            "Received workflow task from server"
450        );
451
452        let next_activation = self.instantiate_or_update_workflow(work)?;
453
454        if let Some(na) = next_activation {
455            return Ok(Some(self.finalize_next_activation(na, task_token)));
456        }
457        Ok(None)
458    }
459
460    /// Handle a successful workflow completion
461    async fn wf_activation_success(
462        &self,
463        task_token: Vec<u8>,
464        run_id: &str,
465        success: workflow_completion::Success,
466    ) -> Result<(), CompleteWfError> {
467        // Convert to wf commands
468        let cmds = success
469            .commands
470            .into_iter()
471            .map(|c| c.try_into())
472            .collect::<Result<Vec<_>, EmptyWorkflowCommandErr>>()
473            .map_err(|_| CompleteWfError::MalformedWorkflowCompletion {
474                reason: "At least one workflow command in the completion \
475                                contained an empty variant"
476                    .to_owned(),
477                completion: None,
478            })?;
479        let push_result = self.push_lang_commands(run_id, cmds)?;
480        self.enqueue_next_activation_if_needed(run_id, task_token.clone())?;
481        // We only actually want to send commands back to the server if there are
482        // no more pending activations -- in other words the lang SDK has caught
483        // up on replay.
484        if !self.pending_activations.has_pending(run_id) {
485            // Since we're telling the server about a wft success, we can remove it from the
486            // last failed map (if it was present)
487            self.workflows_last_task_failed.remove(run_id);
488            self.server_gateway
489                .complete_workflow_task(task_token, push_result.server_commands)
490                .await
491                .map_err(|ts| {
492                    if ts.code() == tonic::Code::InvalidArgument
493                        && ts.message() == "UnhandledCommand"
494                    {
495                        CompleteWfError::UnhandledCommandWhenCompleting
496                    } else {
497                        ts.into()
498                    }
499                })?;
500        }
501        Ok(())
502    }
503
504    /// Handle a failed workflow completion
505    async fn wf_activation_failed(
506        &self,
507        task_token: Vec<u8>,
508        run_id: &str,
509        failure: workflow_completion::Failure,
510    ) -> Result<(), CompleteWfError> {
511        // Blow up any cached data associated with the workflow
512        self.evict_run(&task_token);
513
514        if !self.workflows_last_task_failed.contains(run_id) {
515            self.server_gateway
516                .fail_workflow_task(
517                    task_token,
518                    WorkflowTaskFailedCause::Unspecified,
519                    failure.failure.map(Into::into),
520                )
521                .await?;
522            self.workflows_last_task_failed.insert(run_id.to_owned());
523        }
524
525        Ok(())
526    }
527
528    /// Will create a new workflow manager if needed for the workflow activation, if not, it will
529    /// feed the existing manager the updated history we received from the server.
530    ///
531    /// Also updates [CoreSDK::workflow_task_tokens] and validates the
532    /// [PollWorkflowTaskQueueResponse]
533    ///
534    /// Returns the next workflow activation and the workflow's run id
535    fn instantiate_or_update_workflow(
536        &self,
537        poll_wf_resp: PollWorkflowTaskQueueResponse,
538    ) -> Result<Option<NextWfActivation>, PollWfError> {
539        match poll_wf_resp {
540            PollWorkflowTaskQueueResponse {
541                task_token,
542                workflow_execution: Some(workflow_execution),
543                history: Some(history),
544                ..
545            } => {
546                let run_id = workflow_execution.run_id.clone();
547                // Correlate task token w/ run ID
548                self.workflow_task_tokens.insert(task_token, run_id.clone());
549
550                match self
551                    .workflow_machines
552                    .create_or_update(&run_id, history, workflow_execution)
553                {
554                    Ok(activation) => Ok(activation),
555                    Err(source) => Err(PollWfError::WorkflowUpdateError { source, run_id }),
556                }
557            }
558            p => Err(PollWfError::BadPollResponseFromServer(p)),
559        }
560    }
561
562    /// Feed commands from the lang sdk into appropriate workflow manager which will iterate
563    /// the state machines and return commands ready to be sent to the server
564    fn push_lang_commands(
565        &self,
566        run_id: &str,
567        cmds: Vec<WFCommand>,
568    ) -> Result<PushCommandsResult, WorkflowUpdateError> {
569        self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
570    }
571
572    /// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing
573    /// span to the wf machines thread.
574    fn access_wf_machine<F, Fout>(
575        &self,
576        run_id: &str,
577        mutator: F,
578    ) -> Result<Fout, WorkflowUpdateError>
579    where
580        F: FnOnce(&mut WorkflowManager) -> Result<Fout, WorkflowError> + Send + 'static,
581        Fout: Send + Debug + 'static,
582    {
583        let curspan = Span::current();
584        let mutator = move |wfm: &mut WorkflowManager| {
585            let _e = curspan.enter();
586            mutator(wfm)
587        };
588        self.workflow_machines
589            .access(run_id, mutator)
590            .map_err(|source| WorkflowUpdateError {
591                source,
592                run_id: run_id.to_owned(),
593            })
594    }
595
596    /// Wrap a future, making it return early with a shutdown error in the event the shutdown
597    /// flag has been set
598    async fn shutdownable_fut<FOut, FErr>(
599        &self,
600        wrap_this: impl Future<Output = Result<FOut, FErr>>,
601    ) -> Result<FOut, FErr>
602    where
603        FErr: From<ShutdownErr>,
604    {
605        let shutdownfut = async {
606            loop {
607                self.shutdown_notify.notified().await;
608                if self.shutdown_requested.load(Ordering::SeqCst) {
609                    break;
610                }
611            }
612        };
613        tokio::select! {
614            _ = shutdownfut => {
615                Err(ShutdownErr.into())
616            }
617            r = wrap_this => r
618        }
619    }
620
621    /// Check if the machine needs another activation and queue it up if there is one
622    fn enqueue_next_activation_if_needed(
623        &self,
624        run_id: &str,
625        task_token: Vec<u8>,
626    ) -> Result<(), CompleteWfError> {
627        if let Some(next_activation) =
628            self.access_wf_machine(run_id, move |mgr| mgr.get_next_activation())?
629        {
630            self.pending_activations
631                .push(self.finalize_next_activation(next_activation, task_token));
632        }
633        self.workflow_task_complete_notify.notify_one();
634        Ok(())
635    }
636}
637
638#[cfg(test)]
639mod test {
640    use super::*;
641    use crate::machines::test_help::fake_sg_opts;
642    use crate::protos::temporal::api::workflowservice::v1::{
643        PollActivityTaskQueueResponse, RespondActivityTaskCompletedResponse,
644    };
645    use crate::{
646        machines::test_help::{
647            build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply,
648            EvictionMode, FakeCore, TestHistoryBuilder,
649        },
650        machines::test_help::{build_mock_sg, fake_core_from_mock_sg, hist_to_poll_resp},
651        pollers::MockServerGatewayApis,
652        protos::{
653            coresdk::{
654                activity_result::ActivityResult,
655                common::UserCodeFailure,
656                workflow_activation::{
657                    wf_activation_job, FireTimer, ResolveActivity, StartWorkflow, UpdateRandomSeed,
658                    WfActivationJob,
659                },
660                workflow_commands::{
661                    ActivityCancellationType, CancelTimer, CompleteWorkflowExecution,
662                    FailWorkflowExecution, RequestCancelActivity, ScheduleActivity, StartTimer,
663                },
664            },
665            temporal::api::{
666                enums::v1::EventType,
667                workflowservice::v1::{
668                    RespondWorkflowTaskCompletedResponse, RespondWorkflowTaskFailedResponse,
669                },
670            },
671        },
672        test_help::canned_histories,
673    };
674    use rstest::{fixture, rstest};
675    use std::{collections::VecDeque, sync::atomic::AtomicU64, sync::atomic::AtomicUsize};
676
677    #[fixture(hist_batches = &[])]
678    fn single_timer_setup(hist_batches: &[usize]) -> FakeCore {
679        let wfid = "fake_wf_id";
680
681        let mut t = canned_histories::single_timer("fake_timer");
682        build_fake_core(wfid, &mut t, hist_batches)
683    }
684
685    #[fixture(hist_batches = &[])]
686    fn single_activity_setup(hist_batches: &[usize]) -> FakeCore {
687        let wfid = "fake_wf_id";
688
689        let mut t = canned_histories::single_activity("fake_activity");
690        build_fake_core(wfid, &mut t, hist_batches)
691    }
692
693    #[fixture(hist_batches = &[])]
694    fn single_activity_failure_setup(hist_batches: &[usize]) -> FakeCore {
695        let wfid = "fake_wf_id";
696
697        let mut t = canned_histories::single_failed_activity("fake_activity");
698        build_fake_core(wfid, &mut t, hist_batches)
699    }
700
701    #[rstest]
702    #[case::incremental(single_timer_setup(&[1, 2]), EvictionMode::NotSticky)]
703    #[case::replay(single_timer_setup(&[2]), EvictionMode::NotSticky)]
704    #[case::incremental_evict(single_timer_setup(&[1, 2]), EvictionMode::AfterEveryReply)]
705    #[case::replay_evict(single_timer_setup(&[2, 2]), EvictionMode::AfterEveryReply)]
706    #[tokio::test]
707    async fn single_timer_test_across_wf_bridge(
708        #[case] core: FakeCore,
709        #[case] evict: EvictionMode,
710    ) {
711        poll_and_reply(
712            &core,
713            evict,
714            &[
715                gen_assert_and_reply(
716                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
717                    vec![StartTimer {
718                        timer_id: "fake_timer".to_string(),
719                        ..Default::default()
720                    }
721                    .into()],
722                ),
723                gen_assert_and_reply(
724                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
725                    vec![CompleteWorkflowExecution { result: None }.into()],
726                ),
727            ],
728        )
729        .await;
730    }
731
732    #[rstest(core,
733        case::incremental(single_activity_setup(&[1, 2])),
734        case::incremental_activity_failure(single_activity_failure_setup(&[1, 2])),
735        case::replay(single_activity_setup(&[2])),
736        case::replay_activity_failure(single_activity_failure_setup(&[2]))
737    )]
738    #[tokio::test]
739    async fn single_activity_completion(core: FakeCore) {
740        poll_and_reply(
741            &core,
742            EvictionMode::NotSticky,
743            &[
744                gen_assert_and_reply(
745                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
746                    vec![ScheduleActivity {
747                        activity_id: "fake_activity".to_string(),
748                        ..Default::default()
749                    }
750                    .into()],
751                ),
752                gen_assert_and_reply(
753                    &job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
754                    vec![CompleteWorkflowExecution { result: None }.into()],
755                ),
756            ],
757        )
758        .await;
759    }
760
761    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
762    #[tokio::test]
763    async fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) {
764        let wfid = "fake_wf_id";
765        let timer_1_id = "timer1";
766        let timer_2_id = "timer2";
767
768        let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id);
769        let core = build_fake_core(wfid, &mut t, hist_batches);
770
771        poll_and_reply(
772            &core,
773            EvictionMode::NotSticky,
774            &[
775                gen_assert_and_reply(
776                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
777                    vec![
778                        StartTimer {
779                            timer_id: timer_1_id.to_string(),
780                            ..Default::default()
781                        }
782                        .into(),
783                        StartTimer {
784                            timer_id: timer_2_id.to_string(),
785                            ..Default::default()
786                        }
787                        .into(),
788                    ],
789                ),
790                gen_assert_and_reply(
791                    &|res| {
792                        assert_matches!(
793                            res.jobs.as_slice(),
794                            [
795                                WfActivationJob {
796                                    variant: Some(wf_activation_job::Variant::FireTimer(
797                                        FireTimer { timer_id: t1_id }
798                                    )),
799                                },
800                                WfActivationJob {
801                                    variant: Some(wf_activation_job::Variant::FireTimer(
802                                        FireTimer { timer_id: t2_id }
803                                    )),
804                                }
805                            ] => {
806                                assert_eq!(t1_id, &timer_1_id);
807                                assert_eq!(t2_id, &timer_2_id);
808                            }
809                        );
810                    },
811                    vec![CompleteWorkflowExecution { result: None }.into()],
812                ),
813            ],
814        )
815        .await;
816    }
817
818    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
819    #[tokio::test]
820    async fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) {
821        let wfid = "fake_wf_id";
822        let timer_id = "wait_timer";
823        let cancel_timer_id = "cancel_timer";
824
825        let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id);
826        let core = build_fake_core(wfid, &mut t, hist_batches);
827
828        poll_and_reply(
829            &core,
830            EvictionMode::NotSticky,
831            &[
832                gen_assert_and_reply(
833                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
834                    vec![
835                        StartTimer {
836                            timer_id: cancel_timer_id.to_string(),
837                            ..Default::default()
838                        }
839                        .into(),
840                        StartTimer {
841                            timer_id: timer_id.to_string(),
842                            ..Default::default()
843                        }
844                        .into(),
845                    ],
846                ),
847                gen_assert_and_reply(
848                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
849                    vec![
850                        CancelTimer {
851                            timer_id: cancel_timer_id.to_string(),
852                        }
853                        .into(),
854                        CompleteWorkflowExecution { result: None }.into(),
855                    ],
856                ),
857            ],
858        )
859        .await;
860    }
861
862    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
863    #[tokio::test]
864    async fn scheduled_activity_cancellation_try_cancel(hist_batches: &[usize]) {
865        let wfid = "fake_wf_id";
866        let activity_id = "fake_activity";
867        let signal_id = "signal";
868
869        let mut t = canned_histories::cancel_scheduled_activity(activity_id, signal_id);
870        let core = build_fake_core(wfid, &mut t, hist_batches);
871
872        poll_and_reply(
873            &core,
874            EvictionMode::NotSticky,
875            &[
876                gen_assert_and_reply(
877                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
878                    vec![ScheduleActivity {
879                        activity_id: activity_id.to_string(),
880                        cancellation_type: ActivityCancellationType::TryCancel as i32,
881                        ..Default::default()
882                    }
883                    .into()],
884                ),
885                gen_assert_and_reply(
886                    &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
887                    vec![RequestCancelActivity {
888                        activity_id: activity_id.to_string(),
889                        ..Default::default()
890                    }
891                    .into()],
892                ),
893                // Activity is getting resolved right away as we are in the TryCancel mode.
894                gen_assert_and_reply(
895                    &job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
896                    vec![CompleteWorkflowExecution { result: None }.into()],
897                ),
898            ],
899        )
900        .await;
901    }
902
903    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
904    #[tokio::test]
905    async fn scheduled_activity_timeout(hist_batches: &[usize]) {
906        let wfid = "fake_wf_id";
907        let activity_id = "fake_activity";
908
909        let mut t = canned_histories::scheduled_activity_timeout(activity_id);
910        let core = build_fake_core(wfid, &mut t, hist_batches);
911        poll_and_reply(
912            &core,
913            EvictionMode::NotSticky,
914            &[
915                gen_assert_and_reply(
916                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
917                    vec![ScheduleActivity {
918                        activity_id: activity_id.to_string(),
919                        ..Default::default()
920                    }
921                    .into()],
922                ),
923                // Activity is getting resolved right away as it has been timed out.
924                gen_assert_and_reply(
925                    &|res| {
926                        assert_matches!(
927                                res.jobs.as_slice(),
928                                [
929                                    WfActivationJob {
930                                        variant: Some(wf_activation_job::Variant::ResolveActivity(
931                                            ResolveActivity {
932                                                activity_id: aid,
933                                                result: Some(ActivityResult {
934                                                    status: Some(activity_result::Status::Failed(ar::Failure {
935                                                        failure: Some(failure)
936                                                    })),
937                                                })
938                                            }
939                                        )),
940                                    }
941                                ] => {
942                                    assert_eq!(failure.message, "Activity task timed out".to_string());
943                                    assert_eq!(aid, &activity_id.to_string());
944                                }
945                            );
946                    },
947                    vec![CompleteWorkflowExecution { result: None }.into()],
948                ),
949            ],
950        )
951        .await;
952    }
953
954    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
955    #[tokio::test]
956    async fn started_activity_timeout(hist_batches: &[usize]) {
957        let wfid = "fake_wf_id";
958        let activity_id = "fake_activity";
959
960        let mut t = canned_histories::started_activity_timeout(activity_id);
961        let core = build_fake_core(wfid, &mut t, hist_batches);
962
963        poll_and_reply(
964            &core,
965            EvictionMode::NotSticky,
966            &[
967                gen_assert_and_reply(
968                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
969                    vec![ScheduleActivity {
970                        activity_id: activity_id.to_string(),
971                        ..Default::default()
972                    }
973                    .into()],
974                ),
975                // Activity is getting resolved right away as it has been timed out.
976                gen_assert_and_reply(
977                    &|res| {
978                        assert_matches!(
979                                res.jobs.as_slice(),
980                                [
981                                    WfActivationJob {
982                                        variant: Some(wf_activation_job::Variant::ResolveActivity(
983                                            ResolveActivity {
984                                                activity_id: aid,
985                                                result: Some(ActivityResult {
986                                                    status: Some(activity_result::Status::Failed(ar::Failure {
987                                                        failure: Some(failure)
988                                                    })),
989                                                })
990                                            }
991                                        )),
992                                    }
993                                ] => {
994                                    assert_eq!(failure.message, "Activity task timed out".to_string());
995                                    assert_eq!(aid, &activity_id.to_string());
996                                }
997                            );
998                    },
999                    vec![CompleteWorkflowExecution { result: None }.into()],
1000                ),
1001            ],
1002        )
1003        .await;
1004    }
1005
1006    #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))]
1007    #[tokio::test]
1008    async fn cancelled_activity_timeout(hist_batches: &[usize]) {
1009        let wfid = "fake_wf_id";
1010        let activity_id = "fake_activity";
1011        let signal_id = "signal";
1012
1013        let mut t = canned_histories::scheduled_cancelled_activity_timeout(activity_id, signal_id);
1014        let core = build_fake_core(wfid, &mut t, hist_batches);
1015
1016        poll_and_reply(
1017            &core,
1018            EvictionMode::NotSticky,
1019            &[
1020                gen_assert_and_reply(
1021                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1022                    vec![ScheduleActivity {
1023                        activity_id: activity_id.to_string(),
1024                        ..Default::default()
1025                    }
1026                    .into()],
1027                ),
1028                gen_assert_and_reply(
1029                    &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1030                    vec![RequestCancelActivity {
1031                        activity_id: activity_id.to_string(),
1032                        ..Default::default()
1033                    }
1034                    .into()],
1035                ),
1036                // Activity is getting resolved right away as it has been timed out.
1037                gen_assert_and_reply(
1038                    &job_assert!(wf_activation_job::Variant::ResolveActivity(
1039                        ResolveActivity {
1040                            activity_id: _,
1041                            result: Some(ActivityResult {
1042                                status: Some(activity_result::Status::Canceled(..)),
1043                            })
1044                        }
1045                    )),
1046                    vec![CompleteWorkflowExecution { result: None }.into()],
1047                ),
1048            ],
1049        )
1050        .await;
1051    }
1052
1053    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1054    #[tokio::test]
1055    async fn scheduled_activity_cancellation_abandon(hist_batches: &[usize]) {
1056        let wfid = "fake_wf_id";
1057        let activity_id = "fake_activity";
1058        let signal_id = "signal";
1059
1060        let mut t = canned_histories::cancel_scheduled_activity_abandon(activity_id, signal_id);
1061        let core = build_fake_core(wfid, &mut t, hist_batches);
1062
1063        verify_activity_cancellation_abandon(&activity_id, &core).await;
1064    }
1065
1066    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1067    #[tokio::test]
1068    async fn started_activity_cancellation_abandon(hist_batches: &[usize]) {
1069        let wfid = "fake_wf_id";
1070        let activity_id = "fake_activity";
1071        let signal_id = "signal";
1072
1073        let mut t = canned_histories::cancel_started_activity_abandon(activity_id, signal_id);
1074        let core = build_fake_core(wfid, &mut t, hist_batches);
1075
1076        verify_activity_cancellation_abandon(&activity_id, &core).await;
1077    }
1078
1079    async fn verify_activity_cancellation_abandon(activity_id: &&str, core: &FakeCore) {
1080        poll_and_reply(
1081            &core,
1082            EvictionMode::NotSticky,
1083            &[
1084                gen_assert_and_reply(
1085                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1086                    vec![ScheduleActivity {
1087                        activity_id: activity_id.to_string(),
1088                        cancellation_type: ActivityCancellationType::Abandon as i32,
1089                        ..Default::default()
1090                    }
1091                    .into()],
1092                ),
1093                gen_assert_and_reply(
1094                    &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1095                    vec![RequestCancelActivity {
1096                        activity_id: activity_id.to_string(),
1097                        ..Default::default()
1098                    }
1099                    .into()],
1100                ),
1101                // Activity is getting resolved right away as we are in the Abandon mode.
1102                gen_assert_and_reply(
1103                    &job_assert!(wf_activation_job::Variant::ResolveActivity(
1104                        ResolveActivity {
1105                            activity_id: _,
1106                            result: Some(ActivityResult {
1107                                status: Some(activity_result::Status::Canceled(..)),
1108                            })
1109                        }
1110                    )),
1111                    vec![CompleteWorkflowExecution { result: None }.into()],
1112                ),
1113            ],
1114        )
1115        .await;
1116    }
1117
1118    #[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))]
1119    #[tokio::test]
1120    async fn scheduled_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) {
1121        let wfid = "fake_wf_id";
1122        let activity_id = "fake_activity";
1123        let signal_id = "signal";
1124
1125        let mut t =
1126            canned_histories::cancel_scheduled_activity_with_signal_and_activity_task_cancel(
1127                activity_id,
1128                signal_id,
1129            );
1130        let core = build_fake_core(wfid, &mut t, hist_batches);
1131
1132        verify_activity_cancellation_wait_for_cancellation(activity_id, &core).await;
1133    }
1134
1135    #[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))]
1136    #[tokio::test]
1137    async fn started_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) {
1138        let wfid = "fake_wf_id";
1139        let activity_id = "fake_activity";
1140        let signal_id = "signal";
1141
1142        let mut t = canned_histories::cancel_started_activity_with_signal_and_activity_task_cancel(
1143            activity_id,
1144            signal_id,
1145        );
1146        let core = build_fake_core(wfid, &mut t, hist_batches);
1147
1148        verify_activity_cancellation_wait_for_cancellation(activity_id, &core).await;
1149    }
1150
1151    async fn verify_activity_cancellation_wait_for_cancellation(
1152        activity_id: &str,
1153        core: &FakeCore,
1154    ) {
1155        poll_and_reply(
1156            &core,
1157            EvictionMode::NotSticky,
1158            &[
1159                gen_assert_and_reply(
1160                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1161                    vec![ScheduleActivity {
1162                        activity_id: activity_id.to_string(),
1163                        cancellation_type: ActivityCancellationType::WaitCancellationCompleted
1164                            as i32,
1165                        ..Default::default()
1166                    }
1167                    .into()],
1168                ),
1169                gen_assert_and_reply(
1170                    &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1171                    vec![RequestCancelActivity {
1172                        activity_id: activity_id.to_string(),
1173                        ..Default::default()
1174                    }
1175                    .into()],
1176                ),
1177                // Making sure that activity is not resolved until it's cancelled.
1178                gen_assert_and_reply(
1179                    &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1180                    vec![],
1181                ),
1182                // Now ActivityTaskCanceled has been processed and activity can be resolved.
1183                gen_assert_and_reply(
1184                    &job_assert!(wf_activation_job::Variant::ResolveActivity(
1185                        ResolveActivity {
1186                            activity_id: _,
1187                            result: Some(ActivityResult {
1188                                status: Some(activity_result::Status::Canceled(..)),
1189                            })
1190                        }
1191                    )),
1192                    vec![CompleteWorkflowExecution { result: None }.into()],
1193                ),
1194            ],
1195        )
1196        .await;
1197    }
1198
1199    #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))]
1200    #[tokio::test]
1201    async fn scheduled_activity_cancellation_try_cancel_task_canceled(hist_batches: &[usize]) {
1202        let wfid = "fake_wf_id";
1203        let activity_id = "fake_activity";
1204        let signal_id = "signal";
1205
1206        let mut t = canned_histories::cancel_scheduled_activity_with_activity_task_cancel(
1207            activity_id,
1208            signal_id,
1209        );
1210        let core = build_fake_core(wfid, &mut t, hist_batches);
1211
1212        verify_activity_cancellation_try_cancel_task_canceled(&activity_id, &core).await;
1213    }
1214
1215    #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))]
1216    #[tokio::test]
1217    async fn started_activity_cancellation_try_cancel_task_canceled(hist_batches: &[usize]) {
1218        let wfid = "fake_wf_id";
1219        let activity_id = "fake_activity";
1220        let signal_id = "signal";
1221
1222        let mut t = canned_histories::cancel_started_activity_with_activity_task_cancel(
1223            activity_id,
1224            signal_id,
1225        );
1226        let core = build_fake_core(wfid, &mut t, hist_batches);
1227
1228        verify_activity_cancellation_try_cancel_task_canceled(&activity_id, &core).await;
1229    }
1230
1231    async fn verify_activity_cancellation_try_cancel_task_canceled(
1232        activity_id: &&str,
1233        core: &FakeCore,
1234    ) {
1235        poll_and_reply(
1236            &core,
1237            EvictionMode::NotSticky,
1238            &[
1239                gen_assert_and_reply(
1240                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1241                    vec![ScheduleActivity {
1242                        activity_id: activity_id.to_string(),
1243                        cancellation_type: ActivityCancellationType::TryCancel as i32,
1244                        ..Default::default()
1245                    }
1246                    .into()],
1247                ),
1248                gen_assert_and_reply(
1249                    &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
1250                    vec![RequestCancelActivity {
1251                        activity_id: activity_id.to_string(),
1252                        ..Default::default()
1253                    }
1254                    .into()],
1255                ),
1256                // Making sure that activity is not resolved until it's cancelled.
1257                gen_assert_and_reply(
1258                    &job_assert!(wf_activation_job::Variant::ResolveActivity(
1259                        ResolveActivity {
1260                            activity_id: _,
1261                            result: Some(ActivityResult {
1262                                status: Some(activity_result::Status::Canceled(..)),
1263                            })
1264                        }
1265                    )),
1266                    vec![CompleteWorkflowExecution { result: None }.into()],
1267                ),
1268            ],
1269        )
1270        .await;
1271    }
1272
1273    #[rstest(single_timer_setup(&[1]))]
1274    #[tokio::test]
1275    async fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) {
1276        let res = single_timer_setup.inner.poll_workflow_task().await.unwrap();
1277        assert_eq!(res.jobs.len(), 1);
1278
1279        single_timer_setup.inner.shutdown().await;
1280        assert_matches!(
1281            single_timer_setup
1282                .inner
1283                .poll_workflow_task()
1284                .await
1285                .unwrap_err(),
1286            PollWfError::ShutDown
1287        );
1288    }
1289
1290    #[tokio::test]
1291    async fn workflow_update_random_seed_on_workflow_reset() {
1292        let wfid = "fake_wf_id";
1293        let new_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
1294        let timer_1_id = "timer1";
1295        let randomness_seed_from_start = AtomicU64::new(0);
1296
1297        let mut t = canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, new_run_id);
1298        let core = build_fake_core(wfid, &mut t, &[2]);
1299
1300        poll_and_reply(
1301            &core,
1302            EvictionMode::NotSticky,
1303            &[
1304                gen_assert_and_reply(
1305                    &|res| {
1306                        assert_matches!(
1307                            res.jobs.as_slice(),
1308                            [WfActivationJob {
1309                                variant: Some(wf_activation_job::Variant::StartWorkflow(
1310                                StartWorkflow{randomness_seed, ..}
1311                                )),
1312                            }] => {
1313                            randomness_seed_from_start.store(*randomness_seed, Ordering::SeqCst);
1314                            }
1315                        );
1316                    },
1317                    vec![StartTimer {
1318                        timer_id: timer_1_id.to_string(),
1319                        ..Default::default()
1320                    }
1321                    .into()],
1322                ),
1323                gen_assert_and_reply(
1324                    &|res| {
1325                        assert_matches!(
1326                            res.jobs.as_slice(),
1327                            [WfActivationJob {
1328                                variant: Some(wf_activation_job::Variant::FireTimer(_),),
1329                            },
1330                            WfActivationJob {
1331                                variant: Some(wf_activation_job::Variant::UpdateRandomSeed(
1332                                    UpdateRandomSeed{randomness_seed})),
1333                            }] => {
1334                                assert_ne!(randomness_seed_from_start.load(Ordering::SeqCst),
1335                                          *randomness_seed)
1336                            }
1337                        )
1338                    },
1339                    vec![CompleteWorkflowExecution { result: None }.into()],
1340                ),
1341            ],
1342        )
1343        .await;
1344    }
1345
1346    #[tokio::test]
1347    async fn cancel_timer_before_sent_wf_bridge() {
1348        let wfid = "fake_wf_id";
1349        let cancel_timer_id = "cancel_timer";
1350
1351        let mut t = TestHistoryBuilder::default();
1352        t.add_by_type(EventType::WorkflowExecutionStarted);
1353        t.add_full_wf_task();
1354        t.add_workflow_execution_completed();
1355
1356        let core = build_fake_core(wfid, &mut t, &[1]);
1357
1358        poll_and_reply(
1359            &core,
1360            EvictionMode::NotSticky,
1361            &[gen_assert_and_reply(
1362                &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1363                vec![
1364                    StartTimer {
1365                        timer_id: cancel_timer_id.to_string(),
1366                        ..Default::default()
1367                    }
1368                    .into(),
1369                    CancelTimer {
1370                        timer_id: cancel_timer_id.to_string(),
1371                    }
1372                    .into(),
1373                    CompleteWorkflowExecution { result: None }.into(),
1374                ],
1375            )],
1376        )
1377        .await;
1378    }
1379
1380    #[rstest]
1381    #[case::no_evict_inc(&[1, 2, 2], EvictionMode::NotSticky)]
1382    #[case::no_evict(&[2, 2], EvictionMode::NotSticky)]
1383    #[case::evict(&[1, 2, 2, 2], EvictionMode::AfterEveryReply)]
1384    #[tokio::test]
1385    async fn complete_activation_with_failure(
1386        #[case] batches: &[usize],
1387        #[case] evict: EvictionMode,
1388    ) {
1389        let wfid = "fake_wf_id";
1390        let timer_id = "timer";
1391
1392        let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id);
1393        let mut mock_sg = build_mock_sg(wfid, &mut t, batches);
1394        // Need to create an expectation that we will call a failure completion
1395        mock_sg
1396            .expect_fail_workflow_task()
1397            .times(1)
1398            .returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {}));
1399        let core = fake_core_from_mock_sg(mock_sg, batches);
1400
1401        poll_and_reply(
1402            &core,
1403            evict,
1404            &[
1405                gen_assert_and_reply(
1406                    &|_| {},
1407                    vec![StartTimer {
1408                        timer_id: timer_id.to_owned(),
1409                        ..Default::default()
1410                    }
1411                    .into()],
1412                ),
1413                gen_assert_and_fail(&|_| {}),
1414                gen_assert_and_reply(
1415                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
1416                    vec![CompleteWorkflowExecution { result: None }.into()],
1417                ),
1418            ],
1419        )
1420        .await;
1421    }
1422
1423    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1424    #[tokio::test]
1425    async fn simple_timer_fail_wf_execution(hist_batches: &[usize]) {
1426        let wfid = "fake_wf_id";
1427        let timer_id = "timer1";
1428
1429        let mut t = canned_histories::single_timer(timer_id);
1430        let core = build_fake_core(wfid, &mut t, hist_batches);
1431
1432        poll_and_reply(
1433            &core,
1434            EvictionMode::NotSticky,
1435            &[
1436                gen_assert_and_reply(
1437                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1438                    vec![StartTimer {
1439                        timer_id: timer_id.to_string(),
1440                        ..Default::default()
1441                    }
1442                    .into()],
1443                ),
1444                gen_assert_and_reply(
1445                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
1446                    vec![FailWorkflowExecution {
1447                        failure: Some(UserCodeFailure {
1448                            message: "I'm ded".to_string(),
1449                            ..Default::default()
1450                        }),
1451                    }
1452                    .into()],
1453                ),
1454            ],
1455        )
1456        .await;
1457    }
1458
1459    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
1460    #[tokio::test]
1461    async fn two_signals(hist_batches: &[usize]) {
1462        let wfid = "fake_wf_id";
1463
1464        let mut t = canned_histories::two_signals("sig1", "sig2");
1465        let core = build_fake_core(wfid, &mut t, hist_batches);
1466
1467        poll_and_reply(
1468            &core,
1469            EvictionMode::NotSticky,
1470            &[
1471                gen_assert_and_reply(
1472                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
1473                    // Task is completed with no commands
1474                    vec![],
1475                ),
1476                gen_assert_and_reply(
1477                    &job_assert!(
1478                        wf_activation_job::Variant::SignalWorkflow(_),
1479                        wf_activation_job::Variant::SignalWorkflow(_)
1480                    ),
1481                    vec![],
1482                ),
1483            ],
1484        )
1485        .await;
1486    }
1487
    /// Repeated failures of the same workflow task must be reported to the server only once.
    /// After a successful reply, a new failure is reported again. Two successes followed by
    /// failures therefore yield exactly two failure reports.
    #[tokio::test]
    async fn workflow_failures_only_reported_once() {
        let wfid = "fake_wf_id";
        let timer_1 = "timer1";
        let timer_2 = "timer2";

        let mut t =
            canned_histories::workflow_fails_with_failure_two_different_points(timer_1, timer_2);
        // Eight history batches for the eight asserter steps passed to `poll_and_reply` below;
        // presumably one batch is served per workflow-task poll -- keep the two in sync.
        let batches = &[
            1, 2, // Start then first good reply
            2, 2, 2, // Poll for every failure
            // Poll again after evicting after second good reply, then two more fails
            3, 3, 3,
        ];
        let mut mock_sg = build_mock_sg(wfid, &mut t, batches);
        mock_sg
            .expect_fail_workflow_task()
            // We should only call the server to say we failed twice (once after each success)
            .times(2)
            .returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {}));
        let core = fake_core_from_mock_sg(mock_sg, batches);

        poll_and_reply(
            &core,
            EvictionMode::NotSticky,
            &[
                // First success: start the first timer.
                gen_assert_and_reply(
                    &|_| {},
                    vec![StartTimer {
                        timer_id: timer_1.to_owned(),
                        ..Default::default()
                    }
                    .into()],
                ),
                // Fail a few times in a row (only one of which should be reported)
                gen_assert_and_fail(&|_| {}),
                gen_assert_and_fail(&|_| {}),
                gen_assert_and_fail(&|_| {}),
                // Second success: the first timer fires, so start the second one.
                gen_assert_and_reply(
                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
                    vec![StartTimer {
                        timer_id: timer_2.to_string(),
                        ..Default::default()
                    }
                    .into()],
                ),
                // Again (a new fail should be reported here)
                gen_assert_and_fail(&|_| {}),
                gen_assert_and_fail(&|_| {}),
                // Final success: the second timer fires and the workflow completes.
                gen_assert_and_reply(
                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
                    vec![CompleteWorkflowExecution { result: None }.into()],
                ),
            ],
        )
        .await;
    }
1545
1546    #[tokio::test]
1547    async fn max_concurrent_wft_respected() {
1548        // Create long histories for three workflows
1549        let mut t1 = canned_histories::long_sequential_timers(20);
1550        let mut t2 = canned_histories::long_sequential_timers(20);
1551        let mut tasks = VecDeque::from(vec![
1552            hist_to_poll_resp(&mut t1, "wf1", 100),
1553            hist_to_poll_resp(&mut t2, "wf2", 100),
1554        ]);
1555        // Limit the core to two outstanding workflow tasks, hence we should only see polling
1556        // happen twice, since we will not actually finish the two workflows
1557        let mut mock_gateway = MockServerGatewayApis::new();
1558        mock_gateway
1559            .expect_poll_workflow_task()
1560            .times(2)
1561            .returning(move || Ok(tasks.pop_front().unwrap()));
1562        // Response not really important here
1563        mock_gateway
1564            .expect_complete_workflow_task()
1565            .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default()));
1566
1567        let core = CoreSDK::new(
1568            mock_gateway,
1569            CoreInitOptions {
1570                gateway_opts: fake_sg_opts(),
1571                evict_after_pending_cleared: true,
1572                max_outstanding_workflow_tasks: 2,
1573                max_outstanding_activities: 1,
1574            },
1575        );
1576
1577        // Poll twice in a row before completing -- we should be at limit
1578        let r1 = core.poll_workflow_task().await.unwrap();
1579        let _r2 = core.poll_workflow_task().await.unwrap();
1580        // Now we immediately poll for new work, and complete one of the existing activations. The
1581        // poll must not unblock until the completion goes through.
1582        let last_finisher = AtomicUsize::new(0);
1583        let (_, mut r1) = tokio::join! {
1584            async {
1585                core.complete_workflow_task(WfActivationCompletion::from_status(
1586                    r1.task_token,
1587                    workflow_completion::Success::from_cmds(vec![StartTimer {
1588                        timer_id: "timer-1".to_string(),
1589                        ..Default::default()
1590                    }
1591                    .into()]).into()
1592                )).await.unwrap();
1593                last_finisher.store(1, Ordering::SeqCst);
1594            },
1595            async {
1596                let r = core.poll_workflow_task().await.unwrap();
1597                last_finisher.store(2, Ordering::SeqCst);
1598                r
1599            }
1600        };
1601        // So that we know we blocked
1602        assert_eq!(last_finisher.load(Ordering::Acquire), 2);
1603
1604        // Since we never did anything with r2, all subsequent activations should be for wf1
1605        for i in 2..19 {
1606            core.complete_workflow_task(WfActivationCompletion::from_status(
1607                r1.task_token,
1608                workflow_completion::Success::from_cmds(vec![StartTimer {
1609                    timer_id: format!("timer-{}", i),
1610                    ..Default::default()
1611                }
1612                .into()])
1613                .into(),
1614            ))
1615            .await
1616            .unwrap();
1617            r1 = core.poll_workflow_task().await.unwrap();
1618        }
1619    }
1620
1621    #[tokio::test]
1622    async fn max_activites_respected() {
1623        let mut tasks = VecDeque::from(vec![
1624            PollActivityTaskQueueResponse {
1625                task_token: vec![1],
1626                activity_id: "act1".to_string(),
1627                ..Default::default()
1628            },
1629            PollActivityTaskQueueResponse {
1630                task_token: vec![2],
1631                activity_id: "act2".to_string(),
1632                ..Default::default()
1633            },
1634            PollActivityTaskQueueResponse {
1635                task_token: vec![3],
1636                activity_id: "act3".to_string(),
1637                ..Default::default()
1638            },
1639        ]);
1640        let mut mock_gateway = MockServerGatewayApis::new();
1641        mock_gateway
1642            .expect_poll_activity_task()
1643            .times(3)
1644            .returning(move || Ok(tasks.pop_front().unwrap()));
1645        mock_gateway
1646            .expect_complete_activity_task()
1647            .returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default()));
1648
1649        let core = CoreSDK::new(
1650            mock_gateway,
1651            CoreInitOptions {
1652                gateway_opts: fake_sg_opts(),
1653                evict_after_pending_cleared: true,
1654                max_outstanding_workflow_tasks: 1,
1655                max_outstanding_activities: 2,
1656            },
1657        );
1658
1659        // We allow two outstanding activities, therefore first two polls should return right away
1660        let r1 = core.poll_activity_task().await.unwrap();
1661        let _r2 = core.poll_activity_task().await.unwrap();
1662        // Third should block until we complete one of the first two
1663        let last_finisher = AtomicUsize::new(0);
1664        tokio::join! {
1665            async {
1666                core.complete_activity_task(ActivityTaskCompletion {
1667                    task_token: r1.task_token,
1668                    result: Some(ActivityResult::ok(vec![1].into()))
1669                }).await.unwrap();
1670                last_finisher.store(1, Ordering::SeqCst);
1671            },
1672            async {
1673                core.poll_activity_task().await.unwrap();
1674                last_finisher.store(2, Ordering::SeqCst);
1675            }
1676        };
1677        // So that we know we blocked
1678        assert_eq!(last_finisher.load(Ordering::Acquire), 2);
1679    }
1680}