1#![warn(missing_docs)] #[macro_use]
71extern crate tracing;
72extern crate self as temporalio_sdk;
73
74pub mod activities;
75pub mod interceptors;
76mod workflow_context;
77mod workflow_future;
78pub mod workflows;
79
80use workflow_future::WorkflowFunction;
81
82pub use temporalio_client::Namespace;
83pub use workflow_context::{
84 ActivityExecutionError, ActivityOptions, BaseWorkflowContext, CancellableFuture, ChildWorkflow,
85 ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo,
86 PendingChildWorkflow, RootWorkflowInfo, Signal, SignalData, SignalWorkflowOptions,
87 StartedChildWorkflow, SyncWorkflowContext, TimerOptions, WorkflowContext, WorkflowContextView,
88};
89
90use crate::{
91 activities::{
92 ActivityContext, ActivityDefinitions, ActivityError, ActivityImplementer,
93 ExecutableActivity,
94 },
95 interceptors::WorkerInterceptor,
96 workflow_context::{ChildWfCommon, NexusUnblockData, StartedNexusOperation},
97 workflows::{WorkflowDefinitions, WorkflowImplementation, WorkflowImplementer},
98};
99use anyhow::{Context, anyhow, bail};
100use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
101use std::{
102 any::{Any, TypeId},
103 cell::RefCell,
104 collections::{HashMap, HashSet},
105 fmt::{Debug, Display, Formatter},
106 future::Future,
107 panic::AssertUnwindSafe,
108 sync::Arc,
109 time::Duration,
110};
111use temporalio_client::{Client, NamespacedClient};
112use temporalio_common::{
113 ActivityDefinition, WorkflowDefinition,
114 data_converters::{DataConverter, SerializationContextData},
115 payload_visitor::{decode_payloads, encode_payloads},
116 protos::{
117 TaskToken,
118 coresdk::{
119 ActivityTaskCompletion, AsJsonPayloadExt,
120 activity_result::{ActivityExecutionResult, ActivityResolution},
121 activity_task::{ActivityTask, activity_task},
122 child_workflow::ChildWorkflowResult,
123 common::NamespacedWorkflowExecution,
124 nexus::NexusOperationResult,
125 workflow_activation::{
126 WorkflowActivation,
127 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
128 resolve_nexus_operation_start, workflow_activation_job::Variant,
129 },
130 workflow_commands::{
131 ContinueAsNewWorkflowExecution, WorkflowCommand, workflow_command,
132 },
133 workflow_completion::WorkflowActivationCompletion,
134 },
135 temporal::api::{
136 common::v1::Payload,
137 enums::v1::WorkflowTaskFailedCause,
138 failure::v1::{Failure, failure},
139 },
140 },
141 worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
142};
143use temporalio_sdk_core::{
144 CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
145 WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
146};
147use tokio::{
148 sync::{
149 Notify,
150 mpsc::{UnboundedSender, unbounded_channel},
151 oneshot,
152 },
153 task::JoinError,
154};
155use tokio_stream::wrappers::UnboundedReceiverStream;
156use tokio_util::sync::CancellationToken;
157use tracing::{Instrument, Span, field};
158use uuid::Uuid;
159
/// Options used to configure a [`Worker`].
///
/// Values are assembled through the `bon`-generated builder: start with
/// `WorkerOptions::new(task_queue)`, then chain setters and the `register_*` methods.
#[derive(bon::Builder, Clone)]
#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
#[non_exhaustive]
pub struct WorkerOptions {
    /// Name of the task queue; supplied as the argument of `WorkerOptions::new`.
    #[builder(start_fn)]
    pub task_queue: String,

    /// Activity registry, filled in by `register_activities` / `register_activity`.
    #[builder(field)]
    activities: ActivityDefinitions,

    /// Workflow registry, filled in by `register_workflow` / `register_workflow_with_factory`.
    #[builder(field)]
    workflows: WorkflowDefinitions,

