temporalio_sdk/
interceptors.rs1use crate::{
4 Worker,
5 activities::{ActivityContext, ActivityError, ActivityInfo},
6};
7use anyhow::bail;
8use futures_util::future::BoxFuture;
9use std::{
10 any::Any,
11 collections::HashMap,
12 sync::{Arc, OnceLock},
13};
14use temporalio_common::{
15 data_converters::{
16 GenericPayloadConverter, PayloadConversionError, SerializationContext, TemporalSerializable,
17 },
18 protos::{
19 coresdk::{
20 workflow_activation::{WorkflowActivation, remove_from_cache::EvictionReason},
21 workflow_completion::WorkflowActivationCompletion,
22 },
23 temporal::api::common::v1::Payload,
24 },
25};
26
27mod activity_execution_value {
28 use super::*;
29
30 pub trait Sealed {
31 fn to_activity_payload(
32 &self,
33 context: &SerializationContext<'_>,
34 ) -> Result<Payload, PayloadConversionError>;
35 }
36
37 impl<T> Sealed for T
38 where
39 T: Any + TemporalSerializable + Send + Sync,
40 {
41 fn to_activity_payload(
42 &self,
43 context: &SerializationContext<'_>,
44 ) -> Result<Payload, PayloadConversionError> {
45 context.converter.to_payload(context, self)
46 }
47 }
48}
49
50#[async_trait::async_trait(?Send)]
54pub trait WorkerInterceptor {
55 async fn on_workflow_activation_completion(&self, _completion: &WorkflowActivationCompletion) {}
58 fn on_shutdown(&self, _sdk_worker: &Worker) {}
61 async fn on_workflow_activation(
63 &self,
64 _activation: &WorkflowActivation,
65 ) -> Result<(), anyhow::Error> {
66 Ok(())
67 }
68}
69
/// One-shot continuation taking an input of type `I` and producing an `O`.
///
/// The wrapped closure is `FnOnce`, so a `Next` can be run at most once; it
/// must be `Send` and may borrow data for the lifetime `'a`.
pub struct Next<'a, I, O> {
    /// The boxed continuation that [`run`](Self::run) invokes.
    inner: Box<dyn FnOnce(I) -> O + Send + 'a>,
}

impl<'a, I, O> Next<'a, I, O> {
    /// Boxes `f` so that it can later be invoked through [`run`](Self::run).
    pub(crate) fn new(f: impl FnOnce(I) -> O + Send + 'a) -> Self {
        let inner: Box<dyn FnOnce(I) -> O + Send + 'a> = Box::new(f);
        Self { inner }
    }

