temporalio-workflow 0.5.0

Temporal Rust workflow authoring surface
Documentation
//! Runtime entry traits implemented by workflow definitions and message handlers.

use crate::{
    SyncWorkflowContext, WorkflowContext, WorkflowContextView,
    runtime::{model::WorkflowTermination, types::WorkflowDefinitionDescriptor},
};
use futures_util::future::{FutureExt, LocalBoxFuture};
use std::any::Any;
use temporalio_common_wasm::{
    QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
    data_converters::{
        GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
        SerializationContextData, TemporalSerializable,
    },
    protos::temporal::api::{
        common::v1::{Payload, Payloads},
        failure::v1::Failure,
    },
};

/// Error type returned by workflow handler decoding, dispatch, and output serialization
/// in this module.
///
/// Both variants carry `#[from]`, so the wrapped error types convert into
/// `WorkflowError` automatically (for example through the `?` operator).
#[derive(Debug, thiserror::Error)]
pub enum WorkflowError {
    /// A value could not be converted to or from a payload.
    ///
    /// Displayed as `Payload conversion error: <inner>`.
    #[error("Payload conversion error: {0}")]
    PayloadConversion(#[from] PayloadConversionError),

    /// A handler (or other workflow code) returned a boxed error.
    ///
    /// Displayed as `Workflow execution error: <inner>`.
    #[error("Workflow execution error: {0}")]
    Execution(#[from] Box<dyn std::error::Error + Send + Sync>),
}

impl From<WorkflowError> for Failure {
    fn from(err: WorkflowError) -> Self {
        Failure {
            message: err.to_string(),
            ..Default::default()
        }
    }
}

/// Recover the concrete handler input `T` from a type-erased box.
///
/// `handler_kind` names the kind of handler being dispatched (e.g. `"signal"`) and is
/// only used for the panic message.
///
/// # Panics
///
/// Panics if `input` does not hold a value of type `T`. Callers in this module pass a
/// value that was decoded for the same handler, so a mismatch is treated as a bug rather
/// than a recoverable error.
fn downcast_handler_input<T: Any>(input: Box<dyn Any>, handler_kind: &'static str) -> T {
    match input.downcast::<T>() {
        Ok(typed) => *typed,
        Err(_) => {
            panic!("typed {handler_kind} dispatch received input with wrong concrete type")
        }
    }
}

/// Trait implemented by workflow structs to enable execution by the worker.
///
/// This trait is typically generated by the `#[workflow_methods]` macro and should not
/// be implemented manually in most cases.
///
/// Message handling is split into two steps for each message kind (signal, query,
/// update): a `decode_*_input` function turns raw payloads into a type-erased
/// `Box<dyn Any>`, and a `dispatch_*` (or `validate_update`) function consumes that
/// boxed value. The `decode_*_input` functions have default implementations; every
/// other item must be provided by the implementor.
pub trait WorkflowImplementation: Sized + 'static {
    /// The marker struct for the run method that implements `WorkflowDefinition`
    type Run: WorkflowDefinition;

    /// Whether this workflow has a user-defined `#[init]` method.
    /// Set to `true` by the macro when `#[init]` is present, `false` otherwise.
    const HAS_INIT: bool;

    /// Whether the init method accepts the workflow input.
    /// If true, input goes to init. If false, input goes to run.
    const INIT_TAKES_INPUT: bool;

    /// Returns the workflow type name.
    fn name() -> &'static str;

    /// Returns the exported workflow definition metadata for this workflow.
    fn definition() -> WorkflowDefinitionDescriptor;

    /// Initialize the workflow instance.
    ///
    /// `input` is optional; per [`Self::INIT_TAKES_INPUT`], the workflow input goes
    /// either to `init` or to [`Self::run`].
    fn init(
        ctx: WorkflowContextView,
        input: Option<<Self::Run as WorkflowDefinition>::Input>,
    ) -> Self;

    /// Execute the workflow's main run function.
    ///
    /// Returns a non-`Send` boxed future that resolves to the result as a [`Payload`],
    /// or to a [`WorkflowTermination`].
    fn run(
        ctx: WorkflowContext<Self>,
        input: Option<<Self::Run as WorkflowDefinition>::Input>,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>;

    /// Decode a signal's payloads into that signal handler's concrete input type.
    ///
    /// The default implementation ignores its arguments and returns `Ok(None)`.
    // NOTE(review): `None` presumably means "no signal handler with this name" —
    // confirm against the code generated by `#[workflow_methods]`.
    fn decode_signal_input(
        _name: &str,
        _payloads: Payloads,
        _converter: &PayloadConverter,
    ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
        Ok(None)
    }

    /// Dispatch a signal using an already decoded input value.
    ///
    /// `input` is the type-erased value for the signal called `name`.
    // NOTE(review): the typed `Executable*Signal::dispatch` helpers in this module
    // panic when `input` has the wrong concrete type; implementations presumably
    // delegate to them — confirm.
    fn dispatch_signal(
        ctx: WorkflowContext<Self>,
        name: &str,
        input: Box<dyn Any>,
    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>>;

    /// Decode a query's payloads into that query handler's concrete input type.
    ///
    /// Unlike the signal and update variants, this takes the payloads by reference.
    /// The default implementation ignores its arguments and returns `Ok(None)`.
    fn decode_query_input(
        _name: &str,
        _payloads: &Payloads,
        _converter: &PayloadConverter,
    ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
        Ok(None)
    }

    /// Dispatch a query using an already decoded input value.
    ///
    /// Runs synchronously against `&self` and returns the query result as a
    /// [`Payload`]; `converter` is supplied for producing that payload.
    fn dispatch_query(
        &self,
        ctx: WorkflowContextView,
        name: &str,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> Result<Payload, WorkflowError>;

    /// Decode an update's payloads into that update handler's concrete input type.
    ///
    /// The default implementation ignores its arguments and returns `Ok(None)`.
    fn decode_update_input(
        _name: &str,
        _payloads: Payloads,
        _converter: &PayloadConverter,
    ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
        Ok(None)
    }

    /// Dispatch an update using an already decoded input value.
    ///
    /// Returns a non-`Send` boxed future resolving to the update result as a
    /// [`Payload`]; `converter` is supplied for producing that payload.
    fn dispatch_update(
        ctx: WorkflowContext<Self>,
        name: &str,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>>;

    /// Validate an update using an already decoded input value.
    ///
    /// Runs synchronously against `&self`; an `Err` reports that validation failed.
    fn validate_update(
        &self,
        ctx: WorkflowContextView,
        name: &str,
        input: Box<dyn Any>,
    ) -> Result<(), WorkflowError>;
}

/// Typed entry point for a synchronous handler of signal `S` on a workflow.
pub trait ExecutableSyncSignal<S: SignalDefinition>: WorkflowImplementation {
    /// Apply the signal to the workflow state using its typed input.
    fn handle(&mut self, ctx: &mut SyncWorkflowContext<Self>, input: S::Input);

    /// Run the handler for a type-erased, already decoded input.
    ///
    /// The handler executes immediately, inside this call, with mutable access to the
    /// workflow state; the returned future is already resolved to `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold an `S::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
        let typed_input = downcast_handler_input::<S::Input>(input, "signal");
        let mut sync_ctx = ctx.sync_context();
        ctx.state_mut(|workflow| {
            <Self as ExecutableSyncSignal<S>>::handle(workflow, &mut sync_ctx, typed_input)
        });
        let outcome: Result<(), WorkflowError> = Ok(());
        std::future::ready(outcome).boxed_local()
    }
}

/// Typed entry point for an asynchronous handler of signal `S` on a workflow.
pub trait ExecutableAsyncSignal<S: SignalDefinition>: WorkflowImplementation {
    /// Build the future that processes the signal with its typed input.
    fn handle(ctx: WorkflowContext<Self>, input: S::Input) -> LocalBoxFuture<'static, ()>;

    /// Run the handler for a type-erased, already decoded input.
    ///
    /// The handler future is created inside this call and driven by the returned
    /// future, which resolves to `Ok(())` once the handler completes.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold an `S::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
        let typed_input = downcast_handler_input::<S::Input>(input, "signal");
        // Create the handler future eagerly so `handle` itself runs during dispatch.
        let pending = <Self as ExecutableAsyncSignal<S>>::handle(ctx, typed_input);
        async move {
            pending.await;
            Ok::<(), WorkflowError>(())
        }
        .boxed_local()
    }
}

/// Typed entry point for a handler of query `Q` on a workflow.
pub trait ExecutableQuery<Q: QueryDefinition>: WorkflowImplementation {
    /// Answer the query from the workflow's current state using its typed input.
    fn handle(
        &self,
        ctx: &WorkflowContextView,
        input: Q::Input,
    ) -> Result<Q::Output, Box<dyn std::error::Error + Send + Sync>>;

    /// Run the handler for a type-erased, already decoded input and serialize its output.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::Execution`] when the handler fails, or the error from
    /// serializing the output with `converter`.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `Q::Input`.
    fn dispatch(
        &self,
        ctx: &WorkflowContextView,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> Result<Payload, WorkflowError> {
        let typed_input = downcast_handler_input::<Q::Input>(input, "query");
        match <Self as ExecutableQuery<Q>>::handle(self, ctx, typed_input) {
            Ok(output) => serialize_output(&output, converter),
            Err(source) => Err(WorkflowError::Execution(source)),
        }
    }
}

/// Typed entry point for a synchronous handler of update `U` on a workflow.
pub trait ExecutableSyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
    /// Apply the update to the workflow state and produce its typed result.
    fn handle(
        &mut self,
        ctx: &mut SyncWorkflowContext<Self>,
        input: U::Input,
    ) -> Result<U::Output, Box<dyn std::error::Error + Send + Sync>>;

    /// Check an update's input without applying it.
    ///
    /// The default implementation accepts every input.
    fn validate(
        &self,
        _ctx: &WorkflowContextView,
        _input: &U::Input,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }

    /// Run the handler for a type-erased, already decoded input and serialize its output.
    ///
    /// The handler and the serialization both execute immediately, inside this call;
    /// the returned future is already resolved with the outcome.
    ///
    /// # Errors
    ///
    /// The future yields [`WorkflowError::Execution`] when the handler fails, or the
    /// error from serializing the output with `converter`.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
        let typed_input = downcast_handler_input::<U::Input>(input, "update");
        let mut sync_ctx = ctx.sync_context();
        let handled = ctx.state_mut(|workflow| {
            <Self as ExecutableSyncUpdate<U>>::handle(workflow, &mut sync_ctx, typed_input)
        });
        // Collapse handler failure and serialization failure into one result, then wrap
        // it in a single ready future.
        let outcome = handled
            .map_err(WorkflowError::Execution)
            .and_then(|output| serialize_output(&output, converter));
        std::future::ready(outcome).boxed_local()
    }

    /// Run [`Self::validate`] for a type-erased, already decoded input.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::Execution`] wrapping the validator's error.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch_validate(
        &self,
        ctx: &WorkflowContextView,
        input: Box<dyn Any>,
    ) -> Result<(), WorkflowError> {
        let typed_input = downcast_handler_input::<U::Input>(input, "update validation");
        match <Self as ExecutableSyncUpdate<U>>::validate(self, ctx, &typed_input) {
            Ok(()) => Ok(()),
            Err(source) => Err(WorkflowError::Execution(source)),
        }
    }
}

/// Typed entry point for an asynchronous handler of update `U` on a workflow.
pub trait ExecutableAsyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
    /// Build the future that applies the update and yields its typed result.
    fn handle(
        ctx: WorkflowContext<Self>,
        input: U::Input,
    ) -> LocalBoxFuture<'static, Result<U::Output, Box<dyn std::error::Error + Send + Sync>>>;