    /// Deployment options, handed to core as the worker-deployment-based versioning strategy.
    /// Defaults to options created from the build id of the current executable.
    #[builder(default = def_build_id())]
    pub deployment_options: WorkerDeploymentOptions,
    /// Optional override for the client identity, forwarded to the core worker configuration.
    pub client_identity_override: Option<String>,
    /// Forwarded to the core worker configuration as `max_cached_workflows`. Defaults to 1000.
    // NOTE(review): presumably the upper bound on workflows kept in the sticky cache -- confirm.
    #[builder(default = 1000)]
    pub max_cached_workflows: usize,
    /// Tuner forwarded to the core worker. Defaults to `TunerBuilder::default().build()`.
    #[builder(default = Arc::new(TunerBuilder::default().build()))]
    pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
    /// Poller behavior for workflow tasks. Defaults to `PollerBehavior::SimpleMaximum(5)`.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub workflow_task_poller_behavior: PollerBehavior,
    /// Ratio of non-sticky to sticky workflow task polling. Defaults to 0.2.
    // NOTE(review): exact semantics are defined by the core worker, not visible here -- confirm.
    #[builder(default = 0.2)]
    pub nonsticky_to_sticky_poll_ratio: f32,
    /// Poller behavior for activity tasks. Defaults to `PollerBehavior::SimpleMaximum(5)`.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub activity_task_poller_behavior: PollerBehavior,
    /// Poller behavior for nexus tasks. Defaults to `PollerBehavior::SimpleMaximum(5)`.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub nexus_task_poller_behavior: PollerBehavior,
    /// Task types forwarded to the core worker. Defaults to `WorkerTaskTypes::all()`.
    #[builder(default = WorkerTaskTypes::all())]
    pub task_types: WorkerTaskTypes,
    /// Schedule-to-start timeout for the sticky queue. Defaults to 10 seconds.
    #[builder(default = Duration::from_secs(10))]
    pub sticky_queue_schedule_to_start_timeout: Duration,
    /// Maximum heartbeat throttle interval. Defaults to 60 seconds.
    #[builder(default = Duration::from_secs(60))]
    pub max_heartbeat_throttle_interval: Duration,
    /// Default heartbeat throttle interval. Defaults to 30 seconds.
    #[builder(default = Duration::from_secs(30))]
    pub default_heartbeat_throttle_interval: Duration,
    /// Optional activities-per-second limit for the task queue, forwarded to core when set.
    pub max_task_queue_activities_per_second: Option<f64>,
    /// Optional activities-per-second limit for this worker, forwarded to core when set.
    pub max_worker_activities_per_second: Option<f64>,
    /// Workflow error types forwarded to core as `workflow_failure_errors`. Empty by default.
    #[builder(default)]
    pub workflow_failure_errors: HashSet<WorkflowErrorType>,
    /// Per-key sets of workflow error types, forwarded to core. Empty by default.
    // NOTE(review): keys are presumably workflow type names -- confirm.
    #[builder(default)]
    pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
    /// Optional graceful shutdown period, forwarded to core when set.
    pub graceful_shutdown_period: Option<Duration>,
}
259
260impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
261 pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
263 self.activities.register_activities::<AI>(instance);
264 self
265 }
266 pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
268 where
269 AD: ActivityDefinition + ExecutableActivity,
270 AD::Output: Send + Sync,
271 {
272 self.activities.register_activity::<AD>(instance);
273 self
274 }
275
276 pub fn register_workflow<WI: WorkflowImplementer>(mut self) -> Self {
278 self.workflows.register_workflow::<WI>();
279 self
280 }
281
282 pub fn register_workflow_with_factory<W, F>(mut self, factory: F) -> Self
300 where
301 W: WorkflowImplementation,
302 <W::Run as WorkflowDefinition>::Input: Send,
303 F: Fn() -> W + Send + Sync + 'static,
304 {
305 self.workflows
306 .register_workflow_run_with_factory::<W, F>(factory);
307 self
308 }
309}
310
311fn def_build_id() -> WorkerDeploymentOptions {
313 WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
314}
315
316impl WorkerOptions {
317 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
319 self.activities.register_activities::<AI>(instance);
320 self
321 }
322 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
324 where
325 AD: ActivityDefinition + ExecutableActivity,
326 AD::Output: Send + Sync,
327 {
328 self.activities.register_activity::<AD>(instance);
329 self
330 }
331 pub fn activities(&self) -> ActivityDefinitions {
333 self.activities.clone()
334 }
335
336 pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
338 self.workflows.register_workflow::<WI>();
339 self
340 }
341
342 pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
347 where
348 W: WorkflowImplementation,
349 <W::Run as WorkflowDefinition>::Input: Send,
350 F: Fn() -> W + Send + Sync + 'static,
351 {
352 self.workflows
353 .register_workflow_run_with_factory::<W, F>(factory);
354 self
355 }
356
357 pub fn workflows(&self) -> WorkflowDefinitions {
359 self.workflows.clone()
360 }
361
362 #[doc(hidden)]
363 pub fn to_core_options(&self, namespace: String) -> Result<WorkerConfig, String> {
364 WorkerConfig::builder()
365 .namespace(namespace)
366 .task_queue(self.task_queue.clone())
367 .maybe_client_identity_override(self.client_identity_override.clone())
368 .max_cached_workflows(self.max_cached_workflows)
369 .tuner(self.tuner.clone())
370 .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
371 .activity_task_poller_behavior(self.activity_task_poller_behavior)
372 .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
373 .task_types(self.task_types)
374 .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
375 .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
376 .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
377 .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
378 .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
379 .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
380 .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
381 self.deployment_options.clone(),
382 ))
383 .workflow_failure_errors(self.workflow_failure_errors.clone())
384 .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
385 .build()
386 }
387}
388
/// A worker that polls a core worker for workflow activations and activity tasks and
/// dispatches them to the registered workflow and activity implementations.
pub struct Worker {
    /// State shared by the workflow and activity sides (core worker, task queue, converter).
    common: CommonWorker,
    /// Workflow-side state: registered definitions and currently running workflows.
    workflow_half: WorkflowHalf,
    /// Activity-side state: registered activities and their cancellation tokens.
    activity_half: ActivityHalf,
}
397
/// State shared between the workflow and activity halves of a [`Worker`].
struct CommonWorker {
    /// The underlying core worker used for polling and completing tasks.
    worker: Arc<CoreWorker>,
    /// Task queue name, copied from the core worker's configuration at construction time.
    task_queue: String,
    /// Optional interceptor invoked around activations, completions and shutdown.
    worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
    /// Converter whose codec is applied to incoming and outgoing payloads.
    data_converter: DataConverter,
}
404
/// Workflow-side state of a [`Worker`].
#[derive(Default)]
struct WorkflowHalf {
    /// Currently running workflows, keyed by run id.
    workflows: RefCell<HashMap<String, WorkflowData>>,
    /// Registered workflow definitions, looked up by workflow type when a workflow starts.
    workflow_definitions: WorkflowDefinitions,
    /// Notified each time an entry is removed from `workflows`.
    workflow_removed_from_map: Notify,
}
/// Per-workflow bookkeeping stored in [`WorkflowHalf::workflows`].
struct WorkflowData {
    /// Channel used to deliver activations to the running workflow.
    activation_chan: UnboundedSender<WorkflowActivation>,
}
416
/// Pairs the join handle of a spawned workflow task with the run id it belongs to, so the
/// run can be removed from the workflow map once the task finishes.
struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<Payload>, JoinError>>> {
    /// Future resolving to the workflow's result (or the join error of its task).
    join_handle: F,
    /// Run id of the workflow the handle belongs to.
    run_id: String,
}
421
/// Activity-side state of a [`Worker`].
#[derive(Default)]
struct ActivityHalf {
    /// Registered activity definitions, looked up by activity type when a task starts.
    activities: ActivityDefinitions,
    /// Cancellation tokens of started activities, keyed by task token; consulted when a
    /// cancel task arrives.
    task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
}
428
429impl Worker {
430 pub fn new(
432 runtime: &CoreRuntime,
433 client: Client,
434 mut options: WorkerOptions,
435 ) -> Result<Self, Box<dyn std::error::Error>> {
436 let acts = std::mem::take(&mut options.activities);
437 let wfs = std::mem::take(&mut options.workflows);
438 let wc = options
439 .to_core_options(client.namespace())
440 .map_err(|s| anyhow::anyhow!("{s}"))?;
441 let core = init_worker(runtime, wc, client.connection().clone())?;
442 let mut me = Self::new_from_core_definitions(
443 Arc::new(core),
444 client.data_converter().clone(),
445 Default::default(),
446 Default::default(),
447 );
448 me.activity_half.activities = acts;
449 me.workflow_half.workflow_definitions = wfs;
450 Ok(me)
451 }
452
453 #[doc(hidden)]
455 pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
456 Self::new_from_core_definitions(
457 worker,
458 data_converter,
459 Default::default(),
460 Default::default(),
461 )
462 }
463
464 #[doc(hidden)]
466 pub fn new_from_core_definitions(
467 worker: Arc<CoreWorker>,
468 data_converter: DataConverter,
469 activities: ActivityDefinitions,
470 workflows: WorkflowDefinitions,
471 ) -> Self {
472 Self {
473 common: CommonWorker {
474 task_queue: worker.get_config().task_queue.clone(),
475 worker,
476 worker_interceptor: None,
477 data_converter,
478 },
479 workflow_half: WorkflowHalf {
480 workflow_definitions: workflows,
481 ..Default::default()
482 },
483 activity_half: ActivityHalf {
484 activities,
485 ..Default::default()
486 },
487 }
488 }
489
490 pub fn task_queue(&self) -> &str {
492 &self.common.task_queue
493 }
494
495 pub fn shutdown_handle(&self) -> impl Fn() + use<> {
498 let w = self.common.worker.clone();
499 move || w.initiate_shutdown()
500 }
501
502 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
504 self.activity_half
505 .activities
506 .register_activities::<AI>(instance);
507 self
508 }
509 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
511 where
512 AD: ActivityDefinition + ExecutableActivity,
513 AD::Output: Send + Sync,
514 {
515 self.activity_half
516 .activities
517 .register_activity::<AD>(instance);
518 self
519 }
520
521 pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
523 self.workflow_half
524 .workflow_definitions
525 .register_workflow::<WI>();
526 self
527 }
528
529 pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
533 where
534 W: WorkflowImplementation,
535 <W::Run as WorkflowDefinition>::Input: Send,
536 F: Fn() -> W + Send + Sync + 'static,
537 {
538 self.workflow_half
539 .workflow_definitions
540 .register_workflow_run_with_factory::<W, F>(factory);
541 self
542 }
543
544 pub async fn run(&mut self) -> Result<(), anyhow::Error> {
547 let shutdown_token = CancellationToken::new();
548 let (common, wf_half, act_half) = self.split_apart();
549 let (wf_future_tx, wf_future_rx) = unbounded_channel();
550 let (completions_tx, completions_rx) = unbounded_channel();
551
552 let workflow_local_set = tokio::task::LocalSet::new();
555
556 let wf_future_joiner = async {
557 UnboundedReceiverStream::new(wf_future_rx)
558 .map(Result::<_, anyhow::Error>::Ok)
559 .try_for_each_concurrent(
560 None,
561 |WorkflowFutureHandle {
562 join_handle,
563 run_id,
564 }| {
565 let wf_half = &*wf_half;
566 async move {
567 let result = join_handle.await?;
568 if let Err(e) = result
571 && !matches!(e, WorkflowTermination::Evicted)
572 {
573 return Err(e.into());
574 }
575 debug!(run_id=%run_id, "Removing workflow from cache");
576 wf_half.workflows.borrow_mut().remove(&run_id);
577 wf_half.workflow_removed_from_map.notify_one();
578 Ok(())
579 }
580 },
581 )
582 .await
583 .context("Workflow futures encountered an error")
584 };
585 let wf_completion_processor = async {
586 UnboundedReceiverStream::new(completions_rx)
587 .map(Ok)
588 .try_for_each_concurrent(None, |mut completion| async {
589 encode_payloads(
590 &mut completion,
591 common.data_converter.codec(),
592 &SerializationContextData::Workflow,
593 )
594 .await;
595 if let Some(ref i) = common.worker_interceptor {
596 i.on_workflow_activation_completion(&completion).await;
597 }
598 common.worker.complete_workflow_activation(completion).await
599 })
600 .map_err(anyhow::Error::from)
601 .await
602 .context("Workflow completions processor encountered an error")
603 };
604 tokio::try_join!(
605 async {
607 workflow_local_set.run_until(async {
608 tokio::try_join!(
609 async {
611 loop {
612 let mut activation =
613 match common.worker.poll_workflow_activation().await {
614 Err(PollError::ShutDown) => {
615 break;
616 }
617 o => o?,
618 };
619 decode_payloads(
620 &mut activation,
621 common.data_converter.codec(),
622 &SerializationContextData::Workflow,
623 )
624 .await;
625 if let Some(ref i) = common.worker_interceptor {
626 i.on_workflow_activation(&activation).await?;
627 }
628 if let Some(wf_fut) = wf_half
629 .workflow_activation_handler(
630 common,
631 shutdown_token.clone(),
632 activation,
633 &completions_tx,
634 )
635 .await?
636 && wf_future_tx.send(wf_fut).is_err()
637 {
638 panic!(
639 "Receive half of completion processor channel cannot be dropped"
640 );
641 }
642 }
643 shutdown_token.cancel();
645 drop(wf_future_tx);
648 drop(completions_tx);
649 Result::<_, anyhow::Error>::Ok(())
650 },
651 wf_future_joiner,
652 )
653 }).await
654 },
655 async {
658 if !act_half.activities.is_empty() {
659 loop {
660 let activity = common.worker.poll_activity_task().await;
661 if matches!(activity, Err(PollError::ShutDown)) {
662 break;
663 }
664 let mut activity = activity?;
665 decode_payloads(
666 &mut activity,
667 common.data_converter.codec(),
668 &SerializationContextData::Activity,
669 )
670 .await;
671 act_half.activity_task_handler(
672 common.worker.clone(),
673 common.task_queue.clone(),
674 common.data_converter.clone(),
675 activity,
676 )?;
677 }
678 };
679 Result::<_, anyhow::Error>::Ok(())
680 },
681 wf_completion_processor,
682 )?;
683
684 if let Some(i) = self.common.worker_interceptor.as_ref() {
685 i.on_shutdown(self);
686 }
687 self.common.worker.shutdown().await;
688 Ok(())
689 }
690
691 pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
693 self.common.worker_interceptor = Some(Box::new(interceptor));
694 }
695
696 pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
700 self.common.worker = new_core_worker;
701 }
702
703 pub fn cached_workflows(&self) -> usize {
706 self.workflow_half.workflows.borrow().len()
707 }
708
709 pub fn worker_instance_key(&self) -> Uuid {
711 self.common.worker.worker_instance_key()
712 }
713
714 #[doc(hidden)]
715 pub fn core_worker(&self) -> Arc<CoreWorker> {
716 self.common.worker.clone()
717 }
718
719 fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
720 (
721 &mut self.common,
722 &mut self.workflow_half,
723 &mut self.activity_half,
724 )
725 }
726}
727
728impl WorkflowHalf {
729 #[allow(clippy::type_complexity)]
730 async fn workflow_activation_handler(
731 &self,
732 common: &CommonWorker,
733 shutdown_token: CancellationToken,
734 mut activation: WorkflowActivation,
735 completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
736 ) -> Result<
737 Option<
738 WorkflowFutureHandle<
739 impl Future<Output = Result<WorkflowResult<Payload>, JoinError>> + use<>,
740 >,
741 >,
742 anyhow::Error,
743 > {
744 let mut res = None;
745 let run_id = activation.run_id.clone();
746
747 if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
750 Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
751 _ => None,
752 }) {
753 let workflow_type = sw.workflow_type.clone();
754 let payload_converter = common.data_converter.payload_converter().clone();
755 let (wff, activations) = {
756 if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
757 match WorkflowFunction::from_invocation(factory).start_workflow(
758 common.worker.get_config().namespace.clone(),
759 common.task_queue.clone(),
760 run_id.clone(),
761 std::mem::take(sw),
762 completions_tx.clone(),
763 payload_converter,
764 ) {
765 Ok(result) => result,
766 Err(e) => {
767 warn!("Failed to create workflow {workflow_type}: {e}");
768 completions_tx
769 .send(WorkflowActivationCompletion::fail(
770 run_id,
771 format!("Failed to create workflow: {e}").into(),
772 Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
773 ))
774 .expect("Completion channel intact");
775 return Ok(None);
776 }
777 }
778 } else {
779 warn!("Workflow type {workflow_type} not found");
780 completions_tx
781 .send(WorkflowActivationCompletion::fail(
782 run_id,
783 format!("Workflow type {workflow_type} not found").into(),
784 Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
785 ))
786 .expect("Completion channel intact");
787 return Ok(None);
788 }
789 };
790 let wff = tokio::task::unconstrained(wff);
793 let jh = tokio::task::spawn_local(async move {
795 tokio::select! {
796 r = wff.fuse() => r,
797 _ = shutdown_token.cancelled() => {
800 Err(WorkflowTermination::Evicted)
801 }
802 }
803 });
804 res = Some(WorkflowFutureHandle {
805 join_handle: jh,
806 run_id: run_id.clone(),
807 });
808 loop {
809 if self.workflows.borrow_mut().contains_key(&run_id) {
814 self.workflow_removed_from_map.notified().await;
815 } else {
816 break;
817 }
818 }
819 self.workflows.borrow_mut().insert(
820 run_id.clone(),
821 WorkflowData {
822 activation_chan: activations,
823 },
824 );
825 }
826
827 if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
830 dat.activation_chan
831 .send(activation)
832 .expect("Workflow should exist if we're sending it an activation");
833 } else {
834 if activation.jobs.len() == 1
840 && matches!(
841 activation.jobs.first().map(|j| &j.variant),
842 Some(Some(Variant::RemoveFromCache(_)))
843 )
844 {
845 completions_tx
846 .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
847 .expect("Completion channel intact");
848 return Ok(None);
849 }
850
851 bail!("Got activation {activation:?} for unknown workflow {run_id}");
854 };
855
856 Ok(res)
857 }
858}
859
860impl ActivityHalf {
861 fn activity_task_handler(
863 &mut self,
864 worker: Arc<CoreWorker>,
865 task_queue: String,
866 data_converter: DataConverter,
867 activity: ActivityTask,
868 ) -> Result<(), anyhow::Error> {
869 match activity.variant {
870 Some(activity_task::Variant::Start(start)) => {
871 let act_fn = self.activities.get(&start.activity_type).ok_or_else(|| {
872 anyhow!(
873 "No function registered for activity type {}",
874 start.activity_type
875 )
876 })?;
877 let span = info_span!(
878 "RunActivity",
879 "otel.name" = format!("RunActivity:{}", start.activity_type),
880 "otel.kind" = "server",
881 "temporalActivityID" = start.activity_id,
882 "temporalWorkflowID" = field::Empty,
883 "temporalRunID" = field::Empty,
884 );
885 let ct = CancellationToken::new();
886 let task_token = activity.task_token;
887 self.task_tokens_to_cancels
888 .insert(task_token.clone().into(), ct.clone());
889
890 let (ctx, args) =
891 ActivityContext::new(worker.clone(), ct, task_queue, task_token.clone(), start);
892 let codec_data_converter = data_converter.clone();
893
894 tokio::spawn(async move {
895 let act_fut = async move {
896 if let Some(info) = &ctx.info().workflow_execution {
897 Span::current()
898 .record("temporalWorkflowID", &info.workflow_id)
899 .record("temporalRunID", &info.run_id);
900 }
901 (act_fn)(args, data_converter, ctx).await
902 }
903 .instrument(span);
904 let output = AssertUnwindSafe(act_fut).catch_unwind().await;
905 let result = match output {
906 Err(e) => ActivityExecutionResult::fail(Failure::application_failure(
907 format!("Activity function panicked: {}", panic_formatter(e)),
908 true,
909 )),
910 Ok(Ok(p)) => ActivityExecutionResult::ok(p),
911 Ok(Err(err)) => match err {
912 ActivityError::Retryable {
913 source,
914 explicit_delay,
915 } => ActivityExecutionResult::fail({
916 let mut f = Failure::application_failure_from_error(
917 anyhow::Error::from_boxed(source),
918 false,
919 );
920 if let Some(d) = explicit_delay
921 && let Some(failure::FailureInfo::ApplicationFailureInfo(fi)) =
922 f.failure_info.as_mut()
923 {
924 fi.next_retry_delay = d.try_into().ok();
925 }
926 f
927 }),
928 ActivityError::Cancelled { details } => {
929 ActivityExecutionResult::cancel_from_details(details)
930 }
931 ActivityError::NonRetryable(nre) => ActivityExecutionResult::fail(
932 Failure::application_failure_from_error(
933 anyhow::Error::from_boxed(nre),
934 true,
935 ),
936 ),
937 ActivityError::WillCompleteAsync => {
938 ActivityExecutionResult::will_complete_async()
939 }
940 },
941 };
942 let mut completion = ActivityTaskCompletion {
943 task_token,
944 result: Some(result),
945 };
946 encode_payloads(
947 &mut completion,
948 codec_data_converter.codec(),
949 &SerializationContextData::Activity,
950 )
951 .await;
952 worker.complete_activity_task(completion).await?;
953 Ok::<_, anyhow::Error>(())
954 });
955 }
956 Some(activity_task::Variant::Cancel(_)) => {
957 if let Some(ct) = self
958 .task_tokens_to_cancels
959 .get(activity.task_token.as_slice())
960 {
961 ct.cancel();
962 }
963 }
964 None => bail!("Undefined activity task variant"),
965 }
966 Ok(())
967 }
968}
969
/// Events used to unblock a waiting workflow future, one variant per kind of awaited
/// operation. Each variant carries a `u32` identifier followed by the operation's outcome.
// NOTE(review): the `u32` is presumably the command sequence number -- confirm.
#[derive(Debug)]
enum UnblockEvent {
    /// A timer resolved with the given result.
    Timer(u32, TimerResult),
    /// An activity resolved.
    Activity(u32, Box<ActivityResolution>),
    /// A child workflow start resolved with the given status.
    WorkflowStart(u32, Box<ChildWorkflowStartStatus>),
    /// A child workflow completed.
    WorkflowComplete(u32, Box<ChildWorkflowResult>),
    /// Signalling an external workflow finished; `Some` carries the failure.
    SignalExternal(u32, Option<Failure>),
    /// Cancelling an external workflow finished; `Some` carries the failure.
    CancelExternal(u32, Option<Failure>),
    /// A nexus operation start resolved with the given status.
    NexusOperationStart(u32, Box<resolve_nexus_operation_start::Status>),
    /// A nexus operation completed.
    NexusOperationComplete(u32, Box<NexusOperationResult>),
}
981
/// Outcome of awaiting a timer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimerResult {
    /// The timer was cancelled.
    Cancelled,
    /// The timer fired.
    Fired,
}
990
/// Marker for a successfully delivered signal to an external workflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SignalExternalOk;
/// Result of signalling an external workflow: success marker or the reported failure.
pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;
996
/// Marker for a successful cancellation request to an external workflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CancelExternalOk;
/// Result of cancelling an external workflow: success marker or the reported failure.
pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;
1002
/// Implemented by result types that can be built from an [`UnblockEvent`].
trait Unblockable {
    /// Extra data, supplied alongside the event, that the implementor needs to build itself.
    type OtherDat;