    /// Consumes the continuation, calling the wrapped closure with `input`
    /// and returning whatever it produces.
    pub fn run(self, input: I) -> O {
        let Next { inner } = self;
        inner(input)
    }
}
87
88#[non_exhaustive]
90pub struct ExecuteActivityInput {
91 context: ActivityContext,
92 args: Box<dyn Any + Send + Sync>,
93}
94
95impl ExecuteActivityInput {
96 pub(crate) fn new(context: ActivityContext, args: Box<dyn Any + Send + Sync>) -> Self {
97 Self { context, args }
98 }
99
100 pub(crate) fn into_parts(self) -> (ActivityContext, Box<dyn Any + Send + Sync>) {
101 (self.context, self.args)
102 }
103
104 pub fn activity_info(&self) -> &ActivityInfo {
106 self.context.info()
107 }
108
109 pub fn headers(&self) -> &HashMap<String, Payload> {
111 self.context.headers()
112 }
113
114 pub fn headers_mut(&mut self) -> &mut HashMap<String, Payload> {
116 self.context.headers_mut()
117 }
118
119 pub fn args_ref<T: Any>(&self) -> Option<&T> {
121 self.args.downcast_ref()
122 }
123
124 pub fn args_mut<T: Any>(&mut self) -> Option<&mut T> {
126 self.args.downcast_mut()
127 }
128}
129
130pub trait ActivityExecutionValue:
132 Any + TemporalSerializable + Send + Sync + activity_execution_value::Sealed
133{
134 fn as_any(&self) -> &dyn Any;
136}
137
138impl<T> ActivityExecutionValue for T
139where
140 T: Any + TemporalSerializable + Send + Sync,
141{
142 fn as_any(&self) -> &dyn Any {
143 self
144 }
145}
146
147impl dyn ActivityExecutionValue {
148 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
150 self.as_any().downcast_ref()
151 }
152
153 pub(crate) fn serialize_payload(
154 &self,
155 context: &SerializationContext<'_>,
156 ) -> Result<Payload, PayloadConversionError> {
157 self.to_activity_payload(context)
158 }
159}
160
161pub type ExecuteActivityResult = Result<Box<dyn ActivityExecutionValue>, ActivityError>;
163
164pub type ExecuteActivityOutput<'a> = BoxFuture<'a, ExecuteActivityResult>;
166
167pub trait ActivityInboundInterceptor: Send + Sync + 'static {
171 fn execute_activity<'a>(
173 &'a self,
174 input: ExecuteActivityInput,
175 next: Next<'a, ExecuteActivityInput, ExecuteActivityOutput<'a>>,
176 ) -> ExecuteActivityOutput<'a> {
177 next.run(input)
178 }
179}
180
181pub struct InterceptorWithNext {
183 inner: Box<dyn WorkerInterceptor>,
184 next: Option<Box<InterceptorWithNext>>,
185}
186
187impl InterceptorWithNext {
188 pub fn new(inner: Box<dyn WorkerInterceptor>) -> Self {
190 Self { inner, next: None }
191 }
192
193 pub fn set_next(&mut self, next: Box<dyn WorkerInterceptor>) -> &mut InterceptorWithNext {
196 self.next.insert(Box::new(Self::new(next)))
197 }
198}
199
200#[async_trait::async_trait(?Send)]
201impl WorkerInterceptor for InterceptorWithNext {
202 async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
203 self.inner.on_workflow_activation_completion(c).await;
204 if let Some(next) = &self.next {
205 next.on_workflow_activation_completion(c).await;
206 }
207 }
208
209 fn on_shutdown(&self, w: &Worker) {
210 self.inner.on_shutdown(w);
211 if let Some(next) = &self.next {
212 next.on_shutdown(w);
213 }
214 }
215
216 async fn on_workflow_activation(&self, a: &WorkflowActivation) -> Result<(), anyhow::Error> {
217 self.inner.on_workflow_activation(a).await?;
218 if let Some(next) = &self.next {
219 next.on_workflow_activation(a).await?;
220 }
221 Ok(())
222 }
223}
224
/// Interceptor whose `WorkerInterceptor::on_workflow_activation` hook returns
/// an error when an activation reports a nondeterminism eviction.
///
/// The type carries no state. `Debug` and `Default` are derived so that it can
/// be logged and constructed in the same way as the other interceptors in
/// this file; the `FailOnNondeterminismInterceptor {}` literal keeps working.
#[derive(Debug, Default)]
pub struct FailOnNondeterminismInterceptor {}
228#[async_trait::async_trait(?Send)]
229impl WorkerInterceptor for FailOnNondeterminismInterceptor {
230 async fn on_workflow_activation(
231 &self,
232 activation: &WorkflowActivation,
233 ) -> Result<(), anyhow::Error> {
234 if matches!(
235 activation.eviction_reason(),
236 Some(EvictionReason::Nondeterminism)
237 ) {
238 bail!("Workflow is being evicted because of nondeterminism! {activation}");
239 }
240 Ok(())
241 }
242}
243
244#[derive(Default)]
246pub struct ReturnWorkflowExitValueInterceptor {
247 result_value: Arc<OnceLock<Payload>>,
248}
249
250impl ReturnWorkflowExitValueInterceptor {
251 pub fn result_handle(&self) -> Arc<OnceLock<Payload>> {
253 self.result_value.clone()
254 }
255}
256
257#[async_trait::async_trait(?Send)]
258impl WorkerInterceptor for ReturnWorkflowExitValueInterceptor {
259 async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
260 if let Some(v) = c.complete_workflow_execution_value() {
261 let _ = self.result_value.set(v.clone());
262 }
263 }
264}