use crate::{
SyncWorkflowContext, WorkflowContext, WorkflowContextView,
runtime::{model::WorkflowTermination, types::WorkflowDefinitionDescriptor},
};
use futures_util::future::{FutureExt, LocalBoxFuture};
use std::any::Any;
use temporalio_common_wasm::{
QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
data_converters::{
GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
SerializationContextData, TemporalSerializable,
},
protos::temporal::api::{
common::v1::{Payload, Payloads},
failure::v1::Failure,
},
};
/// Errors returned by the handler dispatch and serialization helpers in this module.
///
/// Both variants carry `#[from]`, so the wrapped error types convert with `?`.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowError {
/// A payload converter failed; wraps the converter's own error.
#[error("Payload conversion error: {0}")]
PayloadConversion(#[from] PayloadConversionError),
/// A handler returned an error; wraps the boxed error the handler produced.
#[error("Workflow execution error: {0}")]
Execution(#[from] Box<dyn std::error::Error + Send + Sync>),
}
impl From<WorkflowError> for Failure {
fn from(err: WorkflowError) -> Self {
Failure {
message: err.to_string(),
..Default::default()
}
}
}
/// Recovers the concrete handler input `T` from a type-erased box.
///
/// `handler_kind` is a short label (for example `"signal"` or `"query"`) that is only used
/// to build the panic message.
///
/// # Panics
///
/// Panics when `input` does not hold a `T`. The message starts with the handler kind and
/// ends with the name of the expected type, so a mismatch can be traced to the handler
/// whose decode and dispatch steps disagree.
fn downcast_handler_input<T: Any>(input: Box<dyn Any>, handler_kind: &'static str) -> T {
    match input.downcast::<T>() {
        Ok(typed) => *typed,
        // The failed box is discarded: `dyn Any` cannot report the name of the type it holds,
        // so only the expected type can be included in the message.
        Err(_) => panic!(
            "typed {handler_kind} dispatch received input with wrong concrete type (expected `{}`)",
            std::any::type_name::<T>()
        ),
    }
}
/// Entry points through which a workflow type is constructed, run, and has its signal,
/// query and update handlers invoked by name.
///
/// Handler inputs cross this trait as `Box<dyn Any>`: the `decode_*_input` functions
/// produce the box from payloads and the `dispatch_*` / `validate_update` functions
/// consume it.
pub trait WorkflowImplementation: Sized + 'static {
/// Workflow definition whose `Input` type is accepted by [`Self::init`] and [`Self::run`].
type Run: WorkflowDefinition;
/// NOTE(review): presumably true when the workflow declares its own init function —
/// confirm against the code that reads this constant.
const HAS_INIT: bool;
/// NOTE(review): presumably true when that init function receives the workflow input —
/// confirm against the code that reads this constant.
const INIT_TAKES_INPUT: bool;
/// Returns the workflow's name as a static string.
fn name() -> &'static str;
/// Returns the descriptor for this workflow definition.
fn definition() -> WorkflowDefinitionDescriptor;
/// Builds the workflow value from a context view and an optional workflow input.
fn init(
ctx: WorkflowContextView,
input: Option<<Self::Run as WorkflowDefinition>::Input>,
) -> Self;
/// Starts the workflow body and returns a local (non-`Send`) boxed future that resolves
/// to the result payload or to a [`WorkflowTermination`].
fn run(
ctx: WorkflowContext<Self>,
input: Option<<Self::Run as WorkflowDefinition>::Input>,
) -> LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>;
/// Decodes the payloads of the signal called `_name` into a type-erased input.
///
/// The default implementation ignores its arguments and returns `Ok(None)`.
/// NOTE(review): `None` presumably means "no handler with this name" — confirm with the
/// caller.
fn decode_signal_input(
_name: &str,
_payloads: Payloads,
_converter: &PayloadConverter,
) -> Result<Option<Box<dyn Any>>, WorkflowError> {
Ok(None)
}
/// Invokes the signal handler called `name` with an already-decoded `input`.
fn dispatch_signal(
ctx: WorkflowContext<Self>,
name: &str,
input: Box<dyn Any>,
) -> LocalBoxFuture<'static, Result<(), WorkflowError>>;
/// Decodes the payloads of the query called `_name` into a type-erased input.
///
/// Unlike the signal and update variants, the payloads are taken by reference.
/// The default implementation ignores its arguments and returns `Ok(None)`.
fn decode_query_input(
_name: &str,
_payloads: &Payloads,
_converter: &PayloadConverter,
) -> Result<Option<Box<dyn Any>>, WorkflowError> {
Ok(None)
}
/// Invokes the query handler called `name` on `&self` and returns its result as a payload.
fn dispatch_query(
&self,
ctx: WorkflowContextView,
name: &str,
input: Box<dyn Any>,
converter: &PayloadConverter,
) -> Result<Payload, WorkflowError>;
/// Decodes the payloads of the update called `_name` into a type-erased input.
///
/// The default implementation ignores its arguments and returns `Ok(None)`.
fn decode_update_input(
_name: &str,
_payloads: Payloads,
_converter: &PayloadConverter,
) -> Result<Option<Box<dyn Any>>, WorkflowError> {
Ok(None)
}
/// Invokes the update handler called `name`; the returned future resolves to the
/// handler's result as a payload.
fn dispatch_update(
ctx: WorkflowContext<Self>,
name: &str,
input: Box<dyn Any>,
converter: &PayloadConverter,
) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>>;
/// Runs the validator of the update called `name` on `&self` with an already-decoded
/// `input`.
fn validate_update(
&self,
ctx: WorkflowContextView,
name: &str,
input: Box<dyn Any>,
) -> Result<(), WorkflowError>;
}
/// A handler for signal `S` that runs synchronously with mutable access to the workflow.
pub trait ExecutableSyncSignal<S: SignalDefinition>: WorkflowImplementation {
    /// Handles one delivery of the signal.
    fn handle(&mut self, ctx: &mut SyncWorkflowContext<Self>, input: S::Input);