    /// Converts the event (plus the extra data) into the implementing type. The
    /// implementations in this file panic when handed an event of the wrong kind.
    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self;
}
1008
1009impl Unblockable for TimerResult {
1010 type OtherDat = ();
1011 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1012 match ue {
1013 UnblockEvent::Timer(_, result) => result,
1014 _ => panic!("Invalid unblock event for timer"),
1015 }
1016 }
1017}
1018
1019impl Unblockable for ActivityResolution {
1020 type OtherDat = ();
1021 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1022 match ue {
1023 UnblockEvent::Activity(_, result) => *result,
1024 _ => panic!("Invalid unblock event for activity"),
1025 }
1026 }
1027}
1028
1029impl Unblockable for PendingChildWorkflow {
1030 type OtherDat = ChildWfCommon;
1032 fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1033 match ue {
1034 UnblockEvent::WorkflowStart(_, result) => Self {
1035 status: *result,
1036 common: od,
1037 },
1038 _ => panic!("Invalid unblock event for child workflow start"),
1039 }
1040 }
1041}
1042
1043impl Unblockable for ChildWorkflowResult {
1044 type OtherDat = ();
1045 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1046 match ue {
1047 UnblockEvent::WorkflowComplete(_, result) => *result,
1048 _ => panic!("Invalid unblock event for child workflow complete"),
1049 }
1050 }
1051}
1052
1053impl Unblockable for SignalExternalWfResult {
1054 type OtherDat = ();
1055 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1056 match ue {
1057 UnblockEvent::SignalExternal(_, maybefail) => {
1058 maybefail.map_or(Ok(SignalExternalOk), Err)
1059 }
1060 _ => panic!("Invalid unblock event for signal external workflow result"),
1061 }
1062 }
1063}
1064
1065impl Unblockable for CancelExternalWfResult {
1066 type OtherDat = ();
1067 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1068 match ue {
1069 UnblockEvent::CancelExternal(_, maybefail) => {
1070 maybefail.map_or(Ok(CancelExternalOk), Err)
1071 }
1072 _ => panic!("Invalid unblock event for signal external workflow result"),
1073 }
1074 }
1075}
1076
1077type NexusStartResult = Result<StartedNexusOperation, Failure>;
1078impl Unblockable for NexusStartResult {
1079 type OtherDat = NexusUnblockData;
1080 fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1081 match ue {
1082 UnblockEvent::NexusOperationStart(_, result) => match *result {
1083 resolve_nexus_operation_start::Status::OperationToken(op_token) => {
1084 Ok(StartedNexusOperation {
1085 operation_token: Some(op_token),
1086 unblock_dat: od,
1087 })
1088 }
1089 resolve_nexus_operation_start::Status::StartedSync(_) => {
1090 Ok(StartedNexusOperation {
1091 operation_token: None,
1092 unblock_dat: od,
1093 })
1094 }
1095 resolve_nexus_operation_start::Status::Failed(f) => Err(f),
1096 },
1097 _ => panic!("Invalid unblock event for nexus operation"),
1098 }
1099 }
1100}
1101
1102impl Unblockable for NexusOperationResult {
1103 type OtherDat = ();
1104
1105 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1106 match ue {
1107 UnblockEvent::NexusOperationComplete(_, result) => *result,
1108 _ => panic!("Invalid unblock event for nexus operation complete"),
1109 }
1110 }
1111}
1112
/// Identifies an in-flight operation that can be cancelled. Every variant carries a `u32`
/// identifier; workflow cancellations additionally carry a reason.
// NOTE(review): the `u32` is presumably the command sequence number -- confirm.
#[derive(Debug, Clone)]
pub(crate) enum CancellableID {
    /// A timer.
    Timer(u32),
    /// An activity.
    Activity(u32),
    /// A local activity.
    LocalActivity(u32),
    /// A child workflow, with the reason for cancelling it.
    ChildWorkflow {
        seqnum: u32,
        reason: String,
    },
    /// A signal sent to an external workflow.
    SignalExternalWorkflow(u32),
    /// An external workflow execution, with the reason for cancelling it.
    ExternalWorkflow {
        seqnum: u32,
        execution: NamespacedWorkflowExecution,
        reason: String,
    },
    /// A nexus operation.
    NexusOp(u32),
}
1132
/// Implemented by cancellable identifiers that can be completed with a cancellation reason.
pub(crate) trait SupportsCancelReason {
    /// Consumes the identifier and attaches `reason`, producing a [`CancellableID`].
    fn with_reason(self, reason: String) -> CancellableID;
}
/// The cancellable identifiers that accept a reason, before the reason has been attached.
/// Mirrors the reason-carrying variants of [`CancellableID`].
#[derive(Debug, Clone)]
pub(crate) enum CancellableIDWithReason {
    /// A child workflow.
    ChildWorkflow {
        seqnum: u32,
    },
    /// An external workflow execution.
    ExternalWorkflow {
        seqnum: u32,
        execution: NamespacedWorkflowExecution,
    },
}
1148impl CancellableIDWithReason {
1149 pub(crate) fn seq_num(&self) -> u32 {
1150 match self {
1151 CancellableIDWithReason::ChildWorkflow { seqnum } => *seqnum,
1152 CancellableIDWithReason::ExternalWorkflow { seqnum, .. } => *seqnum,
1153 }
1154 }
1155}
1156impl SupportsCancelReason for CancellableIDWithReason {
1157 fn with_reason(self, reason: String) -> CancellableID {
1158 match self {
1159 CancellableIDWithReason::ChildWorkflow { seqnum } => {
1160 CancellableID::ChildWorkflow { seqnum, reason }
1161 }
1162 CancellableIDWithReason::ExternalWorkflow { seqnum, execution } => {
1163 CancellableID::ExternalWorkflow {
1164 seqnum,
1165 execution,
1166 reason,
1167 }
1168 }
1169 }
1170 }
1171}
1172impl From<CancellableIDWithReason> for CancellableID {
1173 fn from(v: CancellableIDWithReason) -> Self {
1174 v.with_reason("".to_string())
1175 }
1176}
1177
/// Commands issued from workflow code. `From` conversions are derived for every variant
/// except `Cancel`.
// NOTE(review): the consumer of these commands lives outside this file; the per-variant
// descriptions below are inferred from names and payload types -- confirm.
#[derive(derive_more::From)]
#[allow(clippy::large_enum_variant)]
enum RustWfCmd {
    /// Cancel the identified operation.
    #[from(ignore)]
    Cancel(CancellableID),
    /// Carries an error; by its name, used to force a workflow task failure.
    ForceWFTFailure(anyhow::Error),
    /// A new command together with the channel used to unblock its awaiter.
    NewCmd(CommandCreateRequest),
    /// A new command with no unblocker attached.
    NewNonblockingCmd(workflow_command::Variant),
    /// Subscribe to the completion of a child workflow.
    SubscribeChildWorkflowCompletion(CommandSubscribeChildWorkflowCompletion),
    /// Subscribe to the completion of a nexus operation.
    SubscribeNexusOperationCompletion {
        seq: u32,
        unblocker: oneshot::Sender<UnblockEvent>,
    },
}
1192
/// A workflow command paired with the channel on which its unblock event is delivered.
struct CommandCreateRequest {
    /// The command to issue.
    cmd: WorkflowCommand,
    /// One-shot sender for the event that unblocks whoever awaits the command.
    unblocker: oneshot::Sender<UnblockEvent>,
}
1197
/// Request to be notified when a child workflow completes.
struct CommandSubscribeChildWorkflowCompletion {
    /// Identifier of the child workflow. NOTE(review): presumably its sequence number -- confirm.
    seq: u32,
    /// One-shot sender for the completion event.
    unblocker: oneshot::Sender<UnblockEvent>,
}
1202
/// Result of a workflow function: its output `T`, or the [`WorkflowTermination`] describing
/// why it ended without producing one.
pub type WorkflowResult<T> = Result<T, WorkflowTermination>;
1208
/// Reasons a workflow ends without returning a normal output.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowTermination {
    /// The workflow was cancelled.
    #[error("Workflow cancelled")]
    Cancelled,

    /// The workflow was evicted from the cache. The worker treats this as a normal end of
    /// the workflow's task rather than as an error.
    #[error("Workflow evicted from cache")]
    Evicted,

    /// The workflow continues as a new execution described by the boxed command.
    #[error("Continue as new")]
    ContinueAsNew(Box<ContinueAsNewWorkflowExecution>),

    /// The workflow failed with the contained error.
    #[error("Workflow failed: {0}")]
    Failed(#[source] anyhow::Error),
}
1231
1232impl WorkflowTermination {
1233 pub fn continue_as_new(can: ContinueAsNewWorkflowExecution) -> Self {
1235 Self::ContinueAsNew(Box::new(can))
1236 }
1237
1238 pub fn failed(err: impl Into<anyhow::Error>) -> Self {
1240 Self::Failed(err.into())
1241 }
1242}
1243
1244impl From<anyhow::Error> for WorkflowTermination {
1245 fn from(err: anyhow::Error) -> Self {
1246 Self::Failed(err)
1247 }
1248}
1249
1250impl From<ActivityExecutionError> for WorkflowTermination {
1251 fn from(value: ActivityExecutionError) -> Self {
1252 Self::failed(value)
1253 }
1254}
1255
/// Value an activity exits with.
#[derive(Debug)]
pub enum ActExitValue<T> {
    /// The activity did not produce a value now and will be completed asynchronously.
    WillCompleteAsync,
    /// The activity finished normally with the contained value.
    Normal(T),
}
1264
1265impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1266 fn from(t: T) -> Self {
1267 Self::Normal(t)
1268 }
1269}
1270
/// Turns a panic payload into something printable. Payloads that are a `&str` or a `String`
/// are printed as-is; anything else yields a fixed fallback message.
fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
    _panic_formatter::<&str>(panic)
}