    /// Check an update's input without applying it.
    ///
    /// The default implementation accepts every input.
    fn validate(
        &self,
        _ctx: &WorkflowContextView,
        _input: &U::Input,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }

    /// Run the handler for a type-erased, already decoded input and serialize its output.
    ///
    /// The input is downcast immediately, but the handler itself is only invoked once
    /// the returned future is polled. `converter` is cloned so the future can be
    /// `'static`.
    ///
    /// # Errors
    ///
    /// The future yields [`WorkflowError::Execution`] when the handler fails, or the
    /// error from serializing the output.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch(
        ctx: WorkflowContext<Self>,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
        let typed_input = downcast_handler_input::<U::Input>(input, "update");
        let owned_converter = converter.clone();
        async move {
            match <Self as ExecutableAsyncUpdate<U>>::handle(ctx, typed_input).await {
                Ok(output) => serialize_output(&output, &owned_converter),
                Err(source) => Err(WorkflowError::Execution(source)),
            }
        }
        .boxed_local()
    }

    /// Run [`Self::validate`] for a type-erased, already decoded input.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::Execution`] wrapping the validator's error.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not hold a `U::Input`.
    fn dispatch_validate(
        &self,
        ctx: &WorkflowContextView,
        input: Box<dyn Any>,
    ) -> Result<(), WorkflowError> {
        let typed_input = downcast_handler_input::<U::Input>(input, "update validation");
        match <Self as ExecutableAsyncUpdate<U>>::validate(self, ctx, &typed_input) {
            Ok(()) => Ok(()),
            Err(source) => Err(WorkflowError::Execution(source)),
        }
    }
}

/// Convert a handler's output value into a [`Payload`] using `converter`.
///
/// The value is serialized under a workflow serialization context
/// (`SerializationContextData::Workflow`).
///
/// # Errors
///
/// Returns the converter's failure converted into a [`WorkflowError`].
pub(crate) fn serialize_output<O: TemporalSerializable + 'static>(
    output: &O,
    converter: &PayloadConverter,
) -> Result<Payload, WorkflowError> {
    let serialization_ctx = SerializationContext {
        data: &SerializationContextData::Workflow,
        converter,
    };
    match converter.to_payload(&serialization_ctx, output) {
        Ok(payload) => Ok(payload),
        Err(conversion_error) => Err(conversion_error.into()),
    }
}

/// Serialize a workflow result value to a payload.
///
/// Public, by-value counterpart of the crate-private `serialize_output`: it takes
/// ownership of `result` and serializes it with `converter`.
///
/// # Errors
///
/// Returns whatever error `serialize_output` reports for this value.
pub fn serialize_result<T: TemporalSerializable + 'static>(
    result: T,
    converter: &PayloadConverter,
) -> Result<Payload, WorkflowError> {
    // Delegate so handler outputs and workflow results share one serialization path.
    serialize_output(&result, converter)
}