Skip to main content

temporalio_common_wasm/
error.rs

1//! Shared error types used across Temporal SDK crates.
2
3use crate::{
4    data_converters::{
5        DecodablePayloads, GenericPayloadConverter, PayloadConversionError, PayloadConverter,
6        RawValue, SerializationContext, SerializationContextData, TemporalDeserializable,
7        TemporalSerializable,
8    },
9    protos::{
10        coresdk::child_workflow::StartChildWorkflowExecutionFailedCause,
11        temporal::api::{
12            common::v1::{Payload, Payloads},
13            enums::v1::{ApplicationErrorCategory as ProtoApplicationErrorCategory, TimeoutType},
14            failure::v1::Failure,
15        },
16    },
17};
18use std::time::Duration;
19
20// We cannot store `Box<dyn TemporalSerializable>` directly here because erased values still need
21// to be driven back through the active `PayloadConverter` to reach serde-based implementations.
22trait SerializableFailurePayload: Send + Sync {
23    fn to_payloads(
24        &self,
25        payload_converter: &PayloadConverter,
26        context: &SerializationContextData,
27    ) -> Result<Vec<Payload>, PayloadConversionError>;
28}
29
30impl<T> SerializableFailurePayload for T
31where
32    T: TemporalSerializable + Send + Sync + 'static,
33{
34    fn to_payloads(
35        &self,
36        payload_converter: &PayloadConverter,
37        context: &SerializationContextData,
38    ) -> Result<Vec<Payload>, PayloadConversionError> {
39        payload_converter.to_payloads(
40            &SerializationContext {
41                data: context,
42                converter: payload_converter,
43            },
44            self,
45        )
46    }
47}
48
49/// Payloads attached to a failure, either as a deferred outbound value or decoded inbound payloads.
50#[derive(derive_more::Debug)]
51pub struct FailurePayloads {
52    repr: FailurePayloadsRepr,
53}
54
55#[derive(derive_more::Debug)]
56enum FailurePayloadsRepr {
57    #[debug("Serializable(...)")]
58    Serializable(#[debug(skip)] Box<dyn SerializableFailurePayload>),
59    Decoded(DecodablePayloads),
60}
61
62impl FailurePayloads {
63    pub(crate) fn encode(
64        &self,
65        payload_converter: &PayloadConverter,
66        context: &SerializationContextData,
67    ) -> Result<Payloads, PayloadConversionError> {
68        let payloads = match &self.repr {
69            FailurePayloadsRepr::Serializable(value) => {
70                value.to_payloads(payload_converter, context)?
71            }
72            FailurePayloadsRepr::Decoded(value) => value.raw().to_vec(),
73        };
74        Ok(Payloads { payloads })
75    }
76
77    /// Deserialize the decoded payloads into a typed value.
78    pub fn deserialize<T: TemporalDeserializable + 'static>(
79        &self,
80    ) -> Result<T, PayloadConversionError> {
81        match &self.repr {
82            FailurePayloadsRepr::Decoded(value) => value.deserialize(),
83            FailurePayloadsRepr::Serializable(_) => Err(PayloadConversionError::WrongEncoding),
84        }
85    }
86
87    /// Returns the decoded raw payloads, if present.
88    pub fn raw(&self) -> Option<&[Payload]> {
89        match &self.repr {
90            FailurePayloadsRepr::Decoded(value) => Some(value.raw()),
91            FailurePayloadsRepr::Serializable(_) => None,
92        }
93    }
94
95    /// Consume this value and return the decoded payloads as a [`RawValue`], if present.
96    pub fn into_raw(self) -> Option<RawValue> {
97        match self.repr {
98            FailurePayloadsRepr::Decoded(value) => Some(value.into_raw()),
99            FailurePayloadsRepr::Serializable(_) => None,
100        }
101    }
102}
103
104impl From<DecodablePayloads> for FailurePayloads {
105    fn from(value: DecodablePayloads) -> Self {
106        Self {
107            repr: FailurePayloadsRepr::Decoded(value),
108        }
109    }
110}
111
112impl<T> From<T> for FailurePayloads
113where
114    T: TemporalSerializable + Send + Sync + 'static,
115{
116    fn from(value: T) -> Self {
117        Self {
118            repr: FailurePayloadsRepr::Serializable(Box::new(value)),
119        }
120    }
121}
122
/// Classification of an [`ApplicationFailure`], hinting at how the server and tooling should
/// treat it.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum ApplicationErrorCategory {
    /// The failure carries no category. This is the default.
    #[default]
    Unspecified,
    /// An expected error of little or no severity. Benign errors are logged at a reduced level
    /// and excluded from error metrics by the server.
    Benign,
}
134
135impl From<ApplicationErrorCategory> for ProtoApplicationErrorCategory {
136    fn from(value: ApplicationErrorCategory) -> Self {
137        match value {
138            ApplicationErrorCategory::Unspecified => ProtoApplicationErrorCategory::Unspecified,
139            ApplicationErrorCategory::Benign => ProtoApplicationErrorCategory::Benign,
140        }
141    }
142}
143
144impl From<ProtoApplicationErrorCategory> for ApplicationErrorCategory {
145    fn from(value: ProtoApplicationErrorCategory) -> Self {
146        match value {
147            ProtoApplicationErrorCategory::Unspecified => ApplicationErrorCategory::Unspecified,
148            ProtoApplicationErrorCategory::Benign => ApplicationErrorCategory::Benign,
149        }
150    }
151}
152
153/// User-authored application failure metadata that can be converted into a Temporal failure.
154#[derive(Debug, bon::Builder)]
155#[builder(start_fn = builder, state_mod(vis = "pub"))]
156pub struct ApplicationFailure {
157    #[builder(start_fn, into)]
158    source: Box<dyn std::error::Error + Send + Sync>,
159    type_name: Option<String>,
160    #[builder(default)]
161    non_retryable: bool,
162    next_retry_delay: Option<Duration>,
163    #[builder(default = ApplicationErrorCategory::Unspecified)]
164    category: ApplicationErrorCategory,
165    #[builder(into)]
166    details: Option<FailurePayloads>,
167    failure: Option<Failure>,
168    cause: Option<Box<IncomingError>>,
169}
170
171impl ApplicationFailure {
172    /// Construct a retryable application failure with no extra metadata.
173    pub fn new(source: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
174        Self {
175            source: source.into(),
176            type_name: None,
177            non_retryable: false,
178            next_retry_delay: None,
179            category: ApplicationErrorCategory::Unspecified,
180            details: None,
181            failure: None,
182            cause: None,
183        }
184    }
185
186    /// Construct a non-retryable application failure with no extra metadata.
187    pub fn non_retryable(source: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
188        Self {
189            non_retryable: true,
190            ..Self::new(source)
191        }
192    }
193
194    /// Returns the wrapped source error.
195    pub fn source_error(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
196        &*self.source as &(dyn std::error::Error + Send + Sync + 'static)
197    }
198
199    /// Returns the configured application failure type name, if any.
200    pub fn type_name(&self) -> Option<&str> {
201        self.type_name.as_deref()
202    }
203
204    /// Returns true if this failure should be treated as non-retryable.
205    pub fn is_non_retryable(&self) -> bool {
206        self.non_retryable
207    }
208
209    /// Returns the explicitly configured next retry delay, if any.
210    pub fn next_retry_delay(&self) -> Option<Duration> {
211        self.next_retry_delay
212    }
213
214    /// Returns the application error category.
215    pub fn category(&self) -> ApplicationErrorCategory {
216        self.category
217    }
218
219    /// Returns the decoded details deserialized as the requested type, if any.
220    pub fn details<T: TemporalDeserializable + 'static>(
221        &self,
222    ) -> Result<Option<T>, PayloadConversionError> {
223        self.details
224            .as_ref()
225            .map(FailurePayloads::deserialize)
226            .transpose()
227    }
228
229    /// Returns the raw decoded details payloads, if any.
230    pub fn raw_details(&self) -> Option<&[Payload]> {
231        self.details.as_ref().and_then(FailurePayloads::raw)
232    }
233
234    pub(crate) fn failure_payloads(&self) -> Option<&FailurePayloads> {
235        self.details.as_ref()
236    }
237
238    /// Returns the original failure proto when this application failure was decoded from one.
239    pub fn failure(&self) -> Option<&Failure> {
240        self.failure.as_ref()
241    }
242
243    /// Consumes this application failure and returns the retained proto failure, if one exists.
244    pub fn into_failure(self) -> Option<Failure> {
245        self.failure
246    }
247
248    /// Returns the normalized cause, if any.
249    pub fn cause(&self) -> Option<&IncomingError> {
250        self.cause.as_deref()
251    }
252
253    /// If this [`ApplicationFailure`] was caused by a timeout, returns the associated
254    /// [`TimeoutError`].
255    pub fn as_timeout(&self) -> Option<&TimeoutError> {
256        self.cause().and_then(IncomingError::as_timeout)
257    }
258
259    /// If this [`ApplicationFailure`] was caused by a cancellation, returns the associated
260    /// [`CancelledError`].
261    pub fn as_cancelled(&self) -> Option<&CancelledError> {
262        self.cause().and_then(IncomingError::as_cancelled)
263    }
264
265    pub(crate) fn from_failure(
266        failure: Failure,
267        cause: Option<IncomingError>,
268        payload_converter: &PayloadConverter,
269        context: &SerializationContextData,
270    ) -> Self {
271        let app_info = failure
272            .maybe_application_failure()
273            .cloned()
274            .unwrap_or_default();
275        let type_name = (!app_info.r#type.is_empty()).then_some(app_info.r#type.clone());
276        Self {
277            source: failure.message.clone().into(),
278            type_name,
279            non_retryable: app_info.non_retryable,
280            next_retry_delay: app_info.next_retry_delay.and_then(|d| d.try_into().ok()),
281            category: app_info.category().into(),
282            details: app_info.details.map(|details| {
283                FailurePayloads::from(DecodablePayloads::new(
284                    details.payloads,
285                    payload_converter.clone(),
286                    *context,
287                ))
288            }),
289            failure: Some(failure),
290            cause: cause.map(Box::new),
291        }
292    }
293}
294
295impl std::fmt::Display for ApplicationFailure {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        write!(f, "{}", self.source)
298    }
299}
300
301impl std::error::Error for ApplicationFailure {
302    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
303        self.cause
304            .as_deref()
305            .map(|cause| cause as &(dyn std::error::Error + 'static))
306            .or_else(|| Some(self.source.as_ref()))
307    }
308}
309
310impl From<anyhow::Error> for ApplicationFailure {
311    fn from(value: anyhow::Error) -> Self {
312        Self::new(value)
313    }
314}
315
316impl From<PayloadConversionError> for ApplicationFailure {
317    fn from(value: PayloadConversionError) -> Self {
318        Self::new(value)
319    }
320}
321
322/// A typed outbound error surface used before encoding to a Temporal failure proto.
323#[derive(Debug, thiserror::Error)]
324pub enum OutgoingError {
325    /// An error produced while completing an activity.
326    #[error(transparent)]
327    Activity(#[from] OutgoingActivityError),
328    /// An error produced from a workflow.
329    #[error(transparent)]
330    Workflow(#[from] OutgoingWorkflowError),
331}
332
333/// A typed outbound activity error.
334#[derive(Debug, thiserror::Error)]
335pub enum OutgoingActivityError {
336    /// An activity application failure.
337    #[error(transparent)]
338    Application(#[from] Box<ApplicationFailure>),
339    /// An activity cancellation with optional details.
340    #[error("Activity cancelled")]
341    Cancelled {
342        /// Optional cancellation details.
343        details: Option<FailurePayloads>,
344    },
345}
346
347/// A typed outbound workflow failure.
348#[derive(Debug, thiserror::Error)]
349pub enum OutgoingWorkflowError {
350    /// A workflow application failure.
351    #[error(transparent)]
352    Application(#[from] Box<ApplicationFailure>),
353    /// A workflow failure sourced from an activity execution.
354    #[error(transparent)]
355    ActivityExecution(#[from] Box<ActivityExecutionError>),
356    /// A workflow failure sourced from a child-workflow execution.
357    #[error(transparent)]
358    ChildWorkflowExecution(#[from] Box<ChildWorkflowExecutionError>),
359    /// A workflow failure sourced from child-workflow start.
360    #[error(transparent)]
361    ChildWorkflowStart(#[from] Box<ChildWorkflowStartError>),
362    /// A workflow failure sourced from signaling a workflow.
363    #[error(transparent)]
364    WorkflowSignal(#[from] Box<WorkflowSignalError>),
365}
366
367impl From<anyhow::Error> for OutgoingWorkflowError {
368    fn from(value: anyhow::Error) -> Self {
369        Self::Application(Box::new(ApplicationFailure::new(value)))
370    }
371}
372
373impl From<PayloadConversionError> for OutgoingWorkflowError {
374    fn from(value: PayloadConversionError) -> Self {
375        Self::Application(Box::new(value.into()))
376    }
377}
378
379impl From<ApplicationFailure> for OutgoingWorkflowError {
380    fn from(value: ApplicationFailure) -> Self {
381        Self::Application(Box::new(value))
382    }
383}
384
385impl From<ActivityExecutionError> for OutgoingWorkflowError {
386    fn from(value: ActivityExecutionError) -> Self {
387        Self::ActivityExecution(Box::new(value))
388    }
389}
390
391impl From<ChildWorkflowExecutionError> for OutgoingWorkflowError {
392    fn from(value: ChildWorkflowExecutionError) -> Self {
393        Self::ChildWorkflowExecution(Box::new(value))
394    }
395}
396
397impl From<ChildWorkflowStartError> for OutgoingWorkflowError {
398    fn from(value: ChildWorkflowStartError) -> Self {
399        Self::ChildWorkflowStart(Box::new(value))
400    }
401}
402
403impl From<WorkflowSignalError> for OutgoingWorkflowError {
404    fn from(value: WorkflowSignalError) -> Self {
405        Self::WorkflowSignal(Box::new(value))
406    }
407}
408
409/// A normalized incoming Temporal failure decoded from a protobuf [`Failure`].
410#[derive(Debug)]
411pub enum IncomingError {
412    /// A decoded application failure.
413    Application(ApplicationFailure),
414    /// A decoded timeout failure.
415    Timeout(TimeoutError),
416    /// A decoded cancellation failure.
417    Cancelled(CancelledError),
418    /// A decoded terminated failure.
419    Terminated(TerminatedError),
420    /// A decoded server failure.
421    Server(ServerError),
422    /// A decoded reset-workflow failure.
423    ResetWorkflow(ResetWorkflowError),
424    /// A decoded activity failure wrapper.
425    Activity(ActivityFailureError),
426    /// A decoded child-workflow failure wrapper.
427    ChildWorkflowExecution(ChildWorkflowFailureError),
428    /// A decoded nexus operation failure wrapper.
429    NexusOperationExecution(IncomingNexusOperationExecutionError),
430    /// A decoded nexus handler failure wrapper.
431    NexusHandler(IncomingNexusHandlerError),
432}
433
434impl IncomingError {
435    /// Returns the original failure proto for this normalized error.
436    pub fn failure(&self) -> &Failure {
437        match self {
438            IncomingError::Application(err) => err
439                .failure()
440                .expect("decoded application failures retain their original proto"),
441            IncomingError::Timeout(err) => err.failure(),
442            IncomingError::Cancelled(err) => err.failure(),
443            IncomingError::Terminated(err) => err.failure(),
444            IncomingError::Server(err) => err.failure(),
445            IncomingError::ResetWorkflow(err) => err.failure(),
446            IncomingError::Activity(err) => err.failure(),
447            IncomingError::ChildWorkflowExecution(err) => err.failure(),
448            IncomingError::NexusOperationExecution(err) => err.failure(),
449            IncomingError::NexusHandler(err) => err.failure(),
450        }
451    }
452
453    /// Returns the normalized cause, if any.
454    pub fn cause(&self) -> Option<&IncomingError> {
455        match self {
456            IncomingError::Application(err) => err.cause(),
457            IncomingError::Timeout(err) => err.cause(),
458            IncomingError::Cancelled(err) => err.cause(),
459            IncomingError::Terminated(err) => err.cause(),
460            IncomingError::Server(err) => err.cause(),
461            IncomingError::ResetWorkflow(err) => err.cause(),
462            IncomingError::Activity(err) => err.cause(),
463            IncomingError::ChildWorkflowExecution(err) => err.cause(),
464            IncomingError::NexusOperationExecution(err) => err.cause(),
465            IncomingError::NexusHandler(err) => err.cause(),
466        }
467    }
468
469    /// Consumes this normalized error and returns the retained proto failure.
470    pub fn into_failure(self) -> Failure {
471        match self {
472            IncomingError::Application(err) => err
473                .into_failure()
474                .expect("decoded application failures retain their original proto"),
475            IncomingError::Timeout(err) => err.into_failure(),
476            IncomingError::Cancelled(err) => err.into_failure(),
477            IncomingError::Terminated(err) => err.into_failure(),
478            IncomingError::Server(err) => err.into_failure(),
479            IncomingError::ResetWorkflow(err) => err.into_failure(),
480            IncomingError::Activity(err) => err.into_failure(),
481            IncomingError::ChildWorkflowExecution(err) => err.into_failure(),
482            IncomingError::NexusOperationExecution(err) => err.into_failure(),
483            IncomingError::NexusHandler(err) => err.into_failure(),
484        }
485    }
486
487    /// If the [`IncomingError`] is a timeout, returns the associated [`TimeoutError`].
488    pub fn as_timeout(&self) -> Option<&TimeoutError> {
489        match self {
490            IncomingError::Timeout(err) => Some(err),
491            _ => None,
492        }
493    }
494
495    /// If the [`IncomingError`] is a cancellation, returns the associated [`CancelledError`].
496    pub fn as_cancelled(&self) -> Option<&CancelledError> {
497        match self {
498            IncomingError::Cancelled(err) => Some(err),
499            _ => None,
500        }
501    }
502}
503
504impl std::fmt::Display for IncomingError {
505    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
506        match self {
507            IncomingError::Application(err) => err.fmt(f),
508            IncomingError::Timeout(err) => err.fmt(f),
509            IncomingError::Cancelled(err) => err.fmt(f),
510            IncomingError::Terminated(err) => err.fmt(f),
511            IncomingError::Server(err) => err.fmt(f),
512            IncomingError::ResetWorkflow(err) => err.fmt(f),
513            IncomingError::Activity(err) => err.fmt(f),
514            IncomingError::ChildWorkflowExecution(err) => err.fmt(f),
515            IncomingError::NexusOperationExecution(err) => err.fmt(f),
516            IncomingError::NexusHandler(err) => err.fmt(f),
517        }
518    }
519}
520
521impl std::error::Error for IncomingError {
522    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
523        match self {
524            IncomingError::Application(err) => Some(err),
525            IncomingError::Timeout(err) => Some(err),
526            IncomingError::Cancelled(err) => Some(err),
527            IncomingError::Terminated(err) => Some(err),
528            IncomingError::Server(err) => Some(err),
529            IncomingError::ResetWorkflow(err) => Some(err),
530            IncomingError::Activity(err) => Some(err),
531            IncomingError::ChildWorkflowExecution(err) => Some(err),
532            IncomingError::NexusOperationExecution(err) => Some(err),
533            IncomingError::NexusHandler(err) => Some(err),
534        }
535    }
536}
537
// Generates the accessors shared by every incoming failure wrapper, plus its `Display` and
// `Error` impls. `$name` must be a struct with a `failure: Failure` field and a
// `cause: Option<Box<IncomingError>>` field.
macro_rules! impl_incoming_failure_wrapper {
    ($name:ident) => {
        impl $name {
            /// Returns the original failure proto.
            pub fn failure(&self) -> &Failure {
                &self.failure
            }

            /// Returns the normalized cause, if any.
            pub fn cause(&self) -> Option<&IncomingError> {
                self.cause.as_deref()
            }

            /// Consumes this wrapper and returns the retained proto failure.
            pub fn into_failure(self) -> Failure {
                self.failure
            }

            /// Consumes this wrapper and returns the retained proto failure and normalized cause.
            pub fn into_parts(self) -> (Failure, Option<IncomingError>) {
                let unboxed_cause = self.cause.map(|boxed| *boxed);
                (self.failure, unboxed_cause)
            }
        }

        // Displays whatever the retained failure proto displays.
        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Display::fmt(&self.failure, f)
            }
        }

        // The error source is the normalized cause, when there is one.
        impl std::error::Error for $name {
            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
                let cause: &(dyn std::error::Error + 'static) = self.cause.as_deref()?;
                Some(cause)
            }
        }
    };
}
577
// Declares a wrapper struct that holds only a failure proto and an optional cause, gives it a
// crate-private constructor, and then applies `impl_incoming_failure_wrapper!` to it. `$doc`
// becomes the struct's doc comment.
macro_rules! incoming_failure_wrapper {
    ($name:ident, $doc:literal) => {
        #[doc = $doc]
        #[derive(Debug)]
        pub struct $name {
            failure: Failure,
            cause: Option<Box<IncomingError>>,
        }

        impl $name {
            /// Creates a new normalized incoming error wrapper.
            pub(crate) fn new(failure: Failure, cause: Option<IncomingError>) -> Self {
                let cause = cause.map(Box::new);
                Self { failure, cause }
            }
        }

        impl_incoming_failure_wrapper!($name);
    };
}
600
601/// A normalized timeout failure.
602#[derive(Debug)]
603pub struct TimeoutError {
604    failure: Failure,
605    cause: Option<Box<IncomingError>>,
606    timeout_type: TimeoutType,
607    last_heartbeat_details: Option<DecodablePayloads>,
608}
609
610impl TimeoutError {
611    /// Creates a new normalized timeout error wrapper.
612    pub(crate) fn new(
613        failure: Failure,
614        failure_info: crate::protos::temporal::api::failure::v1::TimeoutFailureInfo,
615        cause: Option<IncomingError>,
616        payload_converter: &PayloadConverter,
617        context: &SerializationContextData,
618    ) -> Self {
619        Self {
620            failure,
621            cause: cause.map(Box::new),
622            timeout_type: failure_info.timeout_type(),
623            last_heartbeat_details: failure_info.last_heartbeat_details.map(|details| {
624                DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
625            }),
626        }
627    }
628
629    /// Returns the timeout kind described by the failure.
630    pub fn timeout_type(&self) -> TimeoutType {
631        self.timeout_type
632    }
633
634    /// Returns the last heartbeat details carried by the timeout, if any.
635    pub fn last_heartbeat_details<T: TemporalDeserializable + 'static>(
636        &self,
637    ) -> Result<Option<T>, PayloadConversionError> {
638        self.last_heartbeat_details
639            .as_ref()
640            .map(DecodablePayloads::deserialize)
641            .transpose()
642    }
643
644    /// Returns the raw decoded heartbeat details carried by the timeout, if any.
645    pub fn raw_last_heartbeat_details(&self) -> Option<&[Payload]> {
646        self.last_heartbeat_details
647            .as_ref()
648            .map(DecodablePayloads::raw)
649    }
650}
651
652impl_incoming_failure_wrapper!(TimeoutError);
653
654/// A normalized cancellation failure.
655#[derive(Debug)]
656pub struct CancelledError {
657    failure: Failure,
658    cause: Option<Box<IncomingError>>,
659    details: Option<DecodablePayloads>,
660}
661
662impl CancelledError {
663    /// Creates a new normalized cancellation error wrapper.
664    pub(crate) fn new(
665        failure: Failure,
666        failure_info: crate::protos::temporal::api::failure::v1::CanceledFailureInfo,
667        cause: Option<IncomingError>,
668        payload_converter: &PayloadConverter,
669        context: &SerializationContextData,
670    ) -> Self {
671        Self {
672            failure,
673            cause: cause.map(Box::new),
674            details: failure_info.details.map(|details| {
675                DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
676            }),
677        }
678    }
679
680    /// Returns the cancellation details carried by the failure, deserialized as the requested
681    /// type, if any.
682    pub fn details<T: TemporalDeserializable + 'static>(
683        &self,
684    ) -> Result<Option<T>, PayloadConversionError> {
685        self.details
686            .as_ref()
687            .map(DecodablePayloads::deserialize)
688            .transpose()
689    }
690
691    /// Returns the raw decoded cancellation details carried by the failure, if any.
692    pub fn raw_details(&self) -> Option<&[Payload]> {
693        self.details.as_ref().map(DecodablePayloads::raw)
694    }
695}
696
697impl_incoming_failure_wrapper!(CancelledError);
698incoming_failure_wrapper!(TerminatedError, "A normalized terminated failure.");
699incoming_failure_wrapper!(ServerError, "A normalized server failure.");
700incoming_failure_wrapper!(ResetWorkflowError, "A normalized reset-workflow failure.");
701
702/// A normalized activity failure wrapper.
703#[derive(Debug)]
704pub struct ActivityFailureError {
705    failure: Failure,
706    cause: Option<Box<IncomingError>>,
707    activity_id: String,
708    activity_type: Option<crate::protos::temporal::api::common::v1::ActivityType>,
709    scheduled_event_id: i64,
710    started_event_id: i64,
711    identity: String,
712    retry_state: crate::protos::temporal::api::enums::v1::RetryState,
713}
714
715impl ActivityFailureError {
716    /// Creates a new normalized activity failure wrapper.
717    pub(crate) fn new(
718        failure: Failure,
719        failure_info: crate::protos::temporal::api::failure::v1::ActivityFailureInfo,
720        cause: Option<IncomingError>,
721    ) -> Self {
722        let retry_state = failure_info.retry_state();
723        Self {
724            failure,
725            cause: cause.map(Box::new),
726            activity_id: failure_info.activity_id,
727            activity_type: failure_info.activity_type,
728            scheduled_event_id: failure_info.scheduled_event_id,
729            started_event_id: failure_info.started_event_id,
730            identity: failure_info.identity,
731            retry_state,
732        }
733    }
734
735    /// Returns the activity id reported by the failure.
736    pub fn activity_id(&self) -> &str {
737        &self.activity_id
738    }
739
740    /// Returns the activity type, if present.
741    pub fn activity_type(&self) -> Option<&crate::protos::temporal::api::common::v1::ActivityType> {
742        self.activity_type.as_ref()
743    }
744
745    /// Returns the scheduled event id.
746    pub fn scheduled_event_id(&self) -> i64 {
747        self.scheduled_event_id
748    }
749
750    /// Returns the started event id.
751    pub fn started_event_id(&self) -> i64 {
752        self.started_event_id
753    }
754
755    /// Returns the worker identity captured on the failure.
756    pub fn identity(&self) -> &str {
757        &self.identity
758    }
759
760    /// Returns the retry state reported by core.
761    pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
762        self.retry_state
763    }
764
765    /// If this [`ActivityFailureError`] was caused by a timeout, returns the associated
766    /// [`TimeoutError`].
767    pub fn as_timeout(&self) -> Option<&TimeoutError> {
768        self.cause().and_then(IncomingError::as_timeout)
769    }
770
771    /// If this [`ActivityFailureError`] was caused by a cancellation, returns the associated
772    /// [`CancelledError`].
773    pub fn as_cancelled(&self) -> Option<&CancelledError> {
774        self.cause().and_then(IncomingError::as_cancelled)
775    }
776}
777
778impl_incoming_failure_wrapper!(ActivityFailureError);
779/// A normalized child-workflow execution failure wrapper.
780#[derive(Debug)]
781pub struct ChildWorkflowFailureError {
782    failure: Failure,
783    cause: Option<Box<IncomingError>>,
784    namespace: String,
785    workflow_execution: Option<crate::protos::temporal::api::common::v1::WorkflowExecution>,
786    workflow_type: Option<crate::protos::temporal::api::common::v1::WorkflowType>,
787    initiated_event_id: i64,
788    started_event_id: i64,
789    retry_state: crate::protos::temporal::api::enums::v1::RetryState,
790}
791
792impl ChildWorkflowFailureError {
793    /// Creates a new normalized child-workflow execution failure wrapper.
794    pub(crate) fn new(
795        failure: Failure,
796        failure_info: crate::protos::temporal::api::failure::v1::ChildWorkflowExecutionFailureInfo,
797        cause: Option<IncomingError>,
798    ) -> Self {
799        let retry_state = failure_info.retry_state();
800        Self {
801            failure,
802            cause: cause.map(Box::new),
803            namespace: failure_info.namespace,
804            workflow_execution: failure_info.workflow_execution,
805            workflow_type: failure_info.workflow_type,
806            initiated_event_id: failure_info.initiated_event_id,
807            started_event_id: failure_info.started_event_id,
808            retry_state,
809        }
810    }
811
812    /// Returns the namespace of the child workflow.
813    pub fn namespace(&self) -> &str {
814        &self.namespace
815    }
816
817    /// Returns the child workflow execution, if present.
818    pub fn workflow_execution(
819        &self,
820    ) -> Option<&crate::protos::temporal::api::common::v1::WorkflowExecution> {
821        self.workflow_execution.as_ref()
822    }
823
824    /// Returns the child workflow type, if present.
825    pub fn workflow_type(&self) -> Option<&crate::protos::temporal::api::common::v1::WorkflowType> {
826        self.workflow_type.as_ref()
827    }
828
829    /// Returns the initiated event id.
830    pub fn initiated_event_id(&self) -> i64 {
831        self.initiated_event_id
832    }
833
834    /// Returns the started event id.
835    pub fn started_event_id(&self) -> i64 {
836        self.started_event_id
837    }
838
839    /// Returns the retry state reported by core.
840    pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
841        self.retry_state
842    }
843
844    /// If this [`ChildWorkflowFailureError`] was caused by a timeout, returns the associated
845    /// [`TimeoutError`].
846    pub fn as_timeout(&self) -> Option<&TimeoutError> {
847        self.cause().and_then(IncomingError::as_timeout)
848    }
849
850    /// If this [`ChildWorkflowFailureError`] was caused by a cancellation, returns the associated
851    /// [`CancelledError`].
852    pub fn as_cancelled(&self) -> Option<&CancelledError> {
853        self.cause().and_then(IncomingError::as_cancelled)
854    }
855}
856
// NOTE(review): both macros are defined outside this section. `impl_incoming_failure_wrapper!` is
// applied to the hand-written `ChildWorkflowFailureError`; presumably it supplies the `failure()`
// and `cause()` accessors that the surrounding code calls on that type — confirm against the macro.
impl_incoming_failure_wrapper!(ChildWorkflowFailureError);
// Declares the two nexus wrapper types. The string argument reads like the doc text attached to
// the generated type — TODO confirm against the macro definition.
incoming_failure_wrapper!(
    IncomingNexusOperationExecutionError,
    "A normalized nexus operation failure wrapper."
);
incoming_failure_wrapper!(
    IncomingNexusHandlerError,
    "A normalized nexus handler failure wrapper."
);
866
/// Error type for activity execution outcomes.
///
/// The `Display` text of the `Failed` and `Cancelled` variants embeds the `message` field of the
/// wrapped failure proto; `Serialization` embeds the conversion error's own `Display` text.
#[derive(Debug, thiserror::Error)]
pub enum ActivityExecutionError {
    /// The activity failed with the given failure details.
    ///
    /// The wrapped [`ActivityFailureError`] is also exposed as this error's `source`.
    #[error("Activity failed: {}", .0.failure().message)]
    Failed(#[source] ActivityFailureError),
    /// The activity was cancelled.
    ///
    /// The wrapped [`CancelledError`] is also exposed as this error's `source`.
    #[error("Activity cancelled: {}", .0.failure().message)]
    Cancelled(#[source] CancelledError),
    /// Failed to serialize input or deserialize result payload.
    ///
    /// Marked `#[from]`, so a [`PayloadConversionError`] converts into this variant (for example
    /// through `?`).
    #[error("Payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
880
881impl ActivityExecutionError {
882    /// Returns the retained top-level activity failure proto, if one exists.
883    pub fn failure(&self) -> Option<&Failure> {
884        match self {
885            ActivityExecutionError::Failed(err) => Some(err.failure()),
886            ActivityExecutionError::Cancelled(err) => Some(err.failure()),
887            ActivityExecutionError::Serialization(_) => None,
888        }
889    }
890
891    /// Returns the normalized cause of the top-level activity failure, if any.
892    pub fn cause(&self) -> Option<&IncomingError> {
893        match self {
894            ActivityExecutionError::Failed(err) => err.cause(),
895            ActivityExecutionError::Cancelled(err) => err.cause(),
896            ActivityExecutionError::Serialization(_) => None,
897        }
898    }
899
900    /// Returns the underlying failure reason for wrapper-shaped activity failures.
901    pub fn reason(&self) -> Option<&IncomingError> {
902        match self {
903            ActivityExecutionError::Failed(err) => err.cause(),
904            ActivityExecutionError::Cancelled(_) | ActivityExecutionError::Serialization(_) => None,
905        }
906    }
907
908    /// If this [`ActivityExecutionError`] was caused by a timeout, returns the associated
909    /// [`TimeoutError`].
910    pub fn as_timeout(&self) -> Option<&TimeoutError> {
911        match self {
912            ActivityExecutionError::Failed(err) => err.as_timeout(),
913            ActivityExecutionError::Serialization(_) | ActivityExecutionError::Cancelled(_) => None,
914        }
915    }
916
917    /// If this [`ActivityExecutionError`] was caused by a cancellation, returns the associated
918    /// [`CancelledError`].
919    pub fn as_cancelled(&self) -> Option<&CancelledError> {
920        match self {
921            ActivityExecutionError::Failed(err) => err.as_cancelled(),
922            ActivityExecutionError::Cancelled(err) => Some(err),
923            ActivityExecutionError::Serialization(_) => None,
924        }
925    }
926}
927
/// Error returned when starting a child workflow fails.
///
/// Only the `Cancelled` variant retains a failure proto; the other variants carry plain data or a
/// conversion error.
#[derive(Debug, thiserror::Error)]
pub enum ChildWorkflowStartError {
    /// The child workflow start was cancelled before the normal execution wrapper path existed.
    ///
    /// The `Display` text embeds the `message` of the boxed [`CancelledError`]'s failure proto, and
    /// the boxed error is exposed as this error's `source`.
    #[error("Child workflow start cancelled: {}", .0.failure().message)]
    Cancelled(#[source] Box<CancelledError>),
    /// The child workflow failed to start (e.g., workflow ID already exists).
    ///
    /// The `Display` text lists the requested id and type, plus the cause in `Debug` form.
    #[error(
        "Child workflow start failed: workflow_id={workflow_id}, workflow_type={workflow_type}, cause={cause:?}"
    )]
    StartFailed {
        /// The workflow ID that was requested.
        workflow_id: String,
        /// The workflow type that was requested.
        workflow_type: String,
        /// The cause of the start failure.
        cause: StartChildWorkflowExecutionFailedCause,
    },
    /// Failed to serialize child workflow input payloads.
    ///
    /// Marked `#[from]`, so a [`PayloadConversionError`] converts into this variant (for example
    /// through `?`).
    #[error("Payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
950
951impl ChildWorkflowStartError {
952    /// Returns the retained top-level failure proto, if one exists.
953    pub fn failure(&self) -> Option<&Failure> {
954        match self {
955            ChildWorkflowStartError::Cancelled(err) => Some(err.failure()),
956            ChildWorkflowStartError::StartFailed { .. }
957            | ChildWorkflowStartError::Serialization(_) => None,
958        }
959    }
960
961    /// Returns the normalized cause of the retained failure proto, if any.
962    pub fn cause(&self) -> Option<&IncomingError> {
963        match self {
964            ChildWorkflowStartError::Cancelled(err) => err.cause(),
965            ChildWorkflowStartError::StartFailed { .. }
966            | ChildWorkflowStartError::Serialization(_) => None,
967        }
968    }
969}
970
/// Error returned when a child workflow execution fails.
#[derive(Debug, thiserror::Error)]
pub enum ChildWorkflowExecutionError {
    /// The child workflow failed.
    ///
    /// The `Display` text embeds the `message` of the wrapped failure proto, and the boxed
    /// [`ChildWorkflowFailureError`] is exposed as this error's `source`.
    #[error("Child workflow failed: {}", .0.failure().message)]
    Failed(#[source] Box<ChildWorkflowFailureError>),
    /// Failed to serialize input or deserialize the child workflow result payload.
    ///
    /// Marked `#[from]`, so a [`PayloadConversionError`] converts into this variant (for example
    /// through `?`).
    #[error("Payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
981
982impl ChildWorkflowExecutionError {
983    /// Returns the retained top-level child-workflow failure proto, if one exists.
984    pub fn failure(&self) -> Option<&Failure> {
985        match self {
986            ChildWorkflowExecutionError::Failed(err) => Some(err.failure()),
987            ChildWorkflowExecutionError::Serialization(_) => None,
988        }
989    }
990
991    /// Returns the normalized cause of the top-level child-workflow failure, if any.
992    pub fn cause(&self) -> Option<&IncomingError> {
993        match self {
994            ChildWorkflowExecutionError::Failed(err) => err.cause(),
995            ChildWorkflowExecutionError::Serialization(_) => None,
996        }
997    }
998
999    /// Returns the underlying failure reason for wrapper-shaped child-workflow failures.
1000    pub fn reason(&self) -> Option<&IncomingError> {
1001        match self {
1002            ChildWorkflowExecutionError::Failed(err) => err.cause(),
1003            ChildWorkflowExecutionError::Serialization(_) => None,
1004        }
1005    }
1006
1007    /// If this [`ChildWorkflowExecutionError`] was caused by a timeout, returns the associated
1008    /// [`TimeoutError`].
1009    pub fn as_timeout(&self) -> Option<&TimeoutError> {
1010        match self {
1011            ChildWorkflowExecutionError::Failed(err) => err.as_timeout(),
1012            ChildWorkflowExecutionError::Serialization(_) => None,
1013        }
1014    }
1015
1016    /// If this [`ChildWorkflowExecutionError`] was caused by a cancellation, returns the associated
1017    /// [`CancelledError`].
1018    pub fn as_cancelled(&self) -> Option<&CancelledError> {
1019        match self {
1020            ChildWorkflowExecutionError::Failed(err) => err.as_cancelled(),
1021            ChildWorkflowExecutionError::Serialization(_) => None,
1022        }
1023    }
1024}
1025
/// Error returned when signaling a workflow fails.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowSignalError {
    /// The signal delivery failed.
    ///
    /// The `Display` text embeds the `message` of the wrapped failure proto, and the boxed
    /// [`WorkflowSignalFailureError`] is exposed as this error's `source`.
    // NOTE(review): the message says "Child workflow signal failed" although this type's name and
    // docs describe workflow signals in general — confirm whether the wording is intentional.
    #[error("Child workflow signal failed: {}", .0.failure().message)]
    Failed(#[source] Box<WorkflowSignalFailureError>),
    /// Failed to serialize the signal input payload.
    ///
    /// Marked `#[from]`, so a [`PayloadConversionError`] converts into this variant (for example
    /// through `?`).
    #[error("Signal payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
1036
1037impl WorkflowSignalError {
1038    /// Returns the retained top-level workflow signal failure proto, if one exists.
1039    pub fn failure(&self) -> Option<&Failure> {
1040        match self {
1041            WorkflowSignalError::Failed(err) => Some(err.failure()),
1042            WorkflowSignalError::Serialization(_) => None,
1043        }
1044    }
1045
1046    /// Returns the normalized cause of the workflow signal failure, if any.
1047    pub fn cause(&self) -> Option<&IncomingError> {
1048        match self {
1049            WorkflowSignalError::Failed(err) => err.cause(),
1050            WorkflowSignalError::Serialization(_) => None,
1051        }
1052    }
1053
1054    /// Returns the underlying failure reason for wrapper-shaped signal failures.
1055    pub fn reason(&self) -> Option<&IncomingError> {
1056        match self {
1057            WorkflowSignalError::Failed(err) => Some(err.error()),
1058            WorkflowSignalError::Serialization(_) => None,
1059        }
1060    }
1061}
1062
/// A normalized workflow signal failure wrapper.
///
/// Holds the retained top-level [`Failure`] proto alongside the [`IncomingError`] passed in with
/// it at construction.
#[derive(Debug)]
pub struct WorkflowSignalFailureError {
    // Top-level proto failure; returned by `failure()` and used to render `Display`.
    failure: Failure,
    // Decoded incoming error for `failure`; returned by `error()`, and its cause backs `cause()`.
    error: Box<IncomingError>,
}
1069
1070impl WorkflowSignalFailureError {
1071    /// Creates a workflow signal failure wrapper.
1072    pub(crate) fn new(failure: Failure, error: IncomingError) -> Self {
1073        Self {
1074            failure,
1075            error: Box::new(error),
1076        }
1077    }
1078
1079    /// Returns the retained top-level proto failure.
1080    pub fn failure(&self) -> &Failure {
1081        &self.failure
1082    }
1083
1084    /// Returns the normalized direct cause of the workflow signal failure, if any.
1085    pub fn cause(&self) -> Option<&IncomingError> {
1086        self.error.cause()
1087    }
1088
1089    /// Returns the direct decoded incoming error represented by the top-level proto failure.
1090    pub fn error(&self) -> &IncomingError {
1091        &self.error
1092    }
1093}
1094
1095impl std::fmt::Display for WorkflowSignalFailureError {
1096    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1097        self.failure.fmt(f)
1098    }
1099}
1100
1101impl std::error::Error for WorkflowSignalFailureError {
1102    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1103        self.cause()
1104            .map(|cause| cause as &(dyn std::error::Error + 'static))
1105    }
1106}
1107
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        data_converters::{
            DefaultFailureConverter, FailureConverter, GenericPayloadConverter, PayloadConverter,
            SerializationContext, SerializationContextData,
        },
        protos::temporal::api::{common::v1::Payload, failure::v1::failure::FailureInfo},
    };

    // Detail value whose `Serialize` impl always errors; drives the detail-encoding failure path.
    struct AlwaysFailsSerialize;

    impl serde::Serialize for AlwaysFailsSerialize {
        fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            Err(<S::Error as serde::ser::Error>::custom("serialize boom"))
        }
    }

    // `new` yields a retryable failure, `non_retryable` a non-retryable one.
    #[test]
    fn constructors_set_retryability_defaults() {
        let retryable = ApplicationFailure::new(anyhow::anyhow!("retryable"));
        assert!(!retryable.is_non_retryable());

        let terminal = ApplicationFailure::non_retryable(anyhow::anyhow!("non-retryable"));
        assert!(terminal.is_non_retryable());
    }

    // Every builder-set field must show up on the converted application failure info.
    #[test]
    fn conversion_preserves_application_metadata() {
        let detail = Payload {
            data: b"details".to_vec(),
            ..Default::default()
        };
        let payloads = Payloads {
            payloads: vec![detail],
        };
        let app_failure = ApplicationFailure::builder(anyhow::anyhow!("oops"))
            .type_name("MyType".to_owned())
            .non_retryable(true)
            .next_retry_delay(Duration::from_secs(3))
            .category(ApplicationErrorCategory::Benign)
            .details(RawValue::new(payloads.payloads.clone()))
            .build();
        let outgoing =
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(app_failure)));
        let converter = PayloadConverter::default();

        let failure = DefaultFailureConverter.to_failure(
            outgoing,
            &converter,
            &SerializationContextData::None,
        );

        let info = match failure.failure_info {
            Some(FailureInfo::ApplicationFailureInfo(info)) => info,
            _ => panic!("expected application failure info"),
        };
        assert_eq!(failure.message, "oops");
        assert_eq!(info.r#type, "MyType");
        assert!(info.non_retryable);
        assert_eq!(info.details, Some(payloads));
        assert_eq!(info.category(), ProtoApplicationErrorCategory::Benign);
        assert_eq!(info.next_retry_delay.unwrap().seconds, 3);
    }

    // Raw payloads handed to the builder must come out of the conversion untouched.
    #[test]
    fn builder_accepts_raw_payload_details() {
        let payload = Payload {
            data: b"details".to_vec(),
            ..Default::default()
        };
        let app_failure = ApplicationFailure::builder(anyhow::anyhow!("oops"))
            .details(RawValue::new(vec![payload.clone()]))
            .build();
        let outgoing =
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(app_failure)));
        let converter = PayloadConverter::default();

        let failure = DefaultFailureConverter.to_failure(
            outgoing,
            &converter,
            &SerializationContextData::None,
        );

        let info = match failure.failure_info {
            Some(FailureInfo::ApplicationFailureInfo(info)) => info,
            _ => panic!("expected application failure info"),
        };
        assert_eq!(info.details.unwrap().payloads, vec![payload]);
    }

    // A serializable detail value must round-trip through the default payload converter.
    #[test]
    fn builder_accepts_serializable_details() {
        let app_failure = ApplicationFailure::builder(anyhow::anyhow!("oops"))
            .details("details".to_string())
            .build();
        let outgoing =
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(app_failure)));
        let converter = PayloadConverter::default();

        let failure = DefaultFailureConverter.to_failure(
            outgoing,
            &converter,
            &SerializationContextData::None,
        );

        let info = match failure.failure_info {
            Some(FailureInfo::ApplicationFailureInfo(info)) => info,
            _ => panic!("expected application failure info"),
        };
        let payloads = info.details.expect("expected details").payloads;

        let context_data = SerializationContextData::None;
        let context = SerializationContext {
            data: &context_data,
            converter: &converter,
        };
        let details: String = converter.from_payloads(&context, payloads).unwrap();
        assert_eq!(details, "details");
    }

    // A detail value that fails to serialize must be reported in the failure message while the
    // result still carries application failure info.
    #[test]
    fn application_failure_encoding_surfaces_detail_encoding_errors() {
        let app_failure = ApplicationFailure::builder(anyhow::anyhow!("oops"))
            .details(AlwaysFailsSerialize)
            .build();
        let outgoing =
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(app_failure)));
        let converter = PayloadConverter::default();

        let failure = DefaultFailureConverter.to_failure(
            outgoing,
            &converter,
            &SerializationContextData::None,
        );

        assert_eq!(
            failure.message,
            "Failed converting error to failure: Encoding error: serialize boom, original error message: oops"
        );
        let is_application_failure = matches!(
            failure.failure_info,
            Some(FailureInfo::ApplicationFailureInfo(_))
        );
        assert!(is_application_failure);
    }

    // Converting a plain `anyhow` error must land in the `Application` variant.
    #[test]
    fn anyhow_workflow_errors_default_to_application_outgoing_errors() {
        let source = anyhow::anyhow!("workflow boom");
        let outgoing: OutgoingWorkflowError = source.into();

        match outgoing {
            OutgoingWorkflowError::Application(app) => {
                assert_eq!(app.to_string(), "workflow boom");
            }
            _ => panic!("plain workflow errors should default to application failures"),
        }
    }

    // Converting a payload conversion error must land in the `Application` variant.
    #[test]
    fn payload_conversion_errors_default_to_application_outgoing_errors() {
        let source = PayloadConversionError::EncodingError(anyhow::anyhow!("encode boom").into());
        let outgoing: OutgoingWorkflowError = source.into();

        match outgoing {
            OutgoingWorkflowError::Application(app) => {
                assert_eq!(app.to_string(), "Encoding error: encode boom");
            }
            _ => panic!("payload conversion errors should default to application failures"),
        }
    }
}