/// Tries to downcast the payload to `T`; on a miss, moves on to `T::NextType` until the
/// chain of candidate types is exhausted.
fn _panic_formatter<T: 'static + PrintablePanicType>(panic: Box<dyn Any>) -> Box<dyn Display> {
    let unmatched = match panic.downcast::<T>() {
        Ok(printable) => return printable,
        Err(unmatched) => unmatched,
    };
    // The chain ends once the next candidate is the sentinel type.
    let chain_exhausted = TypeId::of::<T::NextType>() == TypeId::of::<EndPrintingAttempts>();
    if chain_exhausted {
        Box::new("Couldn't turn panic into a string")
    } else {
        _panic_formatter::<T::NextType>(unmatched)
    }
}

/// A displayable type a panic payload may be downcast to, linked to the next candidate.
trait PrintablePanicType: Display {
    /// The type to try when the payload is not `Self`.
    type NextType: PrintablePanicType;
}

impl PrintablePanicType for &str {
    type NextType = String;
}

impl PrintablePanicType for String {
    type NextType = EndPrintingAttempts;
}

/// Sentinel marking the end of the candidate chain.
struct EndPrintingAttempts {}

impl Display for EndPrintingAttempts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("Will never be printed"))
    }
}

impl PrintablePanicType for EndPrintingAttempts {
    type NextType = EndPrintingAttempts;
}
1306
// Mostly compile-time checks: the macro-generated activity and workflow definitions must
// type-check against the registration and invocation APIs.
#[cfg(test)]
mod tests {
    use super::*;
    use temporalio_macros::{activities, workflow, workflow_methods};

