1#[doc(inline)]
70pub use crate::__temporal_select as select;
71
72#[doc(inline)]
90pub use crate::__temporal_join as join;
91
92pub use futures_util::future::join_all;
113
114use crate::{
115 BaseWorkflowContext, SyncWorkflowContext, WorkflowContext, WorkflowContextView,
116 WorkflowTermination,
117};
118use futures_util::future::{Fuse, FutureExt, LocalBoxFuture};
119use std::{
120 cell::RefCell,
121 collections::HashMap,
122 fmt::Debug,
123 pin::Pin,
124 rc::Rc,
125 sync::Arc,
126 task::{Context as TaskContext, Poll},
127};
128use temporalio_common::{
129 QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
130 data_converters::{
131 GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
132 SerializationContextData, TemporalDeserializable, TemporalSerializable,
133 },
134 protos::temporal::api::{
135 common::v1::{Payload, Payloads},
136 failure::v1::Failure,
137 },
138};
139
140#[derive(Debug, thiserror::Error)]
142pub enum WorkflowError {
143 #[error("Payload conversion error: {0}")]
145 PayloadConversion(#[from] PayloadConversionError),
146
147 #[error("Workflow execution error: {0}")]
149 Execution(#[from] anyhow::Error),
150}
151
152impl From<WorkflowError> for Failure {
153 fn from(err: WorkflowError) -> Self {
154 Failure {
155 message: err.to_string(),
156 ..Default::default()
157 }
158 }
159}
160
161#[doc(hidden)]
166pub trait WorkflowImplementation: Sized + 'static {
167 type Run: WorkflowDefinition;
169
170 const HAS_INIT: bool;
173
174 const INIT_TAKES_INPUT: bool;
177
178 fn name() -> &'static str;
180
181 fn init(
186 ctx: WorkflowContextView,
187 input: Option<<Self::Run as WorkflowDefinition>::Input>,
188 ) -> Self;
189
190 fn run(
194 ctx: WorkflowContext<Self>,
195 input: Option<<Self::Run as WorkflowDefinition>::Input>,
196 ) -> LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>;
197
198 fn dispatch_update(
200 _ctx: WorkflowContext<Self>,
201 _name: &str,
202 _payloads: Payloads,
203 _converter: &PayloadConverter,
204 ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>> {
205 None
206 }
207
208 fn validate_update(
213 &self,
214 _ctx: WorkflowContextView,
215 _name: &str,
216 _payloads: &Payloads,
217 _converter: &PayloadConverter,
218 ) -> Option<Result<(), WorkflowError>> {
219 None
220 }
221
222 fn dispatch_signal(
228 _ctx: WorkflowContext<Self>,
229 _name: &str,
230 _payloads: Payloads,
231 _converter: &PayloadConverter,
232 ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>> {
233 None
234 }
235
236 fn dispatch_query(
241 &self,
242 _ctx: WorkflowContextView,
243 _name: &str,
244 _payloads: &Payloads,
245 _converter: &PayloadConverter,
246 ) -> Option<Result<Payload, WorkflowError>> {
247 None
248 }
249}
250
251#[doc(hidden)]
257pub trait ExecutableSyncSignal<S: SignalDefinition>: WorkflowImplementation {
258 fn handle(&mut self, ctx: &mut SyncWorkflowContext<Self>, input: S::Input);
260
261 fn dispatch(
263 ctx: WorkflowContext<Self>,
264 payloads: Payloads,
265 converter: &PayloadConverter,
266 ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
267 match deserialize_input::<S::Input>(payloads.payloads, converter) {
268 Ok(input) => {
269 let mut sync_ctx = ctx.sync_context();
270 ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
271 std::future::ready(Ok(())).boxed_local()
272 }
273 Err(e) => std::future::ready(Err(e)).boxed_local(),
274 }
275 }
276}
277
278#[doc(hidden)]
280pub trait ExecutableAsyncSignal<S: SignalDefinition>: WorkflowImplementation {
281 fn handle(ctx: WorkflowContext<Self>, input: S::Input) -> LocalBoxFuture<'static, ()>;
283
284 fn dispatch(
286 ctx: WorkflowContext<Self>,
287 payloads: Payloads,
288 converter: &PayloadConverter,
289 ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
290 match deserialize_input::<S::Input>(payloads.payloads, converter) {
291 Ok(input) => Self::handle(ctx, input).map(|()| Ok(())).boxed_local(),
292 Err(e) => std::future::ready(Err(e)).boxed_local(),
293 }
294 }
295}
296
297#[doc(hidden)]
302pub trait ExecutableQuery<Q: QueryDefinition>: WorkflowImplementation {
303 fn handle(
308 &self,
309 ctx: &WorkflowContextView,
310 input: Q::Input,
311 ) -> Result<Q::Output, Box<dyn std::error::Error + Send + Sync>>;
312
313 fn dispatch(
315 &self,
316 ctx: &WorkflowContextView,
317 payloads: &Payloads,
318 converter: &PayloadConverter,
319 ) -> Result<Payload, WorkflowError> {
320 let input = deserialize_input::<Q::Input>(payloads.payloads.clone(), converter)?;
321 let output = self.handle(ctx, input).map_err(wrap_handler_error)?;
322 serialize_output(&output, converter)
323 }
324}
325
326#[doc(hidden)]
328pub trait ExecutableSyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
329 fn handle(
332 &mut self,
333 ctx: &mut SyncWorkflowContext<Self>,
334 input: U::Input,
335 ) -> Result<U::Output, Box<dyn std::error::Error + Send + Sync>>;
336
337 fn validate(
339 &self,
340 _ctx: &WorkflowContextView,
341 _input: &U::Input,
342 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
343 Ok(())
344 }
345
346 fn dispatch(
348 ctx: WorkflowContext<Self>,
349 payloads: Payloads,
350 converter: &PayloadConverter,
351 ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
352 let input = match deserialize_input::<U::Input>(payloads.payloads, converter) {
353 Ok(v) => v,
354 Err(e) => return std::future::ready(Err(e)).boxed_local(),
355 };
356 let converter = converter.clone();
357 let mut sync_ctx = ctx.sync_context();
358 let result = ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
359 match result {
360 Ok(output) => match serialize_output(&output, &converter) {
361 Ok(payload) => std::future::ready(Ok(payload)).boxed_local(),
362 Err(e) => std::future::ready(Err(e)).boxed_local(),
363 },
364 Err(e) => std::future::ready(Err(wrap_handler_error(e))).boxed_local(),
365 }
366 }
367
368 fn dispatch_validate(
370 &self,
371 ctx: &WorkflowContextView,
372 payloads: &Payloads,
373 converter: &PayloadConverter,
374 ) -> Result<(), WorkflowError> {
375 let input = deserialize_input::<U::Input>(payloads.payloads.clone(), converter)?;
376 self.validate(ctx, &input).map_err(wrap_handler_error)
377 }
378}
379
380#[doc(hidden)]
382pub trait ExecutableAsyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
383 fn handle(
386 ctx: WorkflowContext<Self>,
387 input: U::Input,
388 ) -> LocalBoxFuture<'static, Result<U::Output, Box<dyn std::error::Error + Send + Sync>>>;
389
390 fn validate(
392 &self,
393 _ctx: &WorkflowContextView,
394 _input: &U::Input,
395 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
396 Ok(())
397 }
398
399 fn dispatch(
401 ctx: WorkflowContext<Self>,
402 payloads: Payloads,
403 converter: &PayloadConverter,
404 ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
405 let input = match deserialize_input::<U::Input>(payloads.payloads, converter) {
406 Ok(v) => v,
407 Err(e) => return std::future::ready(Err(e)).boxed_local(),
408 };
409 let converter = converter.clone();
410 async move {
411 let output = Self::handle(ctx, input).await.map_err(wrap_handler_error)?;
412 serialize_output(&output, &converter)
413 }
414 .boxed_local()
415 }
416
417 fn dispatch_validate(
419 &self,
420 ctx: &WorkflowContextView,
421 payloads: &Payloads,
422 converter: &PayloadConverter,
423 ) -> Result<(), WorkflowError> {
424 let input = deserialize_input::<U::Input>(payloads.payloads.clone(), converter)?;
425 self.validate(ctx, &input).map_err(wrap_handler_error)
426 }
427}
428
429pub(crate) struct DispatchData<'a> {
431 pub(crate) payloads: Payloads,
432 pub(crate) headers: HashMap<String, Payload>,
433 pub(crate) converter: &'a PayloadConverter,
434}
435
436#[doc(hidden)]
440pub trait WorkflowImplementer: WorkflowImplementation {
441 fn register_all(defs: &mut WorkflowDefinitions);
443}
444
445pub(crate) trait DynWorkflowExecution {
447 fn poll_run(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<Payload, WorkflowTermination>>;
449
450 fn validate_update(&self, name: &str, data: &DispatchData)
452 -> Option<Result<(), WorkflowError>>;
453
454 fn start_update(
456 &mut self,
457 name: &str,
458 data: DispatchData,
459 ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>>;
460
461 fn dispatch_signal(
463 &mut self,
464 name: &str,
465 data: DispatchData,
466 ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>>;
467
468 fn dispatch_query(
470 &self,
471 name: &str,
472 data: DispatchData,
473 ) -> Option<Result<Payload, WorkflowError>>;
474}
475
476pub(crate) struct WorkflowExecution<W: WorkflowImplementation> {
478 ctx: WorkflowContext<W>,
479 run_future: Fuse<LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>>,
480}
481
482impl<W: WorkflowImplementation> WorkflowExecution<W>
483where
484 <W::Run as WorkflowDefinition>::Input: Send,
485{
486 pub(crate) fn new(
488 base_ctx: BaseWorkflowContext,
489 init_input: Option<<W::Run as WorkflowDefinition>::Input>,
490 run_input: Option<<W::Run as WorkflowDefinition>::Input>,
491 ) -> Self {
492 let view = base_ctx.view();
493 let workflow = W::init(view, init_input);
494 Self::new_with_workflow(workflow, base_ctx, run_input)
495 }
496
497 pub(crate) fn new_with_workflow(
499 workflow: W,
500 base_ctx: BaseWorkflowContext,
501 run_input: Option<<W::Run as WorkflowDefinition>::Input>,
502 ) -> Self {
503 let workflow = Rc::new(RefCell::new(workflow));
504 let ctx = WorkflowContext::from_base(base_ctx, workflow);
505 let run_future = W::run(ctx.clone(), run_input).fuse();
506
507 Self { ctx, run_future }
508 }
509}
510
511impl<W: WorkflowImplementation> DynWorkflowExecution for WorkflowExecution<W> {
512 fn poll_run(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<Payload, WorkflowTermination>> {
513 Pin::new(&mut self.run_future).poll(cx)
514 }
515
516 fn validate_update(
517 &self,
518 name: &str,
519 data: &DispatchData,
520 ) -> Option<Result<(), WorkflowError>> {
521 let view = self.ctx.view();
522 self.ctx
523 .state(|wf| wf.validate_update(view, name, &data.payloads, data.converter))
524 }
525
526 fn start_update(
527 &mut self,
528 name: &str,
529 data: DispatchData,
530 ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>> {
531 let ctx = self.ctx.with_headers(data.headers);
532 W::dispatch_update(ctx, name, data.payloads, data.converter)
533 }
534
535 fn dispatch_signal(
536 &mut self,
537 name: &str,
538 data: DispatchData,
539 ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>> {
540 let ctx = self.ctx.with_headers(data.headers);
541 W::dispatch_signal(ctx, name, data.payloads, data.converter)
542 }
543
544 fn dispatch_query(
545 &self,
546 name: &str,
547 data: DispatchData,
548 ) -> Option<Result<Payload, WorkflowError>> {
549 let view = self.ctx.view();
550 self.ctx
551 .state(|wf| wf.dispatch_query(view, name, &data.payloads, data.converter))
552 }
553}
554
555pub(crate) type WorkflowExecutionFactory = Arc<
559 dyn Fn(
560 Vec<Payload>,
561 PayloadConverter,
562 BaseWorkflowContext,
563 ) -> Result<Box<dyn DynWorkflowExecution>, PayloadConversionError>
564 + Send
565 + Sync,
566>;
567
568#[derive(Default, Clone)]
570pub struct WorkflowDefinitions {
571 workflows: HashMap<&'static str, WorkflowExecutionFactory>,
573}
574
575impl WorkflowDefinitions {
576 pub fn new() -> Self {
578 Self::default()
579 }
580
581 pub fn register_workflow<W: WorkflowImplementer>(&mut self) -> &mut Self {
583 W::register_all(self);
584 self
585 }
586
587 #[doc(hidden)]
589 pub fn register_workflow_run<W: WorkflowImplementation>(&mut self) -> &mut Self
590 where
591 <W::Run as WorkflowDefinition>::Input: Send,
592 {
593 let workflow_name = W::name();
594 let factory: WorkflowExecutionFactory =
595 Arc::new(move |payloads, converter: PayloadConverter, base_ctx| {
596 let ser_ctx = SerializationContext {
597 data: &SerializationContextData::Workflow,
598 converter: &converter,
599 };
600 let input = converter.from_payloads(&ser_ctx, payloads)?;
601 let (init_input, run_input) = if W::INIT_TAKES_INPUT {
602 (Some(input), None)
603 } else {
604 (None, Some(input))
605 };
606 Ok(
607 Box::new(WorkflowExecution::<W>::new(base_ctx, init_input, run_input))
608 as Box<dyn DynWorkflowExecution>,
609 )
610 });
611 self.workflows.insert(workflow_name, factory);
612 self
613 }
614
615 pub fn register_workflow_run_with_factory<W, F>(&mut self, user_factory: F) -> &mut Self
617 where
618 W: WorkflowImplementation,
619 <W::Run as WorkflowDefinition>::Input: Send,
620 F: Fn() -> W + Send + Sync + 'static,
621 {
622 assert!(
623 !W::HAS_INIT,
624 "Workflows registered with a factory must not define an #[init] method. \
625 The factory replaces init for instance creation."
626 );
627
628 let workflow_name = W::name();
629 let user_factory = Arc::new(user_factory);
630 let factory: WorkflowExecutionFactory =
631 Arc::new(move |payloads, converter: PayloadConverter, base_ctx| {
632 let ser_ctx = SerializationContext {
633 data: &SerializationContextData::Workflow,
634 converter: &converter,
635 };
636 let input: <W::Run as WorkflowDefinition>::Input =
637 converter.from_payloads(&ser_ctx, payloads)?;
638
639 let workflow = user_factory();
641 Ok(Box::new(WorkflowExecution::<W>::new_with_workflow(
642 workflow,
643 base_ctx,
644 Some(input),
645 )) as Box<dyn DynWorkflowExecution>)
646 });
647
648 self.workflows.insert(workflow_name, factory);
649 self
650 }
651
652 pub fn is_empty(&self) -> bool {
654 self.workflows.is_empty()
655 }
656
657 pub(crate) fn get_workflow(&self, workflow_type: &str) -> Option<WorkflowExecutionFactory> {
659 self.workflows.get(workflow_type).cloned()
660 }
661
662 pub fn workflow_types(&self) -> impl Iterator<Item = &'static str> + '_ {
664 self.workflows.keys().copied()
665 }
666}
667
668impl Debug for WorkflowDefinitions {
669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 f.debug_struct("WorkflowDefinitions")
671 .field("workflows", &self.workflows.keys().collect::<Vec<_>>())
672 .finish()
673 }
674}
675
676pub fn deserialize_input<I: TemporalDeserializable + 'static>(
678 payloads: Vec<Payload>,
679 converter: &PayloadConverter,
680) -> Result<I, WorkflowError> {
681 let ctx = SerializationContext {
682 data: &SerializationContextData::Workflow,
683 converter,
684 };
685 converter.from_payloads(&ctx, payloads).map_err(Into::into)
686}
687
688pub fn serialize_output<O: TemporalSerializable + 'static>(
690 output: &O,
691 converter: &PayloadConverter,
692) -> Result<Payload, WorkflowError> {
693 let ctx = SerializationContext {
694 data: &SerializationContextData::Workflow,
695 converter,
696 };
697 converter.to_payload(&ctx, output).map_err(Into::into)
698}
699
700pub fn wrap_handler_error(e: Box<dyn std::error::Error + Send + Sync>) -> WorkflowError {
702 WorkflowError::Execution(anyhow::anyhow!(e))
703}
704
705pub fn serialize_result<T: TemporalSerializable + 'static>(
707 result: T,
708 converter: &PayloadConverter,
709) -> Result<Payload, WorkflowError> {
710 serialize_output(&result, converter)
711}