    /// Type-erased entry point for the signal.
    ///
    /// Recovers `S::Input` from `input`, runs [`Self::handle`] inside `ctx.state_mut`, and
    /// returns a future that is already complete with `Ok(())`. The handler has therefore
    /// finished by the time this function returns.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold an `S::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
        let signal_input = downcast_handler_input::<S::Input>(input, "signal");
        // The synchronous context is created before the workflow state is borrowed.
        let mut handler_ctx = ctx.sync_context();
        ctx.state_mut(|wf| {
            <Self as ExecutableSyncSignal<S>>::handle(wf, &mut handler_ctx, signal_input)
        });
        let outcome: Result<(), WorkflowError> = Ok(());
        std::future::ready(outcome).boxed_local()
    }
}
/// A handler for signal `S` whose work is expressed as a future.
pub trait ExecutableAsyncSignal<S: SignalDefinition>: WorkflowImplementation {
    /// Starts handling one delivery of the signal and returns the future that completes it.
    fn handle(ctx: WorkflowContext<Self>, input: S::Input) -> LocalBoxFuture<'static, ()>;

    /// Type-erased entry point for the signal.
    ///
    /// Recovers `S::Input` from `input`, calls [`Self::handle`] immediately, and returns a
    /// future that waits for the handler's future and then yields `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold an `S::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
        let signal_input = downcast_handler_input::<S::Input>(input, "signal");
        // `handle` is invoked here, not on first poll, so any synchronous prologue it has
        // runs during dispatch.
        let pending = <Self as ExecutableAsyncSignal<S>>::handle(ctx, signal_input);
        async move {
            pending.await;
            Ok::<(), WorkflowError>(())
        }
        .boxed_local()
    }
}
/// A handler for query `Q`, evaluated with shared access to the workflow.
pub trait ExecutableQuery<Q: QueryDefinition>: WorkflowImplementation {
    /// Computes the query result, or returns a boxed error.
    fn handle(
        &self,
        ctx: &WorkflowContextView,
        input: Q::Input,
    ) -> Result<Q::Output, Box<dyn std::error::Error + Send + Sync>>;

    /// Type-erased entry point for the query.
    ///
    /// Recovers `Q::Input` from `input`, runs [`Self::handle`], and serializes the output
    /// with `converter`.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::Execution`] when the handler fails, or the error produced
    /// by `serialize_output` when the output cannot be converted to a payload.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `Q::Input`.
    fn dispatch(
        &self,
        ctx: &WorkflowContextView,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> Result<Payload, WorkflowError> {
        let query_input = downcast_handler_input::<Q::Input>(input, "query");
        match <Self as ExecutableQuery<Q>>::handle(self, ctx, query_input) {
            Ok(output) => serialize_output(&output, converter),
            Err(source) => Err(WorkflowError::Execution(source)),
        }
    }
}
/// A handler for update `U` that runs synchronously with mutable access to the workflow.
pub trait ExecutableSyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
    /// Applies the update and returns its output, or a boxed error.
    fn handle(
        &mut self,
        ctx: &mut SyncWorkflowContext<Self>,
        input: U::Input,
    ) -> Result<U::Output, Box<dyn std::error::Error + Send + Sync>>;

