Skip to main content

temporalio_sdk/
interceptors.rs

//! User-definable interceptors are defined in this module.
2
3use crate::{
4    Worker,
5    activities::{ActivityContext, ActivityError, ActivityInfo},
6};
7use anyhow::bail;
8use futures_util::future::BoxFuture;
9use std::{
10    any::Any,
11    collections::HashMap,
12    sync::{Arc, OnceLock},
13};
14use temporalio_common::{
15    data_converters::{
16        GenericPayloadConverter, PayloadConversionError, SerializationContext, TemporalSerializable,
17    },
18    protos::{
19        coresdk::{
20            workflow_activation::{WorkflowActivation, remove_from_cache::EvictionReason},
21            workflow_completion::WorkflowActivationCompletion,
22        },
23        temporal::api::common::v1::Payload,
24    },
25};
26
27mod activity_execution_value {
28    use super::*;
29
30    pub trait Sealed {
31        fn to_activity_payload(
32            &self,
33            context: &SerializationContext<'_>,
34        ) -> Result<Payload, PayloadConversionError>;
35    }
36
37    impl<T> Sealed for T
38    where
39        T: Any + TemporalSerializable + Send + Sync,
40    {
41        fn to_activity_payload(
42            &self,
43            context: &SerializationContext<'_>,
44        ) -> Result<Payload, PayloadConversionError> {
45            context.converter.to_payload(context, self)
46        }
47    }
48}
49
50/// Implementors can intercept certain actions that happen within the Worker.
51///
52/// Advanced usage only.
53#[async_trait::async_trait(?Send)]
54pub trait WorkerInterceptor {
55    /// Called every time a workflow activation completes (just before sending the completion to
56    /// core).
57    async fn on_workflow_activation_completion(&self, _completion: &WorkflowActivationCompletion) {}
58    /// Called after the worker has initiated shutdown and the workflow/activity polling loops
59    /// have exited, but just before waiting for the inner core worker shutdown
60    fn on_shutdown(&self, _sdk_worker: &Worker) {}
61    /// Called every time a workflow is about to be activated
62    async fn on_workflow_activation(
63        &self,
64        _activation: &WorkflowActivation,
65    ) -> Result<(), anyhow::Error> {
66        Ok(())
67    }
68}
69
/// The remainder of an interceptor chain, packaged as a one-shot continuation.
///
/// Interceptor implementations call [`Next::run`] to invoke the next step of the chain.
pub struct Next<'a, I, O> {
    /// Boxed closure representing the next step; it is consumed when run.
    inner: Box<dyn FnOnce(I) -> O + Send + 'a>,
}

impl<'a, I, O> Next<'a, I, O> {
    /// Wrap `f` as the next step of the chain.
    pub(crate) fn new(f: impl FnOnce(I) -> O + Send + 'a) -> Self {
        let inner: Box<dyn FnOnce(I) -> O + Send + 'a> = Box::new(f);
        Self { inner }
    }

