Skip to main content

temporalio_common/
error.rs

1//! Shared error types used across Temporal SDK crates.
2
3use crate::{
4    data_converters::{
5        DecodablePayloads, GenericPayloadConverter, PayloadConversionError, PayloadConverter,
6        RawValue, SerializationContext, SerializationContextData, TemporalDeserializable,
7        TemporalSerializable,
8    },
9    protos::{
10        coresdk::child_workflow::StartChildWorkflowExecutionFailedCause,
11        temporal::api::{
12            common::v1::{Payload, Payloads},
13            enums::v1::{ApplicationErrorCategory, TimeoutType},
14            failure::v1::Failure,
15        },
16    },
17};
18use std::time::Duration;
19
20// We cannot store `Box<dyn TemporalSerializable>` directly here because erased values still need
21// to be driven back through the active `PayloadConverter` to reach serde-based implementations.
22trait SerializableFailurePayload: Send + Sync {
23    fn to_payloads(
24        &self,
25        payload_converter: &PayloadConverter,
26        context: &SerializationContextData,
27    ) -> Result<Vec<Payload>, PayloadConversionError>;
28}
29
30impl<T> SerializableFailurePayload for T
31where
32    T: TemporalSerializable + Send + Sync + 'static,
33{
34    fn to_payloads(
35        &self,
36        payload_converter: &PayloadConverter,
37        context: &SerializationContextData,
38    ) -> Result<Vec<Payload>, PayloadConversionError> {
39        payload_converter.to_payloads(
40            &SerializationContext {
41                data: context,
42                converter: payload_converter,
43            },
44            self,
45        )
46    }
47}
48
49/// Payloads attached to a failure, either as a deferred outbound value or decoded inbound payloads.
50#[derive(derive_more::Debug)]
51pub struct FailurePayloads {
52    repr: FailurePayloadsRepr,
53}
54
55#[derive(derive_more::Debug)]
56enum FailurePayloadsRepr {
57    #[debug("Serializable(...)")]
58    Serializable(#[debug(skip)] Box<dyn SerializableFailurePayload>),
59    Decoded(DecodablePayloads),
60}
61
62impl FailurePayloads {
63    pub(crate) fn encode(
64        &self,
65        payload_converter: &PayloadConverter,
66        context: &SerializationContextData,
67    ) -> Result<Payloads, PayloadConversionError> {
68        let payloads = match &self.repr {
69            FailurePayloadsRepr::Serializable(value) => {
70                value.to_payloads(payload_converter, context)?
71            }
72            FailurePayloadsRepr::Decoded(value) => value.raw().to_vec(),
73        };
74        Ok(Payloads { payloads })
75    }
76
77    /// Deserialize the decoded payloads into a typed value.
78    pub fn deserialize<T: TemporalDeserializable + 'static>(
79        &self,
80    ) -> Result<T, PayloadConversionError> {
81        match &self.repr {
82            FailurePayloadsRepr::Decoded(value) => value.deserialize(),
83            FailurePayloadsRepr::Serializable(_) => Err(PayloadConversionError::WrongEncoding),
84        }
85    }
86
87    /// Returns the decoded raw payloads, if present.
88    pub fn raw(&self) -> Option<&[Payload]> {
89        match &self.repr {
90            FailurePayloadsRepr::Decoded(value) => Some(value.raw()),
91            FailurePayloadsRepr::Serializable(_) => None,
92        }
93    }
94
95    /// Consume this value and return the decoded payloads as a [`RawValue`], if present.
96    pub fn into_raw(self) -> Option<RawValue> {
97        match self.repr {
98            FailurePayloadsRepr::Decoded(value) => Some(value.into_raw()),
99            FailurePayloadsRepr::Serializable(_) => None,
100        }
101    }
102}
103
104impl From<DecodablePayloads> for FailurePayloads {
105    fn from(value: DecodablePayloads) -> Self {
106        Self {
107            repr: FailurePayloadsRepr::Decoded(value),
108        }
109    }
110}
111
112impl<T> From<T> for FailurePayloads
113where
114    T: TemporalSerializable + Send + Sync + 'static,
115{
116    fn from(value: T) -> Self {
117        Self {
118            repr: FailurePayloadsRepr::Serializable(Box::new(value)),
119        }
120    }
121}
122
123/// User-authored application failure metadata that can be converted into a Temporal failure.
124#[derive(Debug, bon::Builder)]
125#[builder(start_fn = builder, state_mod(vis = "pub"))]
126pub struct ApplicationFailure {
127    #[builder(start_fn, into)]
128    source: anyhow::Error,
129    type_name: Option<String>,
130    #[builder(default)]
131    non_retryable: bool,
132    next_retry_delay: Option<Duration>,
133    #[builder(default = ApplicationErrorCategory::Unspecified)]
134    category: ApplicationErrorCategory,
135    #[builder(into)]
136    details: Option<FailurePayloads>,
137    failure: Option<Failure>,
138    cause: Option<Box<IncomingError>>,
139}
140
141impl ApplicationFailure {
142    /// Construct a retryable application failure with no extra metadata.
143    pub fn new(source: impl Into<anyhow::Error>) -> Self {
144        Self {
145            source: source.into(),
146            type_name: None,
147            non_retryable: false,
148            next_retry_delay: None,
149            category: ApplicationErrorCategory::Unspecified,
150            details: None,
151            failure: None,
152            cause: None,
153        }
154    }
155
156    /// Construct a non-retryable application failure with no extra metadata.
157    pub fn non_retryable(source: impl Into<anyhow::Error>) -> Self {
158        Self {
159            non_retryable: true,
160            ..Self::new(source)
161        }
162    }
163
164    /// Returns the wrapped source error.
165    pub fn source_error(&self) -> &anyhow::Error {
166        &self.source
167    }
168
169    /// Returns the configured application failure type name, if any.
170    pub fn type_name(&self) -> Option<&str> {
171        self.type_name.as_deref()
172    }
173
174    /// Returns true if this failure should be treated as non-retryable.
175    pub fn is_non_retryable(&self) -> bool {
176        self.non_retryable
177    }
178
179    /// Returns the explicitly configured next retry delay, if any.
180    pub fn next_retry_delay(&self) -> Option<Duration> {
181        self.next_retry_delay
182    }
183
184    /// Returns the application error category.
185    pub fn category(&self) -> ApplicationErrorCategory {
186        self.category
187    }
188
189    /// Returns the decoded details deserialized as the requested type, if any.
190    pub fn details<T: TemporalDeserializable + 'static>(
191        &self,
192    ) -> Result<Option<T>, PayloadConversionError> {
193        self.details
194            .as_ref()
195            .map(FailurePayloads::deserialize)
196            .transpose()
197    }
198
199    /// Returns the raw decoded details payloads, if any.
200    pub fn raw_details(&self) -> Option<&[Payload]> {
201        self.details.as_ref().and_then(FailurePayloads::raw)
202    }
203
204    pub(crate) fn failure_payloads(&self) -> Option<&FailurePayloads> {
205        self.details.as_ref()
206    }
207
208    /// Returns the original failure proto when this application failure was decoded from one.
209    pub fn failure(&self) -> Option<&Failure> {
210        self.failure.as_ref()
211    }
212
213    /// Consumes this application failure and returns the retained proto failure, if one exists.
214    pub fn into_failure(self) -> Option<Failure> {
215        self.failure
216    }
217
218    /// Returns the normalized cause, if any.
219    pub fn cause(&self) -> Option<&IncomingError> {
220        self.cause.as_deref()
221    }
222
223    /// If this [`ApplicationFailure`] was caused by a timeout, returns the associated
224    /// [`TimeoutError`].
225    pub fn as_timeout(&self) -> Option<&TimeoutError> {
226        self.cause().and_then(IncomingError::as_timeout)
227    }
228
229    /// If this [`ApplicationFailure`] was caused by a cancellation, returns the associated
230    /// [`CancelledError`].
231    pub fn as_cancelled(&self) -> Option<&CancelledError> {
232        self.cause().and_then(IncomingError::as_cancelled)
233    }
234
235    pub(crate) fn from_failure(
236        failure: Failure,
237        cause: Option<IncomingError>,
238        payload_converter: &PayloadConverter,
239        context: &SerializationContextData,
240    ) -> Self {
241        let app_info = failure
242            .maybe_application_failure()
243            .cloned()
244            .unwrap_or_default();
245        let type_name = (!app_info.r#type.is_empty()).then_some(app_info.r#type.clone());
246        Self {
247            source: anyhow::anyhow!(failure.message.clone()),
248            type_name,
249            non_retryable: app_info.non_retryable,
250            next_retry_delay: app_info.next_retry_delay.and_then(|d| d.try_into().ok()),
251            category: app_info.category(),
252            details: app_info.details.map(|details| {
253                FailurePayloads::from(DecodablePayloads::new(
254                    details.payloads,
255                    payload_converter.clone(),
256                    *context,
257                ))
258            }),
259            failure: Some(failure),
260            cause: cause.map(Box::new),
261        }
262    }
263}
264
265impl std::fmt::Display for ApplicationFailure {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        write!(f, "{}", self.source)
268    }
269}
270
271impl std::error::Error for ApplicationFailure {
272    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
273        self.cause
274            .as_deref()
275            .map(|cause| cause as &(dyn std::error::Error + 'static))
276            .or_else(|| Some(self.source.as_ref()))
277    }
278}
279
280impl From<anyhow::Error> for ApplicationFailure {
281    fn from(value: anyhow::Error) -> Self {
282        Self::new(value)
283    }
284}
285
286impl From<PayloadConversionError> for ApplicationFailure {
287    fn from(value: PayloadConversionError) -> Self {
288        Self::new(value)
289    }
290}
291
292/// A typed outbound error surface used before encoding to a Temporal failure proto.
293#[derive(Debug, thiserror::Error)]
294pub enum OutgoingError {
295    /// An error produced while completing an activity.
296    #[error(transparent)]
297    Activity(#[from] OutgoingActivityError),
298    /// An error produced from a workflow.
299    #[error(transparent)]
300    Workflow(#[from] OutgoingWorkflowError),
301}
302
303/// A typed outbound activity error.
304#[derive(Debug, thiserror::Error)]
305pub enum OutgoingActivityError {
306    /// An activity application failure.
307    #[error(transparent)]
308    Application(#[from] Box<ApplicationFailure>),
309    /// An activity cancellation with optional details.
310    #[error("Activity cancelled")]
311    Cancelled {
312        /// Optional cancellation details.
313        details: Option<FailurePayloads>,
314    },
315}
316
317/// A typed outbound workflow failure.
318#[derive(Debug, thiserror::Error)]
319pub enum OutgoingWorkflowError {
320    /// A workflow application failure.
321    #[error(transparent)]
322    Application(#[from] Box<ApplicationFailure>),
323    /// A workflow failure sourced from an activity execution.
324    #[error(transparent)]
325    ActivityExecution(#[from] Box<ActivityExecutionError>),
326    /// A workflow failure sourced from a child-workflow execution.
327    #[error(transparent)]
328    ChildWorkflowExecution(#[from] Box<ChildWorkflowExecutionError>),
329    /// A workflow failure sourced from child-workflow start.
330    #[error(transparent)]
331    ChildWorkflowStart(#[from] Box<ChildWorkflowStartError>),
332    /// A workflow failure sourced from child-workflow signaling.
333    #[error(transparent)]
334    ChildWorkflowSignal(#[from] Box<ChildWorkflowSignalError>),
335}
336
337impl From<anyhow::Error> for OutgoingWorkflowError {
338    fn from(value: anyhow::Error) -> Self {
339        Self::Application(Box::new(ApplicationFailure::new(value)))
340    }
341}
342
343impl From<PayloadConversionError> for OutgoingWorkflowError {
344    fn from(value: PayloadConversionError) -> Self {
345        Self::Application(Box::new(value.into()))
346    }
347}
348
349impl From<ApplicationFailure> for OutgoingWorkflowError {
350    fn from(value: ApplicationFailure) -> Self {
351        Self::Application(Box::new(value))
352    }
353}
354
355impl From<ActivityExecutionError> for OutgoingWorkflowError {
356    fn from(value: ActivityExecutionError) -> Self {
357        Self::ActivityExecution(Box::new(value))
358    }
359}
360
361impl From<ChildWorkflowExecutionError> for OutgoingWorkflowError {
362    fn from(value: ChildWorkflowExecutionError) -> Self {
363        Self::ChildWorkflowExecution(Box::new(value))
364    }
365}
366
367impl From<ChildWorkflowStartError> for OutgoingWorkflowError {
368    fn from(value: ChildWorkflowStartError) -> Self {
369        Self::ChildWorkflowStart(Box::new(value))
370    }
371}
372
373impl From<ChildWorkflowSignalError> for OutgoingWorkflowError {
374    fn from(value: ChildWorkflowSignalError) -> Self {
375        Self::ChildWorkflowSignal(Box::new(value))
376    }
377}
378
379/// A normalized incoming Temporal failure decoded from a protobuf [`Failure`].
380#[derive(Debug)]
381pub enum IncomingError {
382    /// A decoded application failure.
383    Application(ApplicationFailure),
384    /// A decoded timeout failure.
385    Timeout(TimeoutError),
386    /// A decoded cancellation failure.
387    Cancelled(CancelledError),
388    /// A decoded terminated failure.
389    Terminated(TerminatedError),
390    /// A decoded server failure.
391    Server(ServerError),
392    /// A decoded reset-workflow failure.
393    ResetWorkflow(ResetWorkflowError),
394    /// A decoded activity failure wrapper.
395    Activity(ActivityFailureError),
396    /// A decoded child-workflow failure wrapper.
397    ChildWorkflowExecution(ChildWorkflowFailureError),
398    /// A decoded nexus operation failure wrapper.
399    NexusOperationExecution(IncomingNexusOperationExecutionError),
400    /// A decoded nexus handler failure wrapper.
401    NexusHandler(IncomingNexusHandlerError),
402}
403
404impl IncomingError {
405    /// Returns the original failure proto for this normalized error.
406    pub fn failure(&self) -> &Failure {
407        match self {
408            IncomingError::Application(err) => err
409                .failure()
410                .expect("decoded application failures retain their original proto"),
411            IncomingError::Timeout(err) => err.failure(),
412            IncomingError::Cancelled(err) => err.failure(),
413            IncomingError::Terminated(err) => err.failure(),
414            IncomingError::Server(err) => err.failure(),
415            IncomingError::ResetWorkflow(err) => err.failure(),
416            IncomingError::Activity(err) => err.failure(),
417            IncomingError::ChildWorkflowExecution(err) => err.failure(),
418            IncomingError::NexusOperationExecution(err) => err.failure(),
419            IncomingError::NexusHandler(err) => err.failure(),
420        }
421    }
422
423    /// Returns the normalized cause, if any.
424    pub fn cause(&self) -> Option<&IncomingError> {
425        match self {
426            IncomingError::Application(err) => err.cause(),
427            IncomingError::Timeout(err) => err.cause(),
428            IncomingError::Cancelled(err) => err.cause(),
429            IncomingError::Terminated(err) => err.cause(),
430            IncomingError::Server(err) => err.cause(),
431            IncomingError::ResetWorkflow(err) => err.cause(),
432            IncomingError::Activity(err) => err.cause(),
433            IncomingError::ChildWorkflowExecution(err) => err.cause(),
434            IncomingError::NexusOperationExecution(err) => err.cause(),
435            IncomingError::NexusHandler(err) => err.cause(),
436        }
437    }
438
439    /// Consumes this normalized error and returns the retained proto failure.
440    pub fn into_failure(self) -> Failure {
441        match self {
442            IncomingError::Application(err) => err
443                .into_failure()
444                .expect("decoded application failures retain their original proto"),
445            IncomingError::Timeout(err) => err.into_failure(),
446            IncomingError::Cancelled(err) => err.into_failure(),
447            IncomingError::Terminated(err) => err.into_failure(),
448            IncomingError::Server(err) => err.into_failure(),
449            IncomingError::ResetWorkflow(err) => err.into_failure(),
450            IncomingError::Activity(err) => err.into_failure(),
451            IncomingError::ChildWorkflowExecution(err) => err.into_failure(),
452            IncomingError::NexusOperationExecution(err) => err.into_failure(),
453            IncomingError::NexusHandler(err) => err.into_failure(),
454        }
455    }
456
457    /// If the [`IncomingError`] is a timeout, returns the associated [`TimeoutError`].
458    pub fn as_timeout(&self) -> Option<&TimeoutError> {
459        match self {
460            IncomingError::Timeout(err) => Some(err),
461            _ => None,
462        }
463    }
464
465    /// If the [`IncomingError`] is a cancellation, returns the associated [`CancelledError`].
466    pub fn as_cancelled(&self) -> Option<&CancelledError> {
467        match self {
468            IncomingError::Cancelled(err) => Some(err),
469            _ => None,
470        }
471    }
472}
473
474impl std::fmt::Display for IncomingError {
475    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476        match self {
477            IncomingError::Application(err) => err.fmt(f),
478            IncomingError::Timeout(err) => err.fmt(f),
479            IncomingError::Cancelled(err) => err.fmt(f),
480            IncomingError::Terminated(err) => err.fmt(f),
481            IncomingError::Server(err) => err.fmt(f),
482            IncomingError::ResetWorkflow(err) => err.fmt(f),
483            IncomingError::Activity(err) => err.fmt(f),
484            IncomingError::ChildWorkflowExecution(err) => err.fmt(f),
485            IncomingError::NexusOperationExecution(err) => err.fmt(f),
486            IncomingError::NexusHandler(err) => err.fmt(f),
487        }
488    }
489}
490
491impl std::error::Error for IncomingError {
492    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
493        match self {
494            IncomingError::Application(err) => Some(err),
495            IncomingError::Timeout(err) => Some(err),
496            IncomingError::Cancelled(err) => Some(err),
497            IncomingError::Terminated(err) => Some(err),
498            IncomingError::Server(err) => Some(err),
499            IncomingError::ResetWorkflow(err) => Some(err),
500            IncomingError::Activity(err) => Some(err),
501            IncomingError::ChildWorkflowExecution(err) => Some(err),
502            IncomingError::NexusOperationExecution(err) => Some(err),
503            IncomingError::NexusHandler(err) => Some(err),
504        }
505    }
506}
507
// Generates the shared accessors plus `Display` and `Error` impls for a wrapper type that has
// a `failure: Failure` field and a `cause: Option<Box<IncomingError>>` field.
macro_rules! impl_incoming_failure_wrapper {
    ($wrapper:ident) => {
        impl $wrapper {
            /// Borrows the original failure proto.
            pub fn failure(&self) -> &Failure {
                &self.failure
            }

            /// Borrows the normalized cause, if any.
            pub fn cause(&self) -> Option<&IncomingError> {
                self.cause.as_deref()
            }

            /// Consumes this wrapper and hands back the retained proto failure.
            pub fn into_failure(self) -> Failure {
                self.failure
            }

            /// Consumes this wrapper and hands back both the retained proto failure and the
            /// normalized cause.
            pub fn into_parts(self) -> (Failure, Option<IncomingError>) {
                let cause = self.cause.map(|boxed| *boxed);
                (self.failure, cause)
            }
        }

        impl std::fmt::Display for $wrapper {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Display::fmt(&self.failure, f)
            }
        }

        impl std::error::Error for $wrapper {
            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
                let cause: &IncomingError = self.cause.as_deref()?;
                Some(cause)
            }
        }
    };
}
547
// Declares a wrapper struct that holds only the proto failure and its normalized cause, gives
// it a crate-private constructor, and attaches the shared impls from
// `impl_incoming_failure_wrapper!`.
macro_rules! incoming_failure_wrapper {
    ($wrapper:ident, $summary:literal) => {
        #[doc = $summary]
        #[derive(Debug)]
        pub struct $wrapper {
            failure: Failure,
            cause: Option<Box<IncomingError>>,
        }

        impl $wrapper {
            /// Builds the wrapper from a proto failure and its normalized cause.
            pub(crate) fn new(failure: Failure, cause: Option<IncomingError>) -> Self {
                let cause = cause.map(Box::new);
                Self { failure, cause }
            }
        }

        impl_incoming_failure_wrapper!($wrapper);
    };
}
570
571/// A normalized timeout failure.
572#[derive(Debug)]
573pub struct TimeoutError {
574    failure: Failure,
575    cause: Option<Box<IncomingError>>,
576    timeout_type: TimeoutType,
577    last_heartbeat_details: Option<DecodablePayloads>,
578}
579
580impl TimeoutError {
581    /// Creates a new normalized timeout error wrapper.
582    pub(crate) fn new(
583        failure: Failure,
584        failure_info: crate::protos::temporal::api::failure::v1::TimeoutFailureInfo,
585        cause: Option<IncomingError>,
586        payload_converter: &PayloadConverter,
587        context: &SerializationContextData,
588    ) -> Self {
589        Self {
590            failure,
591            cause: cause.map(Box::new),
592            timeout_type: failure_info.timeout_type(),
593            last_heartbeat_details: failure_info.last_heartbeat_details.map(|details| {
594                DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
595            }),
596        }
597    }
598
599    /// Returns the timeout kind described by the failure.
600    pub fn timeout_type(&self) -> TimeoutType {
601        self.timeout_type
602    }
603
604    /// Returns the last heartbeat details carried by the timeout, if any.
605    pub fn last_heartbeat_details<T: TemporalDeserializable + 'static>(
606        &self,
607    ) -> Result<Option<T>, PayloadConversionError> {
608        self.last_heartbeat_details
609            .as_ref()
610            .map(DecodablePayloads::deserialize)
611            .transpose()
612    }
613
614    /// Returns the raw decoded heartbeat details carried by the timeout, if any.
615    pub fn raw_last_heartbeat_details(&self) -> Option<&[Payload]> {
616        self.last_heartbeat_details
617            .as_ref()
618            .map(DecodablePayloads::raw)
619    }
620}
621
622impl_incoming_failure_wrapper!(TimeoutError);
623
624/// A normalized cancellation failure.
625#[derive(Debug)]
626pub struct CancelledError {
627    failure: Failure,
628    cause: Option<Box<IncomingError>>,
629    details: Option<DecodablePayloads>,
630}
631
632impl CancelledError {
633    /// Creates a new normalized cancellation error wrapper.
634    pub(crate) fn new(
635        failure: Failure,
636        failure_info: crate::protos::temporal::api::failure::v1::CanceledFailureInfo,
637        cause: Option<IncomingError>,
638        payload_converter: &PayloadConverter,
639        context: &SerializationContextData,
640    ) -> Self {
641        Self {
642            failure,
643            cause: cause.map(Box::new),
644            details: failure_info.details.map(|details| {
645                DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
646            }),
647        }
648    }
649
650    /// Returns the cancellation details carried by the failure, deserialized as the requested
651    /// type, if any.
652    pub fn details<T: TemporalDeserializable + 'static>(
653        &self,
654    ) -> Result<Option<T>, PayloadConversionError> {
655        self.details
656            .as_ref()
657            .map(DecodablePayloads::deserialize)
658            .transpose()
659    }
660
661    /// Returns the raw decoded cancellation details carried by the failure, if any.
662    pub fn raw_details(&self) -> Option<&[Payload]> {
663        self.details.as_ref().map(DecodablePayloads::raw)
664    }
665}
666
667impl_incoming_failure_wrapper!(CancelledError);
668incoming_failure_wrapper!(TerminatedError, "A normalized terminated failure.");
669incoming_failure_wrapper!(ServerError, "A normalized server failure.");
670incoming_failure_wrapper!(ResetWorkflowError, "A normalized reset-workflow failure.");
671
672/// A normalized activity failure wrapper.
673#[derive(Debug)]
674pub struct ActivityFailureError {
675    failure: Failure,
676    cause: Option<Box<IncomingError>>,
677    activity_id: String,
678    activity_type: Option<crate::protos::temporal::api::common::v1::ActivityType>,
679    scheduled_event_id: i64,
680    started_event_id: i64,
681    identity: String,
682    retry_state: crate::protos::temporal::api::enums::v1::RetryState,
683}
684
685impl ActivityFailureError {
686    /// Creates a new normalized activity failure wrapper.
687    pub(crate) fn new(
688        failure: Failure,
689        failure_info: crate::protos::temporal::api::failure::v1::ActivityFailureInfo,
690        cause: Option<IncomingError>,
691    ) -> Self {
692        let retry_state = failure_info.retry_state();
693        Self {
694            failure,
695            cause: cause.map(Box::new),
696            activity_id: failure_info.activity_id,
697            activity_type: failure_info.activity_type,
698            scheduled_event_id: failure_info.scheduled_event_id,
699            started_event_id: failure_info.started_event_id,
700            identity: failure_info.identity,
701            retry_state,
702        }
703    }
704
705    /// Returns the activity id reported by the failure.
706    pub fn activity_id(&self) -> &str {
707        &self.activity_id
708    }
709
710    /// Returns the activity type, if present.
711    pub fn activity_type(&self) -> Option<&crate::protos::temporal::api::common::v1::ActivityType> {
712        self.activity_type.as_ref()
713    }
714
715    /// Returns the scheduled event id.
716    pub fn scheduled_event_id(&self) -> i64 {
717        self.scheduled_event_id
718    }
719
720    /// Returns the started event id.
721    pub fn started_event_id(&self) -> i64 {
722        self.started_event_id
723    }
724
725    /// Returns the worker identity captured on the failure.
726    pub fn identity(&self) -> &str {
727        &self.identity
728    }
729
730    /// Returns the retry state reported by core.
731    pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
732        self.retry_state
733    }
734
735    /// If this [`ActivityFailureError`] was caused by a timeout, returns the associated
736    /// [`TimeoutError`].
737    pub fn as_timeout(&self) -> Option<&TimeoutError> {
738        self.cause().and_then(IncomingError::as_timeout)
739    }
740
741    /// If this [`ActivityFailureError`] was caused by a cancellation, returns the associated
742    /// [`CancelledError`].
743    pub fn as_cancelled(&self) -> Option<&CancelledError> {
744        self.cause().and_then(IncomingError::as_cancelled)
745    }
746}
747
748impl_incoming_failure_wrapper!(ActivityFailureError);
749/// A normalized child-workflow execution failure wrapper.
750#[derive(Debug)]
751pub struct ChildWorkflowFailureError {
752    failure: Failure,
753    cause: Option<Box<IncomingError>>,
754    namespace: String,
755    workflow_execution: Option<crate::protos::temporal::api::common::v1::WorkflowExecution>,
756    workflow_type: Option<crate::protos::temporal::api::common::v1::WorkflowType>,
757    initiated_event_id: i64,
758    started_event_id: i64,
759    retry_state: crate::protos::temporal::api::enums::v1::RetryState,
760}
761
762impl ChildWorkflowFailureError {
763    /// Creates a new normalized child-workflow execution failure wrapper.
764    pub(crate) fn new(
765        failure: Failure,
766        failure_info: crate::protos::temporal::api::failure::v1::ChildWorkflowExecutionFailureInfo,
767        cause: Option<IncomingError>,
768    ) -> Self {
769        let retry_state = failure_info.retry_state();
770        Self {
771            failure,
772            cause: cause.map(Box::new),
773            namespace: failure_info.namespace,
774            workflow_execution: failure_info.workflow_execution,
775            workflow_type: failure_info.workflow_type,
776            initiated_event_id: failure_info.initiated_event_id,
777            started_event_id: failure_info.started_event_id,
778            retry_state,
779        }
780    }
781
782    /// Returns the namespace of the child workflow.
783    pub fn namespace(&self) -> &str {
784        &self.namespace
785    }
786
787    /// Returns the child workflow execution, if present.
788    pub fn workflow_execution(
789        &self,
790    ) -> Option<&crate::protos::temporal::api::common::v1::WorkflowExecution> {
791        self.workflow_execution.as_ref()
792    }
793
794    /// Returns the child workflow type, if present.
795    pub fn workflow_type(&self) -> Option<&crate::protos::temporal::api::common::v1::WorkflowType> {
796        self.workflow_type.as_ref()
797    }
798
799    /// Returns the initiated event id.
800    pub fn initiated_event_id(&self) -> i64 {
801        self.initiated_event_id
802    }
803
804    /// Returns the started event id.
805    pub fn started_event_id(&self) -> i64 {
806        self.started_event_id
807    }
808
809    /// Returns the retry state reported by core.
810    pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
811        self.retry_state
812    }
813
814    /// If this [`ChildWorkflowFailureError`] was caused by a timeout, returns the associated
815    /// [`TimeoutError`].
816    pub fn as_timeout(&self) -> Option<&TimeoutError> {
817        self.cause().and_then(IncomingError::as_timeout)
818    }
819
820    /// If this [`ChildWorkflowFailureError`] was caused by a cancellation, returns the associated
821    /// [`CancelledError`].
822    pub fn as_cancelled(&self) -> Option<&CancelledError> {
823        self.cause().and_then(IncomingError::as_cancelled)
824    }
825}
826
827impl_incoming_failure_wrapper!(ChildWorkflowFailureError);
828incoming_failure_wrapper!(
829    IncomingNexusOperationExecutionError,
830    "A normalized nexus operation failure wrapper."
831);
832incoming_failure_wrapper!(
833    IncomingNexusHandlerError,
834    "A normalized nexus handler failure wrapper."
835);
836
837/// Error type for activity execution outcomes.
838#[derive(Debug, thiserror::Error)]
839pub enum ActivityExecutionError {
840    /// The activity failed with the given failure details.
841    #[error("Activity failed: {}", .0.failure().message)]
842    Failed(#[source] ActivityFailureError),
843    /// The activity was cancelled.
844    #[error("Activity cancelled: {}", .0.failure().message)]
845    Cancelled(#[source] CancelledError),
846    /// Failed to serialize input or deserialize result payload.
847    #[error("Payload conversion failed: {0}")]
848    Serialization(#[from] PayloadConversionError),
849}
850
851impl ActivityExecutionError {
852    /// Returns the retained top-level activity failure proto, if one exists.
853    pub fn failure(&self) -> Option<&Failure> {
854        match self {
855            ActivityExecutionError::Failed(err) => Some(err.failure()),
856            ActivityExecutionError::Cancelled(err) => Some(err.failure()),
857            ActivityExecutionError::Serialization(_) => None,
858        }
859    }
860
861    /// Returns the normalized cause of the top-level activity failure, if any.
862    pub fn cause(&self) -> Option<&IncomingError> {
863        match self {
864            ActivityExecutionError::Failed(err) => err.cause(),
865            ActivityExecutionError::Cancelled(err) => err.cause(),
866            ActivityExecutionError::Serialization(_) => None,
867        }
868    }
869
870    /// Returns the underlying failure reason for wrapper-shaped activity failures.
871    pub fn reason(&self) -> Option<&IncomingError> {
872        match self {
873            ActivityExecutionError::Failed(err) => err.cause(),
874            ActivityExecutionError::Cancelled(_) | ActivityExecutionError::Serialization(_) => None,
875        }
876    }
877
878    /// If this [`ActivityExecutionError`] was caused by a timeout, returns the associated
879    /// [`TimeoutError`].
880    pub fn as_timeout(&self) -> Option<&TimeoutError> {
881        match self {
882            ActivityExecutionError::Failed(err) => err.as_timeout(),
883            ActivityExecutionError::Serialization(_) | ActivityExecutionError::Cancelled(_) => None,
884        }
885    }
886
887    /// If this [`ActivityExecutionError`] was caused by a cancellation, returns the associated
888    /// [`CancelledError`].
889    pub fn as_cancelled(&self) -> Option<&CancelledError> {
890        match self {
891            ActivityExecutionError::Failed(err) => err.as_cancelled(),
892            ActivityExecutionError::Cancelled(err) => Some(err),
893            ActivityExecutionError::Serialization(_) => None,
894        }
895    }
896}
897
898/// Error returned when starting a child workflow fails.
899#[derive(Debug, thiserror::Error)]
900pub enum ChildWorkflowStartError {
901    /// The child workflow start was cancelled before the normal execution wrapper path existed.
902    #[error("Child workflow start cancelled: {}", .0.failure().message)]
903    Cancelled(#[source] Box<CancelledError>),
904    /// The child workflow failed to start (e.g., workflow ID already exists).
905    #[error(
906        "Child workflow start failed: workflow_id={workflow_id}, workflow_type={workflow_type}, cause={cause:?}"
907    )]
908    StartFailed {
909        /// The workflow ID that was requested.
910        workflow_id: String,
911        /// The workflow type that was requested.
912        workflow_type: String,
913        /// The cause of the start failure.
914        cause: StartChildWorkflowExecutionFailedCause,
915    },
916    /// Failed to serialize child workflow input payloads.
917    #[error("Payload conversion failed: {0}")]
918    Serialization(#[from] PayloadConversionError),
919}
920
921impl ChildWorkflowStartError {
922    /// Returns the retained top-level failure proto, if one exists.
923    pub fn failure(&self) -> Option<&Failure> {
924        match self {
925            ChildWorkflowStartError::Cancelled(err) => Some(err.failure()),
926            ChildWorkflowStartError::StartFailed { .. }
927            | ChildWorkflowStartError::Serialization(_) => None,
928        }
929    }
930
931    /// Returns the normalized cause of the retained failure proto, if any.
932    pub fn cause(&self) -> Option<&IncomingError> {
933        match self {
934            ChildWorkflowStartError::Cancelled(err) => err.cause(),
935            ChildWorkflowStartError::StartFailed { .. }
936            | ChildWorkflowStartError::Serialization(_) => None,
937        }
938    }
939}
940
941/// Error returned when a child workflow execution fails.
942#[derive(Debug, thiserror::Error)]
943pub enum ChildWorkflowExecutionError {
944    /// The child workflow failed.
945    #[error("Child workflow failed: {}", .0.failure().message)]
946    Failed(#[source] Box<ChildWorkflowFailureError>),
947    /// Failed to serialize input or deserialize the child workflow result payload.
948    #[error("Payload conversion failed: {0}")]
949    Serialization(#[from] PayloadConversionError),
950}
951
952impl ChildWorkflowExecutionError {
953    /// Returns the retained top-level child-workflow failure proto, if one exists.
954    pub fn failure(&self) -> Option<&Failure> {
955        match self {
956            ChildWorkflowExecutionError::Failed(err) => Some(err.failure()),
957            ChildWorkflowExecutionError::Serialization(_) => None,
958        }
959    }
960
961    /// Returns the normalized cause of the top-level child-workflow failure, if any.
962    pub fn cause(&self) -> Option<&IncomingError> {
963        match self {
964            ChildWorkflowExecutionError::Failed(err) => err.cause(),
965            ChildWorkflowExecutionError::Serialization(_) => None,
966        }
967    }
968
969    /// Returns the underlying failure reason for wrapper-shaped child-workflow failures.
970    pub fn reason(&self) -> Option<&IncomingError> {
971        match self {
972            ChildWorkflowExecutionError::Failed(err) => err.cause(),
973            ChildWorkflowExecutionError::Serialization(_) => None,
974        }
975    }
976
977    /// If this [`ChildWorkflowExecutionError`] was caused by a timeout, returns the associated
978    /// [`TimeoutError`].
979    pub fn as_timeout(&self) -> Option<&TimeoutError> {
980        match self {
981            ChildWorkflowExecutionError::Failed(err) => err.as_timeout(),
982            ChildWorkflowExecutionError::Serialization(_) => None,
983        }
984    }
985
986    /// If this [`ChildWorkflowExecutionError`] was caused by a cancellation, returns the associated
987    /// [`CancelledError`].
988    pub fn as_cancelled(&self) -> Option<&CancelledError> {
989        match self {
990            ChildWorkflowExecutionError::Failed(err) => err.as_cancelled(),
991            ChildWorkflowExecutionError::Serialization(_) => None,
992        }
993    }
994}
995
996/// Error returned when signaling a child workflow fails.
997#[derive(Debug, thiserror::Error)]
998pub enum ChildWorkflowSignalError {
999    /// The signal delivery failed.
1000    #[error("Child workflow signal failed: {}", .0.failure().message)]
1001    Failed(#[source] Box<ChildWorkflowSignalFailureError>),
1002    /// Failed to serialize the signal input payload.
1003    #[error("Signal payload conversion failed: {0}")]
1004    Serialization(#[from] PayloadConversionError),
1005}
1006
1007impl ChildWorkflowSignalError {
1008    /// Returns the retained top-level child-workflow signal failure proto, if one exists.
1009    pub fn failure(&self) -> Option<&Failure> {
1010        match self {
1011            ChildWorkflowSignalError::Failed(err) => Some(err.failure()),
1012            ChildWorkflowSignalError::Serialization(_) => None,
1013        }
1014    }
1015
1016    /// Returns the normalized cause of the child-workflow signal failure, if any.
1017    pub fn cause(&self) -> Option<&IncomingError> {
1018        match self {
1019            ChildWorkflowSignalError::Failed(err) => err.cause(),
1020            ChildWorkflowSignalError::Serialization(_) => None,
1021        }
1022    }
1023
1024    /// Returns the underlying failure reason for wrapper-shaped signal failures.
1025    pub fn reason(&self) -> Option<&IncomingError> {
1026        match self {
1027            ChildWorkflowSignalError::Failed(err) => Some(err.error()),
1028            ChildWorkflowSignalError::Serialization(_) => None,
1029        }
1030    }
1031}
1032
1033/// A normalized child-workflow signal failure wrapper.
1034#[derive(Debug)]
1035pub struct ChildWorkflowSignalFailureError {
1036    failure: Failure,
1037    error: Box<IncomingError>,
1038}
1039
1040impl ChildWorkflowSignalFailureError {
1041    /// Creates a child-workflow signal failure wrapper.
1042    pub(crate) fn new(failure: Failure, error: IncomingError) -> Self {
1043        Self {
1044            failure,
1045            error: Box::new(error),
1046        }
1047    }
1048
1049    /// Returns the retained top-level proto failure.
1050    pub fn failure(&self) -> &Failure {
1051        &self.failure
1052    }
1053
1054    /// Returns the normalized direct cause of the child-workflow signal failure, if any.
1055    pub fn cause(&self) -> Option<&IncomingError> {
1056        self.error.cause()
1057    }
1058
1059    /// Returns the direct decoded incoming error represented by the top-level proto failure.
1060    pub fn error(&self) -> &IncomingError {
1061        &self.error
1062    }
1063}
1064
1065impl std::fmt::Display for ChildWorkflowSignalFailureError {
1066    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1067        self.failure.fmt(f)
1068    }
1069}
1070
1071impl std::error::Error for ChildWorkflowSignalFailureError {
1072    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1073        self.cause()
1074            .map(|cause| cause as &(dyn std::error::Error + 'static))
1075    }
1076}
1077
1078#[cfg(test)]
1079mod tests {
1080    use super::*;
1081    use crate::{
1082        data_converters::{
1083            DefaultFailureConverter, FailureConverter, GenericPayloadConverter, PayloadConverter,
1084            SerializationContext, SerializationContextData,
1085        },
1086        protos::temporal::api::{common::v1::Payload, failure::v1::failure::FailureInfo},
1087    };
1088
1089    struct AlwaysFailsSerialize;
1090
1091    impl serde::Serialize for AlwaysFailsSerialize {
1092        fn serialize<S: serde::Serializer>(&self, _serializer: S) -> Result<S::Ok, S::Error> {
1093            Err(serde::ser::Error::custom("serialize boom"))
1094        }
1095    }
1096
1097    #[test]
1098    fn constructors_set_retryability_defaults() {
1099        assert!(!ApplicationFailure::new(anyhow::anyhow!("retryable")).is_non_retryable());
1100        assert!(
1101            ApplicationFailure::non_retryable(anyhow::anyhow!("non-retryable")).is_non_retryable()
1102        );
1103    }
1104
1105    #[test]
1106    fn conversion_preserves_application_metadata() {
1107        let payloads = Payloads {
1108            payloads: vec![Payload {
1109                data: b"details".to_vec(),
1110                ..Default::default()
1111            }],
1112        };
1113        let failure = DefaultFailureConverter.to_failure(
1114            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1115                ApplicationFailure::builder(anyhow::anyhow!("oops"))
1116                    .type_name("MyType".to_owned())
1117                    .non_retryable(true)
1118                    .next_retry_delay(Duration::from_secs(3))
1119                    .category(ApplicationErrorCategory::Benign)
1120                    .details(RawValue::new(payloads.payloads.clone()))
1121                    .build(),
1122            ))),
1123            &PayloadConverter::default(),
1124            &SerializationContextData::None,
1125        );
1126        let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
1127            panic!("expected application failure info");
1128        };
1129        assert_eq!(failure.message, "oops");
1130        assert_eq!(info.r#type, "MyType");
1131        assert!(info.non_retryable);
1132        assert_eq!(info.details, Some(payloads));
1133        assert_eq!(info.category(), ApplicationErrorCategory::Benign);
1134        assert_eq!(info.next_retry_delay.unwrap().seconds, 3);
1135    }
1136
1137    #[test]
1138    fn builder_accepts_raw_payload_details() {
1139        let payload = Payload {
1140            data: b"details".to_vec(),
1141            ..Default::default()
1142        };
1143        let failure = DefaultFailureConverter.to_failure(
1144            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1145                ApplicationFailure::builder(anyhow::anyhow!("oops"))
1146                    .details(RawValue::new(vec![payload.clone()]))
1147                    .build(),
1148            ))),
1149            &PayloadConverter::default(),
1150            &SerializationContextData::None,
1151        );
1152
1153        let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
1154            panic!("expected application failure info");
1155        };
1156        assert_eq!(info.details.unwrap().payloads, vec![payload]);
1157    }
1158
1159    #[test]
1160    fn builder_accepts_serializable_details() {
1161        let failure = DefaultFailureConverter.to_failure(
1162            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1163                ApplicationFailure::builder(anyhow::anyhow!("oops"))
1164                    .details("details".to_string())
1165                    .build(),
1166            ))),
1167            &PayloadConverter::default(),
1168            &SerializationContextData::None,
1169        );
1170
1171        let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
1172            panic!("expected application failure info");
1173        };
1174        let payloads = info.details.expect("expected details").payloads;
1175        let converter = PayloadConverter::default();
1176        let details: String = converter
1177            .from_payloads(
1178                &SerializationContext {
1179                    data: &SerializationContextData::None,
1180                    converter: &converter,
1181                },
1182                payloads,
1183            )
1184            .unwrap();
1185        assert_eq!(details, "details");
1186    }
1187
1188    #[test]
1189    fn application_failure_encoding_surfaces_detail_encoding_errors() {
1190        let failure = DefaultFailureConverter.to_failure(
1191            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1192                ApplicationFailure::builder(anyhow::anyhow!("oops"))
1193                    .details(AlwaysFailsSerialize)
1194                    .build(),
1195            ))),
1196            &PayloadConverter::default(),
1197            &SerializationContextData::None,
1198        );
1199
1200        assert_eq!(
1201            failure.message,
1202            "Failed converting error to failure: Encoding error: serialize boom, original error message: oops"
1203        );
1204        assert!(matches!(
1205            failure.failure_info,
1206            Some(FailureInfo::ApplicationFailureInfo(_))
1207        ));
1208    }
1209
1210    #[test]
1211    fn anyhow_workflow_errors_default_to_application_outgoing_errors() {
1212        let outgoing: OutgoingWorkflowError = anyhow::anyhow!("workflow boom").into();
1213
1214        let OutgoingWorkflowError::Application(app) = outgoing else {
1215            panic!("plain workflow errors should default to application failures");
1216        };
1217        assert_eq!(app.to_string(), "workflow boom");
1218    }
1219
1220    #[test]
1221    fn payload_conversion_errors_default_to_application_outgoing_errors() {
1222        let outgoing: OutgoingWorkflowError =
1223            PayloadConversionError::EncodingError(anyhow::anyhow!("encode boom").into()).into();
1224
1225        let OutgoingWorkflowError::Application(app) = outgoing else {
1226            panic!("payload conversion errors should default to application failures");
1227        };
1228        assert_eq!(app.to_string(), "Encoding error: encode boom");
1229    }
1230}