    /// Validates an update input with shared access to the workflow.
    ///
    /// The default implementation accepts every input.
    fn validate(
        &self,
        _ctx: &WorkflowContextView,
        _input: &U::Input,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }

    /// Type-erased entry point for the update.
    ///
    /// Recovers `U::Input` from `input`, runs [`Self::handle`] inside `ctx.state_mut`,
    /// serializes a successful output with `converter`, and returns the outcome as an
    /// already-completed future. All work is done before this function returns.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
        let update_input = downcast_handler_input::<U::Input>(input, "update");
        // The synchronous context is created before the workflow state is borrowed.
        let mut handler_ctx = ctx.sync_context();
        let outcome = ctx
            .state_mut(|wf| {
                <Self as ExecutableSyncUpdate<U>>::handle(wf, &mut handler_ctx, update_input)
            })
            .map_err(WorkflowError::Execution)
            .and_then(|output| serialize_output(&output, converter));
        std::future::ready(outcome).boxed_local()
    }

    /// Type-erased entry point for update validation.
    ///
    /// Recovers `U::Input` from `input` and runs [`Self::validate`], wrapping a validator
    /// error in [`WorkflowError::Execution`].
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch_validate(
        &self,
        ctx: &WorkflowContextView,
        input: Box<dyn Any>,
    ) -> Result<(), WorkflowError> {
        let update_input = downcast_handler_input::<U::Input>(input, "update validation");
        match <Self as ExecutableSyncUpdate<U>>::validate(self, ctx, &update_input) {
            Ok(()) => Ok(()),
            Err(source) => Err(WorkflowError::Execution(source)),
        }
    }
}
/// A handler for update `U` whose work is expressed as a future.
pub trait ExecutableAsyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
    /// Starts applying the update; the returned future resolves to the output or a boxed
    /// error.
    fn handle(
        ctx: WorkflowContext<Self>,
        input: U::Input,
    ) -> LocalBoxFuture<'static, Result<U::Output, Box<dyn std::error::Error + Send + Sync>>>;

    /// Validates an update input with shared access to the workflow.
    ///
    /// The default implementation accepts every input.
    fn validate(
        &self,
        _ctx: &WorkflowContextView,
        _input: &U::Input,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }

    /// Type-erased entry point for the update.
    ///
    /// Recovers `U::Input` from `input` right away, then returns a future that calls
    /// [`Self::handle`], awaits it, and serializes a successful output. `handle` is not
    /// called until the returned future is first polled.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
        let update_input = downcast_handler_input::<U::Input>(input, "update");
        // The future must be `'static`, so it owns a clone of the converter instead of
        // borrowing the caller's.
        let owned_converter = converter.clone();
        async move {
            match <Self as ExecutableAsyncUpdate<U>>::handle(ctx, update_input).await {
                Ok(output) => serialize_output(&output, &owned_converter),
                Err(source) => Err(WorkflowError::Execution(source)),
            }
        }
        .boxed_local()
    }

    /// Type-erased entry point for update validation.
    ///
    /// Recovers `U::Input` from `input` and runs [`Self::validate`], wrapping a validator
    /// error in [`WorkflowError::Execution`].
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch_validate(
        &self,
        ctx: &WorkflowContextView,
        input: Box<dyn Any>,
    ) -> Result<(), WorkflowError> {
        let update_input = downcast_handler_input::<U::Input>(input, "update validation");
        match <Self as ExecutableAsyncUpdate<U>>::validate(self, ctx, &update_input) {
            Ok(()) => Ok(()),
            Err(source) => Err(WorkflowError::Execution(source)),
        }
    }
}
/// Serializes a handler output into a [`Payload`] using `converter`.
///
/// The conversion runs under a serialization context whose data is
/// `SerializationContextData::Workflow`.
///
/// # Errors
///
/// Returns the converter's error, converted into a [`WorkflowError`].
pub(crate) fn serialize_output<O: TemporalSerializable + 'static>(
    output: &O,
    converter: &PayloadConverter,
) -> Result<Payload, WorkflowError> {
    let workflow_ctx = SerializationContext {
        data: &SerializationContextData::Workflow,
        converter,
    };
    let payload = converter.to_payload(&workflow_ctx, output)?;
    Ok(payload)
}
/// Public wrapper around `serialize_output` that takes the value by ownership.
///
/// `result` is borrowed for the conversion and dropped when this function returns.
///
/// # Errors
///
/// Returns whatever error `serialize_output` reports for the value.
pub fn serialize_result<T: TemporalSerializable + 'static>(
    result: T,
    converter: &PayloadConverter,
) -> Result<Payload, WorkflowError> {
    let owned_result = result;
    serialize_output(&owned_result, converter)
}