Skip to main content

temporalio_sdk/
interceptors.rs

1//! User-definable interceptors are defined in this module
2
3use crate::Worker;
4use anyhow::bail;
5use std::sync::{Arc, OnceLock};
6use temporalio_common::protos::{
7    coresdk::{
8        workflow_activation::{WorkflowActivation, remove_from_cache::EvictionReason},
9        workflow_completion::WorkflowActivationCompletion,
10    },
11    temporal::api::common::v1::Payload,
12};
13
14/// Implementors can intercept certain actions that happen within the Worker.
15///
16/// Advanced usage only.
17#[async_trait::async_trait(?Send)]
18pub trait WorkerInterceptor {
19    /// Called every time a workflow activation completes (just before sending the completion to
20    /// core).
21    async fn on_workflow_activation_completion(&self, _completion: &WorkflowActivationCompletion) {}
22    /// Called after the worker has initiated shutdown and the workflow/activity polling loops
23    /// have exited, but just before waiting for the inner core worker shutdown
24    fn on_shutdown(&self, _sdk_worker: &Worker) {}
25    /// Called every time a workflow is about to be activated
26    async fn on_workflow_activation(
27        &self,
28        _activation: &WorkflowActivation,
29    ) -> Result<(), anyhow::Error> {
30        Ok(())
31    }
32}
33
34/// Supports the composition of interceptors
35pub struct InterceptorWithNext {
36    inner: Box<dyn WorkerInterceptor>,
37    next: Option<Box<InterceptorWithNext>>,
38}
39
40impl InterceptorWithNext {
41    /// Create from an existing interceptor, can be used to initialize a chain of interceptors
42    pub fn new(inner: Box<dyn WorkerInterceptor>) -> Self {
43        Self { inner, next: None }
44    }
45
46    /// Sets the next interceptor, and then returns that interceptor, wrapped by
47    /// [InterceptorWithNext]. You can keep calling this method on it to extend the chain.
48    pub fn set_next(&mut self, next: Box<dyn WorkerInterceptor>) -> &mut InterceptorWithNext {
49        self.next.insert(Box::new(Self::new(next)))
50    }
51}
52
53#[async_trait::async_trait(?Send)]
54impl WorkerInterceptor for InterceptorWithNext {
55    async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
56        self.inner.on_workflow_activation_completion(c).await;
57        if let Some(next) = &self.next {
58            next.on_workflow_activation_completion(c).await;
59        }
60    }
61
62    fn on_shutdown(&self, w: &Worker) {
63        self.inner.on_shutdown(w);
64        if let Some(next) = &self.next {
65            next.on_shutdown(w);
66        }
67    }
68
69    async fn on_workflow_activation(&self, a: &WorkflowActivation) -> Result<(), anyhow::Error> {
70        self.inner.on_workflow_activation(a).await?;
71        if let Some(next) = &self.next {
72            next.on_workflow_activation(a).await?;
73        }
74        Ok(())
75    }
76}
77
78/// An interceptor which causes the worker's run function to exit early if nondeterminism errors are
79/// encountered
80pub struct FailOnNondeterminismInterceptor {}
81#[async_trait::async_trait(?Send)]
82impl WorkerInterceptor for FailOnNondeterminismInterceptor {
83    async fn on_workflow_activation(
84        &self,
85        activation: &WorkflowActivation,
86    ) -> Result<(), anyhow::Error> {
87        if matches!(
88            activation.eviction_reason(),
89            Some(EvictionReason::Nondeterminism)
90        ) {
91            bail!("Workflow is being evicted because of nondeterminism! {activation}");
92        }
93        Ok(())
94    }
95}
96
97/// An interceptor that allows you to fetch the exit value of the workflow if and when it is set
98#[derive(Default)]
99pub struct ReturnWorkflowExitValueInterceptor {
100    result_value: Arc<OnceLock<Payload>>,
101}
102
103impl ReturnWorkflowExitValueInterceptor {
104    /// Can be used to fetch the workflow result if/when it is determined
105    pub fn result_handle(&self) -> Arc<OnceLock<Payload>> {
106        self.result_value.clone()
107    }
108}
109
110#[async_trait::async_trait(?Send)]
111impl WorkerInterceptor for ReturnWorkflowExitValueInterceptor {
112    async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
113        if let Some(v) = c.complete_workflow_execution_value() {
114            let _ = self.result_value.set(v.clone());
115        }
116    }
117}