    struct MyActivities {}

    #[activities]
    impl MyActivities {
        // Activity without a receiver.
        #[activity]
        async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
            Ok(())
        }

        // Activity taking the implementer as `Arc<Self>` plus one input argument.
        #[activity]
        async fn takes_self(
            self: Arc<Self>,
            _ctx: ActivityContext,
            _: String,
        ) -> Result<(), ActivityError> {
            Ok(())
        }
    }

    #[test]
    fn test_activity_registration() {
        let act_instance = MyActivities {};
        let _ = WorkerOptions::new("task_q").register_activities(act_instance);
    }

    // Not marked `#[test]`: the body starts with `unimplemented!()`, so this function only
    // has to type-check.
    #[allow(unused, clippy::diverging_sub_expression)]
    fn test_activity_via_workflow_context() {
        let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
        wf_ctx.start_activity(MyActivities::my_activity, (), ActivityOptions::default());
        wf_ctx.start_activity(
            MyActivities::takes_self,
            "Hi".to_owned(),
            ActivityOptions::default(),
        );
    }

    // Not marked `#[test]` either: only checks that an activity can be invoked directly.
    #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
    async fn test_activity_direct_invocation() {
        let ctx: ActivityContext = unimplemented!();
        let _result = MyActivities::my_activity.run(ctx).await;
    }

    #[workflow]
    struct MyWorkflow {
        counter: u32,
    }

    // Uses every handler attribute once in sync form and once in async form where both exist.
    #[allow(dead_code)]
    #[workflow_methods]
    impl MyWorkflow {
        #[init]
        fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
            Self { counter: 0 }
        }

        #[run]
        async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
            Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
        }

        #[signal(name = "increment")]
        fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
            self.counter += amount;
        }

        #[signal]
        async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}

        #[query]
        fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
            self.counter
        }

        #[update(name = "double")]
        fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
            self.counter *= 2;
            self.counter
        }

        #[update]
        async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
            val * 2
        }
    }

    #[test]
    fn test_workflow_registration() {
        let _ = WorkerOptions::new("task_q").register_workflow::<MyWorkflow>();
    }
}