temporal-sdk-core 0.1.0-alpha.1

Library for building new Temporal SDKs
Documentation
use crate::{
    protos::coresdk::activity_result::ActivityResult,
    protos::coresdk::workflow_completion::WfActivationCompletion,
    protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
    workflow::WorkflowError,
};
use tonic::codegen::http::uri::InvalidUri;

pub(crate) struct ShutdownErr;
pub(crate) struct WorkflowUpdateError {
    /// Underlying workflow error
    pub source: WorkflowError,
    /// The run id of the erring workflow
    pub run_id: String,
}

/// Errors thrown during initialization of [crate::Core]
#[derive(thiserror::Error, Debug)]
pub enum CoreInitError {
    /// Invalid URI. Configuration error, fatal.
    #[error("Invalid URI: {0:?}")]
    InvalidUri(#[from] InvalidUri),
    /// Server connection error. Crashing and restarting the worker is likely best.
    #[error("Server connection error: {0:?}")]
    TonicTransportError(#[from] tonic::transport::Error),
}

/// Errors thrown by [crate::Core::poll_workflow_task]
#[derive(thiserror::Error, Debug)]
pub enum PollWfError {
    /// There was an error specific to a workflow instance. The cached workflow should be deleted
    /// from lang side.
    #[error("There was an error with the workflow instance with run id ({run_id}): {source:?}")]
    WorkflowUpdateError {
        /// Underlying workflow error
        source: WorkflowError,
        /// The run id of the erring workflow
        run_id: String,
    },
    /// The server returned a malformed polling response. Either we aren't handling a valid form,
    /// or the server is bugging out. Likely fatal.
    #[error("Poll workflow response from server was malformed: {0:?}")]
    BadPollResponseFromServer(PollWorkflowTaskQueueResponse),
    /// [crate::Core::shutdown] was called, and there are no more replay tasks to be handled. Lang
    /// must call [crate::Core::complete_workflow_task] for any remaining tasks, and then may
    /// exit.
    #[error("Core is shut down and there are no more workflow replay tasks")]
    ShutDown,
    /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
    /// errors, so lang should consider this fatal.
    #[error("Unhandled error when calling the temporal server: {0:?}")]
    TonicError(#[from] tonic::Status),
}

impl From<WorkflowUpdateError> for PollWfError {
    fn from(e: WorkflowUpdateError) -> Self {
        Self::WorkflowUpdateError {
            source: e.source,
            run_id: e.run_id,
        }
    }
}

impl From<ShutdownErr> for PollWfError {
    fn from(_: ShutdownErr) -> Self {
        Self::ShutDown
    }
}

/// Errors thrown by [crate::Core::poll_activity_task]
#[derive(thiserror::Error, Debug)]
pub enum PollActivityError {
    /// [crate::Core::shutdown] was called, we will no longer fetch new activity tasks. Lang must
    /// ensure it is finished with any workflow replay, see [PollWfError::ShutDown]
    #[error("Core is shut down")]
    ShutDown,
    /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
    /// errors, so lang should consider this fatal.
    #[error("Unhandled error when calling the temporal server: {0:?}")]
    TonicError(#[from] tonic::Status),
}

impl From<ShutdownErr> for PollActivityError {
    fn from(_: ShutdownErr) -> Self {
        Self::ShutDown
    }
}

/// Errors thrown by [crate::Core::complete_workflow_task]
#[derive(thiserror::Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum CompleteWfError {
    /// Lang SDK sent us a malformed workflow completion. This likely means a bug in the lang sdk.
    #[error("Lang SDK sent us a malformed workflow completion ({reason}): {completion:?}")]
    MalformedWorkflowCompletion {
        /// Reason the completion was malformed
        reason: String,
        /// The completion, which may not be included to avoid unnecessary copies.
        completion: Option<WfActivationCompletion>,
    },
    /// There was an error specific to a workflow instance. The cached workflow should be deleted
    /// from lang side.
    #[error("There was an error with the workflow instance with run id ({run_id}): {source:?}")]
    WorkflowUpdateError {
        /// Underlying workflow error
        source: WorkflowError,
        /// The run id of the erring workflow
        run_id: String,
    },
    /// There exists a pending command in this workflow's history which has not yet been handled.
    /// When thrown from [crate::Core::complete_workflow_task], it means you should poll for a new
    /// task, receive a new task token, and complete that new task.
    #[error("Unhandled command when completing workflow activation")]
    UnhandledCommandWhenCompleting,
    /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
    /// errors, so lang should consider this fatal.
    #[error("Unhandled error when calling the temporal server: {0:?}")]
    TonicError(#[from] tonic::Status),
}

impl From<WorkflowUpdateError> for CompleteWfError {
    fn from(e: WorkflowUpdateError) -> Self {
        Self::WorkflowUpdateError {
            source: e.source,
            run_id: e.run_id,
        }
    }
}

/// Errors thrown by [crate::Core::complete_activity_task]
#[derive(thiserror::Error, Debug)]
pub enum CompleteActivityError {
    /// Lang SDK sent us a malformed activity completion. This likely means a bug in the lang sdk.
    #[error("Lang SDK sent us a malformed activity completion ({reason}): {completion:?}")]
    MalformedActivityCompletion {
        /// Reason the completion was malformed
        reason: String,
        /// The completion, which may not be included to avoid unnecessary copies.
        completion: Option<ActivityResult>,
    },
    /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
    /// errors, so lang should consider this fatal.
    #[error("Unhandled error when calling the temporal server: {0:?}")]
    TonicError(#[from] tonic::Status),
}

/// Errors thrown by [crate::Core::record_activity_heartbeat] and [crate::Core::get_last_activity_heartbeat]
#[derive(thiserror::Error, Debug)]
pub enum ActivityHeartbeatError {
    #[error("Heartbeat request must contain heartbeat timeout.")]
    HeartbeatTimeoutNotSet,
    #[error("Heartbeat request must contain valid heartbeat timeout.")]
    InvalidHeartbeatTimeout,
    #[error("New heartbeat requests are not accepted while shutting down")]
    ShuttingDown,
    #[error("Unable to dispatch heartbeat.")]
    SendError,
}