    /// Continue the call chain with the provided input.
    ///
    /// Consumes `self`, so a continuation can be run at most once.
    pub fn run(self, input: I) -> O {
        let Self { inner: step } = self;
        step(input)
    }
}
87
88/// Activity execution data passed to [`ActivityInboundInterceptor::execute_activity`].
89#[non_exhaustive]
90pub struct ExecuteActivityInput {
91    context: ActivityContext,
92    args: Box<dyn Any + Send + Sync>,
93}
94
95impl ExecuteActivityInput {
96    pub(crate) fn new(context: ActivityContext, args: Box<dyn Any + Send + Sync>) -> Self {
97        Self { context, args }
98    }
99
100    pub(crate) fn into_parts(self) -> (ActivityContext, Box<dyn Any + Send + Sync>) {
101        (self.context, self.args)
102    }
103
104    /// Information about the activity execution.
105    pub fn activity_info(&self) -> &ActivityInfo {
106        self.context.info()
107    }
108
109    /// Headers attached to this activity.
110    pub fn headers(&self) -> &HashMap<String, Payload> {
111        self.context.headers()
112    }
113
114    /// Mutably access headers attached to this activity.
115    pub fn headers_mut(&mut self) -> &mut HashMap<String, Payload> {
116        self.context.headers_mut()
117    }
118
119    /// Attempt to access the decoded activity arguments as a concrete type.
120    pub fn args_ref<T: Any>(&self) -> Option<&T> {
121        self.args.downcast_ref()
122    }
123
124    /// Attempt to mutably access the decoded activity arguments as a concrete type.
125    pub fn args_mut<T: Any>(&mut self) -> Option<&mut T> {
126        self.args.downcast_mut()
127    }
128}
129
130/// Type-erased activity output carried through the activity interceptor chain.
131pub trait ActivityExecutionValue:
132    Any + TemporalSerializable + Send + Sync + activity_execution_value::Sealed
133{
134    /// Access this value as [`Any`] for type-specific inspection.
135    fn as_any(&self) -> &dyn Any;
136}
137
138impl<T> ActivityExecutionValue for T
139where
140    T: Any + TemporalSerializable + Send + Sync,
141{
142    fn as_any(&self) -> &dyn Any {
143        self
144    }
145}
146
147impl dyn ActivityExecutionValue {
148    /// Attempt to access the activity output as a concrete type.
149    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
150        self.as_any().downcast_ref()
151    }
152
153    pub(crate) fn serialize_payload(
154        &self,
155        context: &SerializationContext<'_>,
156    ) -> Result<Payload, PayloadConversionError> {
157        self.to_activity_payload(context)
158    }
159}
160
161/// Result of an activity execution carried through the interceptor chain.
162pub type ExecuteActivityResult = Result<Box<dyn ActivityExecutionValue>, ActivityError>;
163
164/// Future produced by activity inbound interceptors.
165pub type ExecuteActivityOutput<'a> = BoxFuture<'a, ExecuteActivityResult>;
166
167/// Inbound interceptor for activity calls coming from the server.
168///
169/// Must be implemented by inbound activity interceptors.
170pub trait ActivityInboundInterceptor: Send + Sync + 'static {
171    /// Called to invoke the activity.
172    fn execute_activity<'a>(
173        &'a self,
174        input: ExecuteActivityInput,
175        next: Next<'a, ExecuteActivityInput, ExecuteActivityOutput<'a>>,
176    ) -> ExecuteActivityOutput<'a> {
177        next.run(input)
178    }
179}
180
181/// Supports the composition of interceptors
182pub struct InterceptorWithNext {
183    inner: Box<dyn WorkerInterceptor>,
184    next: Option<Box<InterceptorWithNext>>,
185}
186
187impl InterceptorWithNext {
188    /// Create from an existing interceptor, can be used to initialize a chain of interceptors
189    pub fn new(inner: Box<dyn WorkerInterceptor>) -> Self {
190        Self { inner, next: None }
191    }
192
193    /// Sets the next interceptor, and then returns that interceptor, wrapped by
194    /// [InterceptorWithNext]. You can keep calling this method on it to extend the chain.
195    pub fn set_next(&mut self, next: Box<dyn WorkerInterceptor>) -> &mut InterceptorWithNext {
196        self.next.insert(Box::new(Self::new(next)))
197    }
198}
199
200#[async_trait::async_trait(?Send)]
201impl WorkerInterceptor for InterceptorWithNext {
202    async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
203        self.inner.on_workflow_activation_completion(c).await;
204        if let Some(next) = &self.next {
205            next.on_workflow_activation_completion(c).await;
206        }
207    }
208
209    fn on_shutdown(&self, w: &Worker) {
210        self.inner.on_shutdown(w);
211        if let Some(next) = &self.next {
212            next.on_shutdown(w);
213        }
214    }
215
216    async fn on_workflow_activation(&self, a: &WorkflowActivation) -> Result<(), anyhow::Error> {
217        self.inner.on_workflow_activation(a).await?;
218        if let Some(next) = &self.next {
219            next.on_workflow_activation(a).await?;
220        }
221        Ok(())
222    }
223}
224
225/// An interceptor which causes the worker's run function to exit early if nondeterminism errors are
226/// encountered
227pub struct FailOnNondeterminismInterceptor {}
228#[async_trait::async_trait(?Send)]
229impl WorkerInterceptor for FailOnNondeterminismInterceptor {
230    async fn on_workflow_activation(
231        &self,
232        activation: &WorkflowActivation,
233    ) -> Result<(), anyhow::Error> {
234        if matches!(
235            activation.eviction_reason(),
236            Some(EvictionReason::Nondeterminism)
237        ) {
238            bail!("Workflow is being evicted because of nondeterminism! {activation}");
239        }
240        Ok(())
241    }
242}
243
244/// An interceptor that allows you to fetch the exit value of the workflow if and when it is set
245#[derive(Default)]
246pub struct ReturnWorkflowExitValueInterceptor {
247    result_value: Arc<OnceLock<Payload>>,
248}
249
250impl ReturnWorkflowExitValueInterceptor {
251    /// Can be used to fetch the workflow result if/when it is determined
252    pub fn result_handle(&self) -> Arc<OnceLock<Payload>> {
253        self.result_value.clone()
254    }
255}
256
257#[async_trait::async_trait(?Send)]
258impl WorkerInterceptor for ReturnWorkflowExitValueInterceptor {
259    async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
260        if let Some(v) = c.complete_workflow_execution_value() {
261            let _ = self.result_value.set(v.clone());
262        }
263    }
264}