use crate::{
Worker,
activities::{ActivityContext, ActivityError, ActivityInfo},
};
use anyhow::bail;
use futures_util::future::BoxFuture;
use std::{
any::Any,
collections::HashMap,
sync::{Arc, OnceLock},
};
use temporalio_common::{
data_converters::{
GenericPayloadConverter, PayloadConversionError, SerializationContext, TemporalSerializable,
},
protos::{
coresdk::{
workflow_activation::{WorkflowActivation, remove_from_cache::EvictionReason},
workflow_completion::WorkflowActivationCompletion,
},
temporal::api::common::v1::Payload,
},
};
mod activity_execution_value {
use super::*;
pub trait Sealed {
fn to_activity_payload(
&self,
context: &SerializationContext<'_>,
) -> Result<Payload, PayloadConversionError>;
}
impl<T> Sealed for T
where
T: Any + TemporalSerializable + Send + Sync,
{
fn to_activity_payload(
&self,
context: &SerializationContext<'_>,
) -> Result<Payload, PayloadConversionError> {
context.converter.to_payload(context, self)
}
}
}
#[async_trait::async_trait(?Send)]
pub trait WorkerInterceptor {
async fn on_workflow_activation_completion(&self, _completion: &WorkflowActivationCompletion) {}
fn on_shutdown(&self, _sdk_worker: &Worker) {}
async fn on_workflow_activation(
&self,
_activation: &WorkflowActivation,
) -> Result<(), anyhow::Error> {
Ok(())
}
}
pub struct Next<'a, I, O> {
inner: Box<dyn FnOnce(I) -> O + Send + 'a>,
}
impl<'a, I, O> Next<'a, I, O> {
pub(crate) fn new(f: impl FnOnce(I) -> O + Send + 'a) -> Self {
Self { inner: Box::new(f) }
}
pub fn run(self, input: I) -> O {
(self.inner)(input)
}
}
#[non_exhaustive]
pub struct ExecuteActivityInput {
context: ActivityContext,
args: Box<dyn Any + Send + Sync>,
}
impl ExecuteActivityInput {
pub(crate) fn new(context: ActivityContext, args: Box<dyn Any + Send + Sync>) -> Self {
Self { context, args }
}
pub(crate) fn into_parts(self) -> (ActivityContext, Box<dyn Any + Send + Sync>) {
(self.context, self.args)
}
pub fn activity_info(&self) -> &ActivityInfo {
self.context.info()
}
pub fn headers(&self) -> &HashMap<String, Payload> {
self.context.headers()
}
pub fn headers_mut(&mut self) -> &mut HashMap<String, Payload> {
self.context.headers_mut()
}
pub fn args_ref<T: Any>(&self) -> Option<&T> {
self.args.downcast_ref()
}
pub fn args_mut<T: Any>(&mut self) -> Option<&mut T> {
self.args.downcast_mut()
}
}
pub trait ActivityExecutionValue:
Any + TemporalSerializable + Send + Sync + activity_execution_value::Sealed
{
fn as_any(&self) -> &dyn Any;
}
impl<T> ActivityExecutionValue for T
where
T: Any + TemporalSerializable + Send + Sync,
{
fn as_any(&self) -> &dyn Any {
self
}
}
impl dyn ActivityExecutionValue {
pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
self.as_any().downcast_ref()
}
pub(crate) fn serialize_payload(
&self,
context: &SerializationContext<'_>,
) -> Result<Payload, PayloadConversionError> {
self.to_activity_payload(context)
}
}
pub type ExecuteActivityResult = Result<Box<dyn ActivityExecutionValue>, ActivityError>;
pub type ExecuteActivityOutput<'a> = BoxFuture<'a, ExecuteActivityResult>;
pub trait ActivityInboundInterceptor: Send + Sync + 'static {
fn execute_activity<'a>(
&'a self,
input: ExecuteActivityInput,
next: Next<'a, ExecuteActivityInput, ExecuteActivityOutput<'a>>,
) -> ExecuteActivityOutput<'a> {
next.run(input)
}
}
pub struct InterceptorWithNext {
inner: Box<dyn WorkerInterceptor>,
next: Option<Box<InterceptorWithNext>>,
}
impl InterceptorWithNext {
pub fn new(inner: Box<dyn WorkerInterceptor>) -> Self {
Self { inner, next: None }
}
pub fn set_next(&mut self, next: Box<dyn WorkerInterceptor>) -> &mut InterceptorWithNext {
self.next.insert(Box::new(Self::new(next)))
}
}
#[async_trait::async_trait(?Send)]
impl WorkerInterceptor for InterceptorWithNext {
async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
self.inner.on_workflow_activation_completion(c).await;
if let Some(next) = &self.next {
next.on_workflow_activation_completion(c).await;
}
}
fn on_shutdown(&self, w: &Worker) {
self.inner.on_shutdown(w);
if let Some(next) = &self.next {
next.on_shutdown(w);
}
}
async fn on_workflow_activation(&self, a: &WorkflowActivation) -> Result<(), anyhow::Error> {
self.inner.on_workflow_activation(a).await?;
if let Some(next) = &self.next {
next.on_workflow_activation(a).await?;
}
Ok(())
}
}
pub struct FailOnNondeterminismInterceptor {}
#[async_trait::async_trait(?Send)]
impl WorkerInterceptor for FailOnNondeterminismInterceptor {
async fn on_workflow_activation(
&self,
activation: &WorkflowActivation,
) -> Result<(), anyhow::Error> {
if matches!(
activation.eviction_reason(),
Some(EvictionReason::Nondeterminism)
) {
bail!("Workflow is being evicted because of nondeterminism! {activation}");
}
Ok(())
}
}
#[derive(Default)]
pub struct ReturnWorkflowExitValueInterceptor {
result_value: Arc<OnceLock<Payload>>,
}
impl ReturnWorkflowExitValueInterceptor {
pub fn result_handle(&self) -> Arc<OnceLock<Payload>> {
self.result_value.clone()
}
}
#[async_trait::async_trait(?Send)]
impl WorkerInterceptor for ReturnWorkflowExitValueInterceptor {
async fn on_workflow_activation_completion(&self, c: &WorkflowActivationCompletion) {
if let Some(v) = c.complete_workflow_execution_value() {
let _ = self.result_value.set(v.clone());
}
}
}