temporalio_workflow/runtime/
entry.rs1use crate::{
4 SyncWorkflowContext, WorkflowContext, WorkflowContextView,
5 runtime::{model::WorkflowTermination, types::WorkflowDefinitionDescriptor},
6};
7use futures_util::future::{FutureExt, LocalBoxFuture};
8use std::any::Any;
9use temporalio_common_wasm::{
10 QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
11 data_converters::{
12 GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
13 SerializationContextData, TemporalSerializable,
14 },
15 protos::temporal::api::{
16 common::v1::{Payload, Payloads},
17 failure::v1::Failure,
18 },
19};
20
21#[derive(Debug, thiserror::Error)]
23pub enum WorkflowError {
24 #[error("Payload conversion error: {0}")]
26 PayloadConversion(#[from] PayloadConversionError),
27
28 #[error("Workflow execution error: {0}")]
30 Execution(#[from] Box<dyn std::error::Error + Send + Sync>),
31}
32
33impl From<WorkflowError> for Failure {
34 fn from(err: WorkflowError) -> Self {
35 Failure {
36 message: err.to_string(),
37 ..Default::default()
38 }
39 }
40}
41
42fn downcast_handler_input<T: Any>(input: Box<dyn Any>, handler_kind: &'static str) -> T {
43 *input.downcast::<T>().unwrap_or_else(|_| {
44 panic!("typed {handler_kind} dispatch received input with wrong concrete type")
45 })
46}
47
48pub trait WorkflowImplementation: Sized + 'static {
53 type Run: WorkflowDefinition;
55
56 const HAS_INIT: bool;
59
60 const INIT_TAKES_INPUT: bool;
63
64 fn name() -> &'static str;
66
67 fn definition() -> WorkflowDefinitionDescriptor;
69
70 fn init(
72 ctx: WorkflowContextView,
73 input: Option<<Self::Run as WorkflowDefinition>::Input>,
74 ) -> Self;
75
76 fn run(
78 ctx: WorkflowContext<Self>,
79 input: Option<<Self::Run as WorkflowDefinition>::Input>,
80 ) -> LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>;
81
82 fn decode_signal_input(
84 _name: &str,
85 _payloads: Payloads,
86 _converter: &PayloadConverter,
87 ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
88 Ok(None)
89 }
90
91 fn dispatch_signal(
93 ctx: WorkflowContext<Self>,
94 name: &str,
95 input: Box<dyn Any>,
96 ) -> LocalBoxFuture<'static, Result<(), WorkflowError>>;
97
98 fn decode_query_input(
100 _name: &str,
101 _payloads: &Payloads,
102 _converter: &PayloadConverter,
103 ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
104 Ok(None)
105 }
106
107 fn dispatch_query(
109 &self,
110 ctx: WorkflowContextView,
111 name: &str,
112 input: Box<dyn Any>,
113 converter: &PayloadConverter,
114 ) -> Result<Payload, WorkflowError>;
115
116 fn decode_update_input(
118 _name: &str,
119 _payloads: Payloads,
120 _converter: &PayloadConverter,
121 ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
122 Ok(None)
123 }
124
125 fn dispatch_update(
127 ctx: WorkflowContext<Self>,
128 name: &str,
129 input: Box<dyn Any>,
130 converter: &PayloadConverter,
131 ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>>;
132
133 fn validate_update(
135 &self,
136 ctx: WorkflowContextView,
137 name: &str,
138 input: Box<dyn Any>,
139 ) -> Result<(), WorkflowError>;
140}
141
142pub trait ExecutableSyncSignal<S: SignalDefinition>: WorkflowImplementation {
144 fn handle(&mut self, ctx: &mut SyncWorkflowContext<Self>, input: S::Input);
146
147 fn dispatch(
149 ctx: WorkflowContext<Self>,
150 input: Box<dyn Any>,
151 ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
152 let input = downcast_handler_input::<S::Input>(input, "signal");
153 let mut sync_ctx = ctx.sync_context();
154 ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
155 std::future::ready(Ok(())).boxed_local()
156 }
157}
158
159pub trait ExecutableAsyncSignal<S: SignalDefinition>: WorkflowImplementation {
161 fn handle(ctx: WorkflowContext<Self>, input: S::Input) -> LocalBoxFuture<'static, ()>;
163
164 fn dispatch(
166 ctx: WorkflowContext<Self>,
167 input: Box<dyn Any>,
168 ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
169 let input = downcast_handler_input::<S::Input>(input, "signal");
170 Self::handle(ctx, input).map(|()| Ok(())).boxed_local()
171 }
172}
173
174pub trait ExecutableQuery<Q: QueryDefinition>: WorkflowImplementation {
176 fn handle(
178 &self,
179 ctx: &WorkflowContextView,
180 input: Q::Input,
181 ) -> Result<Q::Output, Box<dyn std::error::Error + Send + Sync>>;
182
183 fn dispatch(
185 &self,
186 ctx: &WorkflowContextView,
187 input: Box<dyn Any>,
188 converter: &PayloadConverter,
189 ) -> Result<Payload, WorkflowError> {
190 let input = downcast_handler_input::<Q::Input>(input, "query");
191 let output = self.handle(ctx, input).map_err(WorkflowError::Execution)?;
192 serialize_output(&output, converter)
193 }
194}
195
196pub trait ExecutableSyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
198 fn handle(
200 &mut self,
201 ctx: &mut SyncWorkflowContext<Self>,
202 input: U::Input,
203 ) -> Result<U::Output, Box<dyn std::error::Error + Send + Sync>>;
204
205 fn validate(
207 &self,
208 _ctx: &WorkflowContextView,
209 _input: &U::Input,
210 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
211 Ok(())
212 }
213
214 fn dispatch(
216 ctx: WorkflowContext<Self>,
217 input: Box<dyn Any>,
218 converter: &PayloadConverter,
219 ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
220 let input = downcast_handler_input::<U::Input>(input, "update");
221 let mut sync_ctx = ctx.sync_context();
222 let result = ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
223 match result {
224 Ok(output) => match serialize_output(&output, converter) {
225 Ok(payload) => std::future::ready(Ok(payload)).boxed_local(),
226 Err(e) => std::future::ready(Err(e)).boxed_local(),
227 },
228 Err(e) => std::future::ready(Err(WorkflowError::Execution(e))).boxed_local(),
229 }
230 }
231
232 fn dispatch_validate(
234 &self,
235 ctx: &WorkflowContextView,
236 input: Box<dyn Any>,
237 ) -> Result<(), WorkflowError> {
238 let input = downcast_handler_input::<U::Input>(input, "update validation");
239 self.validate(ctx, &input).map_err(WorkflowError::Execution)
240 }
241}
242
243pub trait ExecutableAsyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
245 fn handle(
247 ctx: WorkflowContext<Self>,
248 input: U::Input,
249 ) -> LocalBoxFuture<'static, Result<U::Output, Box<dyn std::error::Error + Send + Sync>>>;
250
251 fn validate(
253 &self,
254 _ctx: &WorkflowContextView,
255 _input: &U::Input,
256 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
257 Ok(())
258 }
259
260 fn dispatch(
262 ctx: WorkflowContext<Self>,
263 input: Box<dyn Any>,
264 converter: &PayloadConverter,
265 ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
266 let input = downcast_handler_input::<U::Input>(input, "update");
267 let converter = converter.clone();
268 async move {
269 let output = Self::handle(ctx, input)
270 .await
271 .map_err(WorkflowError::Execution)?;
272 serialize_output(&output, &converter)
273 }
274 .boxed_local()
275 }
276
277 fn dispatch_validate(
279 &self,
280 ctx: &WorkflowContextView,
281 input: Box<dyn Any>,
282 ) -> Result<(), WorkflowError> {
283 let input = downcast_handler_input::<U::Input>(input, "update validation");
284 self.validate(ctx, &input).map_err(WorkflowError::Execution)
285 }
286}
287
288pub(crate) fn serialize_output<O: TemporalSerializable + 'static>(
290 output: &O,
291 converter: &PayloadConverter,
292) -> Result<Payload, WorkflowError> {
293 let ctx = SerializationContext {
294 data: &SerializationContextData::Workflow,
295 converter,
296 };
297 converter.to_payload(&ctx, output).map_err(Into::into)
298}
299
300pub fn serialize_result<T: TemporalSerializable + 'static>(
302 result: T,
303 converter: &PayloadConverter,
304) -> Result<Payload, WorkflowError> {
305 serialize_output(&result, converter)
306}