temporalio_sdk/
interceptors.rs1use crate::Worker;
4use anyhow::bail;
5use std::sync::{Arc, OnceLock};
6use temporalio_common::protos::{
7 coresdk::{
8 workflow_activation::{WorkflowActivation, remove_from_cache::EvictionReason},
9 workflow_completion::WorkflowActivationCompletion,
10 },
11 temporal::api::common::v1::Payload,
12};
13
14#[async_trait::async_trait(?Send)]
18pub trait WorkerInterceptor {
19 async fn on_workflow_activation_completion(&self, _completion: &WorkflowActivationCompletion) {}
22 fn on_shutdown(&self, _sdk_worker: &Worker) {}
25 async fn on_workflow_activation(
27 &self,
28 _activation: &WorkflowActivation,
29 ) -> Result<(), anyhow::Error> {
30 Ok(())
31 }
32}
33
34pub struct InterceptorWithNext {
36 inner: Box<dyn WorkerInterceptor>,
37 next: Option<Box<InterceptorWithNext>>,
38}
39
40impl InterceptorWithNext {
41 pub fn new(inner: Box<dyn WorkerInterceptor>) -> Self {
43 Self { inner, next: None }
44 }
45
46 pub fn set_next(&mut self, next: Box<dyn WorkerInterceptor>) -> &mut InterceptorWithNext {
49 self.next.insert(Box::new(Self::new(next)))
50 }
51}
52
53#[async_trait::async_trait(?Send)]
54impl WorkerInterceptor for InterceptorWithNext {
55 async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
56 self.inner.on_workflow_activation_completion(c).await;
57 if let Some(next) = &self.next {
58 next.on_workflow_activation_completion(c).await;
59 }
60 }
61
62 fn on_shutdown(&self, w: &Worker) {
63 self.inner.on_shutdown(w);
64 if let Some(next) = &self.next {
65 next.on_shutdown(w);
66 }
67 }
68
69 async fn on_workflow_activation(&self, a: &WorkflowActivation) -> Result<(), anyhow::Error> {
70 self.inner.on_workflow_activation(a).await?;
71 if let Some(next) = &self.next {
72 next.on_workflow_activation(a).await?;
73 }
74 Ok(())
75 }
76}
77
78pub struct FailOnNondeterminismInterceptor {}
81#[async_trait::async_trait(?Send)]
82impl WorkerInterceptor for FailOnNondeterminismInterceptor {
83 async fn on_workflow_activation(
84 &self,
85 activation: &WorkflowActivation,
86 ) -> Result<(), anyhow::Error> {
87 if matches!(
88 activation.eviction_reason(),
89 Some(EvictionReason::Nondeterminism)
90 ) {
91 bail!("Workflow is being evicted because of nondeterminism! {activation}");
92 }
93 Ok(())
94 }
95}
96
97#[derive(Default)]
99pub struct ReturnWorkflowExitValueInterceptor {
100 result_value: Arc<OnceLock<Payload>>,
101}
102
103impl ReturnWorkflowExitValueInterceptor {
104 pub fn result_handle(&self) -> Arc<OnceLock<Payload>> {
106 self.result_value.clone()
107 }
108}
109
110#[async_trait::async_trait(?Send)]
111impl WorkerInterceptor for ReturnWorkflowExitValueInterceptor {
112 async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
113 if let Some(v) = c.complete_workflow_execution_value() {
114 let _ = self.result_value.set(v.clone());
115 }
116 }
117}