1use crate::{
4 data_converters::{
5 DecodablePayloads, GenericPayloadConverter, PayloadConversionError, PayloadConverter,
6 RawValue, SerializationContext, SerializationContextData, TemporalDeserializable,
7 TemporalSerializable,
8 },
9 protos::{
10 coresdk::child_workflow::StartChildWorkflowExecutionFailedCause,
11 temporal::api::{
12 common::v1::{Payload, Payloads},
13 enums::v1::{ApplicationErrorCategory as ProtoApplicationErrorCategory, TimeoutType},
14 failure::v1::Failure,
15 },
16 },
17};
18use std::time::Duration;
19
/// Type-erased view of a value that can be turned into failure payloads.
///
/// `FailurePayloadsRepr::Serializable` stores a `Box<dyn SerializableFailurePayload>`,
/// so this trait is what lets `FailurePayloads` hold an arbitrary serializable
/// value without being generic itself.
trait SerializableFailurePayload: Send + Sync {
    /// Converts `self` into a list of payloads using `payload_converter`,
    /// with `context` supplied as the serialization context data.
    fn to_payloads(
        &self,
        payload_converter: &PayloadConverter,
        context: &SerializationContextData,
    ) -> Result<Vec<Payload>, PayloadConversionError>;
}
29
30impl<T> SerializableFailurePayload for T
31where
32 T: TemporalSerializable + Send + Sync + 'static,
33{
34 fn to_payloads(
35 &self,
36 payload_converter: &PayloadConverter,
37 context: &SerializationContextData,
38 ) -> Result<Vec<Payload>, PayloadConversionError> {
39 payload_converter.to_payloads(
40 &SerializationContext {
41 data: context,
42 converter: payload_converter,
43 },
44 self,
45 )
46 }
47}
48
/// Payloads attached to a failure.
///
/// Wraps either a value that still has to be serialized or payloads that were
/// already decoded from an incoming failure (see `FailurePayloadsRepr`).
#[derive(derive_more::Debug)]
pub struct FailurePayloads {
    // Internal representation; kept private so the two states can only be
    // created through the `From` conversions.
    repr: FailurePayloadsRepr,
}
54
/// Internal state of a `FailurePayloads` value.
#[derive(derive_more::Debug)]
enum FailurePayloadsRepr {
    /// A boxed value that is serialized lazily when the failure is encoded.
    /// The boxed value is skipped in `Debug` output.
    #[debug("Serializable(...)")]
    Serializable(#[debug(skip)] Box<dyn SerializableFailurePayload>),
    /// Payloads that already exist in encoded form and can be deserialized.
    Decoded(DecodablePayloads),
}
61
62impl FailurePayloads {
63 pub(crate) fn encode(
64 &self,
65 payload_converter: &PayloadConverter,
66 context: &SerializationContextData,
67 ) -> Result<Payloads, PayloadConversionError> {
68 let payloads = match &self.repr {
69 FailurePayloadsRepr::Serializable(value) => {
70 value.to_payloads(payload_converter, context)?
71 }
72 FailurePayloadsRepr::Decoded(value) => value.raw().to_vec(),
73 };
74 Ok(Payloads { payloads })
75 }
76
77 pub fn deserialize<T: TemporalDeserializable + 'static>(
79 &self,
80 ) -> Result<T, PayloadConversionError> {
81 match &self.repr {
82 FailurePayloadsRepr::Decoded(value) => value.deserialize(),
83 FailurePayloadsRepr::Serializable(_) => Err(PayloadConversionError::WrongEncoding),
84 }
85 }
86
87 pub fn raw(&self) -> Option<&[Payload]> {
89 match &self.repr {
90 FailurePayloadsRepr::Decoded(value) => Some(value.raw()),
91 FailurePayloadsRepr::Serializable(_) => None,
92 }
93 }
94
95 pub fn into_raw(self) -> Option<RawValue> {
97 match self.repr {
98 FailurePayloadsRepr::Decoded(value) => Some(value.into_raw()),
99 FailurePayloadsRepr::Serializable(_) => None,
100 }
101 }
102}
103
104impl From<DecodablePayloads> for FailurePayloads {
105 fn from(value: DecodablePayloads) -> Self {
106 Self {
107 repr: FailurePayloadsRepr::Decoded(value),
108 }
109 }
110}
111
112impl<T> From<T> for FailurePayloads
113where
114 T: TemporalSerializable + Send + Sync + 'static,
115{
116 fn from(value: T) -> Self {
117 Self {
118 repr: FailurePayloadsRepr::Serializable(Box::new(value)),
119 }
120 }
121}
122
/// Category attached to an application failure.
///
/// Mirrors the proto `ApplicationErrorCategory` enum; conversions in both
/// directions are provided via `From`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[non_exhaustive]
pub enum ApplicationErrorCategory {
    /// No category specified (the default).
    #[default]
    Unspecified,
    /// Maps to the proto `Benign` category.
    Benign,
}
134
135impl From<ApplicationErrorCategory> for ProtoApplicationErrorCategory {
136 fn from(value: ApplicationErrorCategory) -> Self {
137 match value {
138 ApplicationErrorCategory::Unspecified => ProtoApplicationErrorCategory::Unspecified,
139 ApplicationErrorCategory::Benign => ProtoApplicationErrorCategory::Benign,
140 }
141 }
142}
143
144impl From<ProtoApplicationErrorCategory> for ApplicationErrorCategory {
145 fn from(value: ProtoApplicationErrorCategory) -> Self {
146 match value {
147 ProtoApplicationErrorCategory::Unspecified => ApplicationErrorCategory::Unspecified,
148 ProtoApplicationErrorCategory::Benign => ApplicationErrorCategory::Benign,
149 }
150 }
151}
152
/// An application-level failure together with its retry metadata.
///
/// Built either through the generated builder (`ApplicationFailure::builder`),
/// the `new` / `non_retryable` constructors, or decoded from a proto `Failure`
/// via `from_failure`.
#[derive(Debug, bon::Builder)]
#[builder(start_fn = builder, state_mod(vis = "pub"))]
pub struct ApplicationFailure {
    // The wrapped error; its `Display` output is this failure's message.
    #[builder(start_fn, into)]
    source: Box<dyn std::error::Error + Send + Sync>,
    // Optional type name; `from_failure` leaves it unset when the proto type
    // string is empty.
    type_name: Option<String>,
    // Whether the failure is marked non-retryable; defaults to `false`.
    #[builder(default)]
    non_retryable: bool,
    // Optional retry delay carried with the failure.
    next_retry_delay: Option<Duration>,
    // Failure category; defaults to `Unspecified`.
    #[builder(default = ApplicationErrorCategory::Unspecified)]
    category: ApplicationErrorCategory,
    // Optional detail payloads (a serializable value or decoded payloads).
    #[builder(into)]
    details: Option<FailurePayloads>,
    // The proto failure this value was decoded from; `new` leaves it `None`.
    failure: Option<Failure>,
    // Optional nested cause.
    cause: Option<Box<IncomingError>>,
}
170
171impl ApplicationFailure {
172 pub fn new(source: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
174 Self {
175 source: source.into(),
176 type_name: None,
177 non_retryable: false,
178 next_retry_delay: None,
179 category: ApplicationErrorCategory::Unspecified,
180 details: None,
181 failure: None,
182 cause: None,
183 }
184 }
185
186 pub fn non_retryable(source: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
188 Self {
189 non_retryable: true,
190 ..Self::new(source)
191 }
192 }
193
194 pub fn source_error(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
196 &*self.source as &(dyn std::error::Error + Send + Sync + 'static)
197 }
198
199 pub fn type_name(&self) -> Option<&str> {
201 self.type_name.as_deref()
202 }
203
204 pub fn is_non_retryable(&self) -> bool {
206 self.non_retryable
207 }
208
209 pub fn next_retry_delay(&self) -> Option<Duration> {
211 self.next_retry_delay
212 }
213
214 pub fn category(&self) -> ApplicationErrorCategory {
216 self.category
217 }
218
219 pub fn details<T: TemporalDeserializable + 'static>(
221 &self,
222 ) -> Result<Option<T>, PayloadConversionError> {
223 self.details
224 .as_ref()
225 .map(FailurePayloads::deserialize)
226 .transpose()
227 }
228
229 pub fn raw_details(&self) -> Option<&[Payload]> {
231 self.details.as_ref().and_then(FailurePayloads::raw)
232 }
233
234 pub(crate) fn failure_payloads(&self) -> Option<&FailurePayloads> {
235 self.details.as_ref()
236 }
237
238 pub fn failure(&self) -> Option<&Failure> {
240 self.failure.as_ref()
241 }
242
243 pub fn into_failure(self) -> Option<Failure> {
245 self.failure
246 }
247
248 pub fn cause(&self) -> Option<&IncomingError> {
250 self.cause.as_deref()
251 }
252
253 pub fn as_timeout(&self) -> Option<&TimeoutError> {
256 self.cause().and_then(IncomingError::as_timeout)
257 }
258
259 pub fn as_cancelled(&self) -> Option<&CancelledError> {
262 self.cause().and_then(IncomingError::as_cancelled)
263 }
264
265 pub(crate) fn from_failure(
266 failure: Failure,
267 cause: Option<IncomingError>,
268 payload_converter: &PayloadConverter,
269 context: &SerializationContextData,
270 ) -> Self {
271 let app_info = failure
272 .maybe_application_failure()
273 .cloned()
274 .unwrap_or_default();
275 let type_name = (!app_info.r#type.is_empty()).then_some(app_info.r#type.clone());
276 Self {
277 source: failure.message.clone().into(),
278 type_name,
279 non_retryable: app_info.non_retryable,
280 next_retry_delay: app_info.next_retry_delay.and_then(|d| d.try_into().ok()),
281 category: app_info.category().into(),
282 details: app_info.details.map(|details| {
283 FailurePayloads::from(DecodablePayloads::new(
284 details.payloads,
285 payload_converter.clone(),
286 *context,
287 ))
288 }),
289 failure: Some(failure),
290 cause: cause.map(Box::new),
291 }
292 }
293}
294
295impl std::fmt::Display for ApplicationFailure {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 write!(f, "{}", self.source)
298 }
299}
300
301impl std::error::Error for ApplicationFailure {
302 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
303 self.cause
304 .as_deref()
305 .map(|cause| cause as &(dyn std::error::Error + 'static))
306 .or_else(|| Some(self.source.as_ref()))
307 }
308}
309
310impl From<anyhow::Error> for ApplicationFailure {
311 fn from(value: anyhow::Error) -> Self {
312 Self::new(value)
313 }
314}
315
316impl From<PayloadConversionError> for ApplicationFailure {
317 fn from(value: PayloadConversionError) -> Self {
318 Self::new(value)
319 }
320}
321
/// An error leaving user code, from either an activity or a workflow.
///
/// Both variants are transparent: `Display` and `source` are forwarded to the
/// wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum OutgoingError {
    /// Error produced by an activity.
    #[error(transparent)]
    Activity(#[from] OutgoingActivityError),
    /// Error produced by a workflow.
    #[error(transparent)]
    Workflow(#[from] OutgoingWorkflowError),
}
332
/// Errors an activity can report.
#[derive(Debug, thiserror::Error)]
pub enum OutgoingActivityError {
    /// An application failure (boxed); displayed transparently.
    #[error(transparent)]
    Application(#[from] Box<ApplicationFailure>),
    /// The activity was cancelled.
    #[error("Activity cancelled")]
    Cancelled {
        /// Optional payloads attached to the cancellation.
        details: Option<FailurePayloads>,
    },
}
346
/// Errors a workflow can report.
///
/// Every variant boxes its payload and is transparent, so `Display` and
/// `source` come from the wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum OutgoingWorkflowError {
    /// An application failure.
    #[error(transparent)]
    Application(#[from] Box<ApplicationFailure>),
    /// An activity execution error.
    #[error(transparent)]
    ActivityExecution(#[from] Box<ActivityExecutionError>),
    /// A child workflow execution error.
    #[error(transparent)]
    ChildWorkflowExecution(#[from] Box<ChildWorkflowExecutionError>),
    /// A child workflow start error.
    #[error(transparent)]
    ChildWorkflowStart(#[from] Box<ChildWorkflowStartError>),
    /// A workflow signal error.
    #[error(transparent)]
    WorkflowSignal(#[from] Box<WorkflowSignalError>),
}
366
367impl From<anyhow::Error> for OutgoingWorkflowError {
368 fn from(value: anyhow::Error) -> Self {
369 Self::Application(Box::new(ApplicationFailure::new(value)))
370 }
371}
372
373impl From<PayloadConversionError> for OutgoingWorkflowError {
374 fn from(value: PayloadConversionError) -> Self {
375 Self::Application(Box::new(value.into()))
376 }
377}
378
379impl From<ApplicationFailure> for OutgoingWorkflowError {
380 fn from(value: ApplicationFailure) -> Self {
381 Self::Application(Box::new(value))
382 }
383}
384
385impl From<ActivityExecutionError> for OutgoingWorkflowError {
386 fn from(value: ActivityExecutionError) -> Self {
387 Self::ActivityExecution(Box::new(value))
388 }
389}
390
391impl From<ChildWorkflowExecutionError> for OutgoingWorkflowError {
392 fn from(value: ChildWorkflowExecutionError) -> Self {
393 Self::ChildWorkflowExecution(Box::new(value))
394 }
395}
396
397impl From<ChildWorkflowStartError> for OutgoingWorkflowError {
398 fn from(value: ChildWorkflowStartError) -> Self {
399 Self::ChildWorkflowStart(Box::new(value))
400 }
401}
402
403impl From<WorkflowSignalError> for OutgoingWorkflowError {
404 fn from(value: WorkflowSignalError) -> Self {
405 Self::WorkflowSignal(Box::new(value))
406 }
407}
408
/// A failure received from outside user code, classified by kind.
///
/// Each variant wraps a dedicated error type that exposes the original proto
/// `Failure` and an optional nested cause.
#[derive(Debug)]
pub enum IncomingError {
    /// An application failure.
    Application(ApplicationFailure),
    /// A timeout failure.
    Timeout(TimeoutError),
    /// A cancellation failure.
    Cancelled(CancelledError),
    /// A termination failure.
    Terminated(TerminatedError),
    /// A server failure.
    Server(ServerError),
    /// A reset-workflow failure.
    ResetWorkflow(ResetWorkflowError),
    /// An activity failure.
    Activity(ActivityFailureError),
    /// A child workflow execution failure.
    ChildWorkflowExecution(ChildWorkflowFailureError),
    /// A nexus operation execution failure.
    NexusOperationExecution(IncomingNexusOperationExecutionError),
    /// A nexus handler failure.
    NexusHandler(IncomingNexusHandlerError),
}
433
434impl IncomingError {
435 pub fn failure(&self) -> &Failure {
437 match self {
438 IncomingError::Application(err) => err
439 .failure()
440 .expect("decoded application failures retain their original proto"),
441 IncomingError::Timeout(err) => err.failure(),
442 IncomingError::Cancelled(err) => err.failure(),
443 IncomingError::Terminated(err) => err.failure(),
444 IncomingError::Server(err) => err.failure(),
445 IncomingError::ResetWorkflow(err) => err.failure(),
446 IncomingError::Activity(err) => err.failure(),
447 IncomingError::ChildWorkflowExecution(err) => err.failure(),
448 IncomingError::NexusOperationExecution(err) => err.failure(),
449 IncomingError::NexusHandler(err) => err.failure(),
450 }
451 }
452
453 pub fn cause(&self) -> Option<&IncomingError> {
455 match self {
456 IncomingError::Application(err) => err.cause(),
457 IncomingError::Timeout(err) => err.cause(),
458 IncomingError::Cancelled(err) => err.cause(),
459 IncomingError::Terminated(err) => err.cause(),
460 IncomingError::Server(err) => err.cause(),
461 IncomingError::ResetWorkflow(err) => err.cause(),
462 IncomingError::Activity(err) => err.cause(),
463 IncomingError::ChildWorkflowExecution(err) => err.cause(),
464 IncomingError::NexusOperationExecution(err) => err.cause(),
465 IncomingError::NexusHandler(err) => err.cause(),
466 }
467 }
468
469 pub fn into_failure(self) -> Failure {
471 match self {
472 IncomingError::Application(err) => err
473 .into_failure()
474 .expect("decoded application failures retain their original proto"),
475 IncomingError::Timeout(err) => err.into_failure(),
476 IncomingError::Cancelled(err) => err.into_failure(),
477 IncomingError::Terminated(err) => err.into_failure(),
478 IncomingError::Server(err) => err.into_failure(),
479 IncomingError::ResetWorkflow(err) => err.into_failure(),
480 IncomingError::Activity(err) => err.into_failure(),
481 IncomingError::ChildWorkflowExecution(err) => err.into_failure(),
482 IncomingError::NexusOperationExecution(err) => err.into_failure(),
483 IncomingError::NexusHandler(err) => err.into_failure(),
484 }
485 }
486
487 pub fn as_timeout(&self) -> Option<&TimeoutError> {
489 match self {
490 IncomingError::Timeout(err) => Some(err),
491 _ => None,
492 }
493 }
494
495 pub fn as_cancelled(&self) -> Option<&CancelledError> {
497 match self {
498 IncomingError::Cancelled(err) => Some(err),
499 _ => None,
500 }
501 }
502}
503
504impl std::fmt::Display for IncomingError {
505 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
506 match self {
507 IncomingError::Application(err) => err.fmt(f),
508 IncomingError::Timeout(err) => err.fmt(f),
509 IncomingError::Cancelled(err) => err.fmt(f),
510 IncomingError::Terminated(err) => err.fmt(f),
511 IncomingError::Server(err) => err.fmt(f),
512 IncomingError::ResetWorkflow(err) => err.fmt(f),
513 IncomingError::Activity(err) => err.fmt(f),
514 IncomingError::ChildWorkflowExecution(err) => err.fmt(f),
515 IncomingError::NexusOperationExecution(err) => err.fmt(f),
516 IncomingError::NexusHandler(err) => err.fmt(f),
517 }
518 }
519}
520
521impl std::error::Error for IncomingError {
522 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
523 match self {
524 IncomingError::Application(err) => Some(err),
525 IncomingError::Timeout(err) => Some(err),
526 IncomingError::Cancelled(err) => Some(err),
527 IncomingError::Terminated(err) => Some(err),
528 IncomingError::Server(err) => Some(err),
529 IncomingError::ResetWorkflow(err) => Some(err),
530 IncomingError::Activity(err) => Some(err),
531 IncomingError::ChildWorkflowExecution(err) => Some(err),
532 IncomingError::NexusOperationExecution(err) => Some(err),
533 IncomingError::NexusHandler(err) => Some(err),
534 }
535 }
536}
537
// Generates the shared accessors plus `Display` and `Error` impls for a
// failure wrapper type. The target type `$name` must have a `failure: Failure`
// field and a `cause: Option<Box<IncomingError>>` field.
macro_rules! impl_incoming_failure_wrapper {
    ($name:ident) => {
        impl $name {
            /// Borrows the proto failure.
            pub fn failure(&self) -> &Failure {
                &self.failure
            }

            /// Borrows the nested cause, if any.
            pub fn cause(&self) -> Option<&IncomingError> {
                self.cause.as_deref()
            }

            /// Consumes the wrapper and returns the proto failure.
            pub fn into_failure(self) -> Failure {
                self.failure
            }

            /// Consumes the wrapper and returns the proto failure together
            /// with the unboxed cause.
            pub fn into_parts(self) -> (Failure, Option<IncomingError>) {
                (self.failure, self.cause.map(|cause| *cause))
            }
        }

        // Formatting is delegated to the proto failure.
        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.failure.fmt(f)
            }
        }

        // The error source is the nested cause, when there is one.
        impl std::error::Error for $name {
            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
                self.cause
                    .as_deref()
                    .map(|cause| cause as &(dyn std::error::Error + 'static))
            }
        }
    };
}
577
// Declares a complete failure wrapper: a struct `$name` holding only the proto
// failure and an optional cause, documented with `$doc`, a crate-private
// constructor, and everything `impl_incoming_failure_wrapper!` generates.
macro_rules! incoming_failure_wrapper {
    ($name:ident, $doc:literal) => {
        #[doc = $doc]
        #[derive(Debug)]
        pub struct $name {
            failure: Failure,
            cause: Option<Box<IncomingError>>,
        }

        impl $name {
            // Builds the wrapper, boxing the cause if one is given.
            pub(crate) fn new(failure: Failure, cause: Option<IncomingError>) -> Self {
                Self {
                    failure,
                    cause: cause.map(Box::new),
                }
            }
        }

        impl_incoming_failure_wrapper!($name);
    };
}
600
/// A timeout failure with its timeout type and optional heartbeat details.
#[derive(Debug)]
pub struct TimeoutError {
    // The proto failure this error was built from.
    failure: Failure,
    // Optional nested cause.
    cause: Option<Box<IncomingError>>,
    // Timeout type read from the timeout failure info.
    timeout_type: TimeoutType,
    // Last heartbeat details from the failure info, kept in decodable form.
    last_heartbeat_details: Option<DecodablePayloads>,
}
609
610impl TimeoutError {
611 pub(crate) fn new(
613 failure: Failure,
614 failure_info: crate::protos::temporal::api::failure::v1::TimeoutFailureInfo,
615 cause: Option<IncomingError>,
616 payload_converter: &PayloadConverter,
617 context: &SerializationContextData,
618 ) -> Self {
619 Self {
620 failure,
621 cause: cause.map(Box::new),
622 timeout_type: failure_info.timeout_type(),
623 last_heartbeat_details: failure_info.last_heartbeat_details.map(|details| {
624 DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
625 }),
626 }
627 }
628
629 pub fn timeout_type(&self) -> TimeoutType {
631 self.timeout_type
632 }
633
634 pub fn last_heartbeat_details<T: TemporalDeserializable + 'static>(
636 &self,
637 ) -> Result<Option<T>, PayloadConversionError> {
638 self.last_heartbeat_details
639 .as_ref()
640 .map(DecodablePayloads::deserialize)
641 .transpose()
642 }
643
644 pub fn raw_last_heartbeat_details(&self) -> Option<&[Payload]> {
646 self.last_heartbeat_details
647 .as_ref()
648 .map(DecodablePayloads::raw)
649 }
650}
651
// Adds `failure`, `cause`, `into_failure`, `into_parts` and the `Display` /
// `Error` impls to `TimeoutError`.
impl_incoming_failure_wrapper!(TimeoutError);
653
/// A cancellation failure with optional detail payloads.
#[derive(Debug)]
pub struct CancelledError {
    // The proto failure this error was built from.
    failure: Failure,
    // Optional nested cause.
    cause: Option<Box<IncomingError>>,
    // Cancellation details from the failure info, kept in decodable form.
    details: Option<DecodablePayloads>,
}
661
662impl CancelledError {
663 pub(crate) fn new(
665 failure: Failure,
666 failure_info: crate::protos::temporal::api::failure::v1::CanceledFailureInfo,
667 cause: Option<IncomingError>,
668 payload_converter: &PayloadConverter,
669 context: &SerializationContextData,
670 ) -> Self {
671 Self {
672 failure,
673 cause: cause.map(Box::new),
674 details: failure_info.details.map(|details| {
675 DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
676 }),
677 }
678 }
679
680 pub fn details<T: TemporalDeserializable + 'static>(
683 &self,
684 ) -> Result<Option<T>, PayloadConversionError> {
685 self.details
686 .as_ref()
687 .map(DecodablePayloads::deserialize)
688 .transpose()
689 }
690
691 pub fn raw_details(&self) -> Option<&[Payload]> {
693 self.details.as_ref().map(DecodablePayloads::raw)
694 }
695}
696
// `CancelledError` is declared by hand above, so only the shared accessors
// and trait impls are generated for it.
impl_incoming_failure_wrapper!(CancelledError);
// The wrappers below carry nothing beyond the proto failure and an optional
// cause, so the struct itself is generated as well.
incoming_failure_wrapper!(TerminatedError, "A normalized terminated failure.");
incoming_failure_wrapper!(ServerError, "A normalized server failure.");
incoming_failure_wrapper!(ResetWorkflowError, "A normalized reset-workflow failure.");
701
/// An activity failure together with the fields copied from the proto
/// `ActivityFailureInfo`.
#[derive(Debug)]
pub struct ActivityFailureError {
    // The proto failure this error was built from.
    failure: Failure,
    // Optional nested cause.
    cause: Option<Box<IncomingError>>,
    // The remaining fields mirror the same-named `ActivityFailureInfo` fields.
    activity_id: String,
    activity_type: Option<crate::protos::temporal::api::common::v1::ActivityType>,
    scheduled_event_id: i64,
    started_event_id: i64,
    identity: String,
    retry_state: crate::protos::temporal::api::enums::v1::RetryState,
}
714
715impl ActivityFailureError {
716 pub(crate) fn new(
718 failure: Failure,
719 failure_info: crate::protos::temporal::api::failure::v1::ActivityFailureInfo,
720 cause: Option<IncomingError>,
721 ) -> Self {
722 let retry_state = failure_info.retry_state();
723 Self {
724 failure,
725 cause: cause.map(Box::new),
726 activity_id: failure_info.activity_id,
727 activity_type: failure_info.activity_type,
728 scheduled_event_id: failure_info.scheduled_event_id,
729 started_event_id: failure_info.started_event_id,
730 identity: failure_info.identity,
731 retry_state,
732 }
733 }
734
735 pub fn activity_id(&self) -> &str {
737 &self.activity_id
738 }
739
740 pub fn activity_type(&self) -> Option<&crate::protos::temporal::api::common::v1::ActivityType> {
742 self.activity_type.as_ref()
743 }
744
745 pub fn scheduled_event_id(&self) -> i64 {
747 self.scheduled_event_id
748 }
749
750 pub fn started_event_id(&self) -> i64 {
752 self.started_event_id
753 }
754
755 pub fn identity(&self) -> &str {
757 &self.identity
758 }
759
760 pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
762 self.retry_state
763 }
764
765 pub fn as_timeout(&self) -> Option<&TimeoutError> {
768 self.cause().and_then(IncomingError::as_timeout)
769 }
770
771 pub fn as_cancelled(&self) -> Option<&CancelledError> {
774 self.cause().and_then(IncomingError::as_cancelled)
775 }
776}
777
// Adds `failure`, `cause`, `into_failure`, `into_parts` and the `Display` /
// `Error` impls to `ActivityFailureError`.
impl_incoming_failure_wrapper!(ActivityFailureError);
/// A child workflow failure together with the fields copied from the proto
/// `ChildWorkflowExecutionFailureInfo`.
#[derive(Debug)]
pub struct ChildWorkflowFailureError {
    // The proto failure this error was built from.
    failure: Failure,
    // Optional nested cause.
    cause: Option<Box<IncomingError>>,
    // The remaining fields mirror the same-named failure-info fields.
    namespace: String,
    workflow_execution: Option<crate::protos::temporal::api::common::v1::WorkflowExecution>,
    workflow_type: Option<crate::protos::temporal::api::common::v1::WorkflowType>,
    initiated_event_id: i64,
    started_event_id: i64,
    retry_state: crate::protos::temporal::api::enums::v1::RetryState,
}
791
792impl ChildWorkflowFailureError {
793 pub(crate) fn new(
795 failure: Failure,
796 failure_info: crate::protos::temporal::api::failure::v1::ChildWorkflowExecutionFailureInfo,
797 cause: Option<IncomingError>,
798 ) -> Self {
799 let retry_state = failure_info.retry_state();
800 Self {
801 failure,
802 cause: cause.map(Box::new),
803 namespace: failure_info.namespace,
804 workflow_execution: failure_info.workflow_execution,
805 workflow_type: failure_info.workflow_type,
806 initiated_event_id: failure_info.initiated_event_id,
807 started_event_id: failure_info.started_event_id,
808 retry_state,
809 }
810 }
811
812 pub fn namespace(&self) -> &str {
814 &self.namespace
815 }
816
817 pub fn workflow_execution(
819 &self,
820 ) -> Option<&crate::protos::temporal::api::common::v1::WorkflowExecution> {
821 self.workflow_execution.as_ref()
822 }
823
824 pub fn workflow_type(&self) -> Option<&crate::protos::temporal::api::common::v1::WorkflowType> {
826 self.workflow_type.as_ref()
827 }
828
829 pub fn initiated_event_id(&self) -> i64 {
831 self.initiated_event_id
832 }
833
834 pub fn started_event_id(&self) -> i64 {
836 self.started_event_id
837 }
838
839 pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
841 self.retry_state
842 }
843
844 pub fn as_timeout(&self) -> Option<&TimeoutError> {
847 self.cause().and_then(IncomingError::as_timeout)
848 }
849
850 pub fn as_cancelled(&self) -> Option<&CancelledError> {
853 self.cause().and_then(IncomingError::as_cancelled)
854 }
855}
856
// `ChildWorkflowFailureError` is declared by hand above, so only the shared
// accessors and trait impls are generated for it.
impl_incoming_failure_wrapper!(ChildWorkflowFailureError);
// The nexus wrappers carry only the proto failure and an optional cause, so
// their structs are generated as well.
incoming_failure_wrapper!(
    IncomingNexusOperationExecutionError,
    "A normalized nexus operation failure wrapper."
);
incoming_failure_wrapper!(
    IncomingNexusHandlerError,
    "A normalized nexus handler failure wrapper."
);
866
/// Errors reported for an activity execution.
#[derive(Debug, thiserror::Error)]
pub enum ActivityExecutionError {
    /// The activity failed; the message includes the proto failure message.
    #[error("Activity failed: {}", .0.failure().message)]
    Failed(#[source] ActivityFailureError),
    /// The activity was cancelled; the message includes the proto failure
    /// message.
    #[error("Activity cancelled: {}", .0.failure().message)]
    Cancelled(#[source] CancelledError),
    /// A payload could not be converted.
    #[error("Payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
880
881impl ActivityExecutionError {
882 pub fn failure(&self) -> Option<&Failure> {
884 match self {
885 ActivityExecutionError::Failed(err) => Some(err.failure()),
886 ActivityExecutionError::Cancelled(err) => Some(err.failure()),
887 ActivityExecutionError::Serialization(_) => None,
888 }
889 }
890
891 pub fn cause(&self) -> Option<&IncomingError> {
893 match self {
894 ActivityExecutionError::Failed(err) => err.cause(),
895 ActivityExecutionError::Cancelled(err) => err.cause(),
896 ActivityExecutionError::Serialization(_) => None,
897 }
898 }
899
900 pub fn reason(&self) -> Option<&IncomingError> {
902 match self {
903 ActivityExecutionError::Failed(err) => err.cause(),
904 ActivityExecutionError::Cancelled(_) | ActivityExecutionError::Serialization(_) => None,
905 }
906 }
907
908 pub fn as_timeout(&self) -> Option<&TimeoutError> {
911 match self {
912 ActivityExecutionError::Failed(err) => err.as_timeout(),
913 ActivityExecutionError::Serialization(_) | ActivityExecutionError::Cancelled(_) => None,
914 }
915 }
916
917 pub fn as_cancelled(&self) -> Option<&CancelledError> {
920 match self {
921 ActivityExecutionError::Failed(err) => err.as_cancelled(),
922 ActivityExecutionError::Cancelled(err) => Some(err),
923 ActivityExecutionError::Serialization(_) => None,
924 }
925 }
926}
927
/// Errors reported when starting a child workflow.
#[derive(Debug, thiserror::Error)]
pub enum ChildWorkflowStartError {
    /// The start was cancelled; the message includes the proto failure message.
    #[error("Child workflow start cancelled: {}", .0.failure().message)]
    Cancelled(#[source] Box<CancelledError>),
    /// The start failed for the given cause.
    #[error(
        "Child workflow start failed: workflow_id={workflow_id}, workflow_type={workflow_type}, cause={cause:?}"
    )]
    StartFailed {
        /// Id of the child workflow that failed to start.
        workflow_id: String,
        /// Type name of the child workflow.
        workflow_type: String,
        /// Cause reported for the start failure.
        cause: StartChildWorkflowExecutionFailedCause,
    },
    /// A payload could not be converted.
    #[error("Payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
950
951impl ChildWorkflowStartError {
952 pub fn failure(&self) -> Option<&Failure> {
954 match self {
955 ChildWorkflowStartError::Cancelled(err) => Some(err.failure()),
956 ChildWorkflowStartError::StartFailed { .. }
957 | ChildWorkflowStartError::Serialization(_) => None,
958 }
959 }
960
961 pub fn cause(&self) -> Option<&IncomingError> {
963 match self {
964 ChildWorkflowStartError::Cancelled(err) => err.cause(),
965 ChildWorkflowStartError::StartFailed { .. }
966 | ChildWorkflowStartError::Serialization(_) => None,
967 }
968 }
969}
970
/// Errors reported for a child workflow execution.
#[derive(Debug, thiserror::Error)]
pub enum ChildWorkflowExecutionError {
    /// The child workflow failed; the message includes the proto failure
    /// message.
    #[error("Child workflow failed: {}", .0.failure().message)]
    Failed(#[source] Box<ChildWorkflowFailureError>),
    /// A payload could not be converted.
    #[error("Payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
981
982impl ChildWorkflowExecutionError {
983 pub fn failure(&self) -> Option<&Failure> {
985 match self {
986 ChildWorkflowExecutionError::Failed(err) => Some(err.failure()),
987 ChildWorkflowExecutionError::Serialization(_) => None,
988 }
989 }
990
991 pub fn cause(&self) -> Option<&IncomingError> {
993 match self {
994 ChildWorkflowExecutionError::Failed(err) => err.cause(),
995 ChildWorkflowExecutionError::Serialization(_) => None,
996 }
997 }
998
999 pub fn reason(&self) -> Option<&IncomingError> {
1001 match self {
1002 ChildWorkflowExecutionError::Failed(err) => err.cause(),
1003 ChildWorkflowExecutionError::Serialization(_) => None,
1004 }
1005 }
1006
1007 pub fn as_timeout(&self) -> Option<&TimeoutError> {
1010 match self {
1011 ChildWorkflowExecutionError::Failed(err) => err.as_timeout(),
1012 ChildWorkflowExecutionError::Serialization(_) => None,
1013 }
1014 }
1015
1016 pub fn as_cancelled(&self) -> Option<&CancelledError> {
1019 match self {
1020 ChildWorkflowExecutionError::Failed(err) => err.as_cancelled(),
1021 ChildWorkflowExecutionError::Serialization(_) => None,
1022 }
1023 }
1024}
1025
/// Errors reported when signalling a workflow.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowSignalError {
    /// The signal failed; the message includes the proto failure message.
    #[error("Child workflow signal failed: {}", .0.failure().message)]
    Failed(#[source] Box<WorkflowSignalFailureError>),
    /// The signal payload could not be converted.
    #[error("Signal payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
1036
1037impl WorkflowSignalError {
1038 pub fn failure(&self) -> Option<&Failure> {
1040 match self {
1041 WorkflowSignalError::Failed(err) => Some(err.failure()),
1042 WorkflowSignalError::Serialization(_) => None,
1043 }
1044 }
1045
1046 pub fn cause(&self) -> Option<&IncomingError> {
1048 match self {
1049 WorkflowSignalError::Failed(err) => err.cause(),
1050 WorkflowSignalError::Serialization(_) => None,
1051 }
1052 }
1053
1054 pub fn reason(&self) -> Option<&IncomingError> {
1056 match self {
1057 WorkflowSignalError::Failed(err) => Some(err.error()),
1058 WorkflowSignalError::Serialization(_) => None,
1059 }
1060 }
1061}
1062
/// A failed workflow signal: the proto failure plus the error built for it.
#[derive(Debug)]
pub struct WorkflowSignalFailureError {
    // The proto failure; also used for `Display`.
    failure: Failure,
    // The error associated with the failure (always present, unlike the
    // optional `cause` on the other wrappers).
    error: Box<IncomingError>,
}
1069
1070impl WorkflowSignalFailureError {
1071 pub(crate) fn new(failure: Failure, error: IncomingError) -> Self {
1073 Self {
1074 failure,
1075 error: Box::new(error),
1076 }
1077 }
1078
1079 pub fn failure(&self) -> &Failure {
1081 &self.failure
1082 }
1083
1084 pub fn cause(&self) -> Option<&IncomingError> {
1086 self.error.cause()
1087 }
1088
1089 pub fn error(&self) -> &IncomingError {
1091 &self.error
1092 }
1093}
1094
// Formatting is delegated to the proto failure.
impl std::fmt::Display for WorkflowSignalFailureError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.failure.fmt(f)
    }
}
1100
1101impl std::error::Error for WorkflowSignalFailureError {
1102 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1103 self.cause()
1104 .map(|cause| cause as &(dyn std::error::Error + 'static))
1105 }
1106}
1107
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        data_converters::{
            DefaultFailureConverter, FailureConverter, GenericPayloadConverter, PayloadConverter,
            SerializationContext, SerializationContextData,
        },
        protos::temporal::api::{common::v1::Payload, failure::v1::failure::FailureInfo},
    };

    // A value whose `Serialize` impl always errors; used to exercise the
    // detail-encoding failure path.
    struct AlwaysFailsSerialize;

    impl serde::Serialize for AlwaysFailsSerialize {
        fn serialize<S: serde::Serializer>(&self, _serializer: S) -> Result<S::Ok, S::Error> {
            Err(serde::ser::Error::custom("serialize boom"))
        }
    }

    // `new` yields a retryable failure, `non_retryable` a non-retryable one.
    #[test]
    fn constructors_set_retryability_defaults() {
        assert!(!ApplicationFailure::new(anyhow::anyhow!("retryable")).is_non_retryable());
        assert!(
            ApplicationFailure::non_retryable(anyhow::anyhow!("non-retryable")).is_non_retryable()
        );
    }

    // Every builder-provided field must show up in the converted proto failure.
    #[test]
    fn conversion_preserves_application_metadata() {
        let payloads = Payloads {
            payloads: vec![Payload {
                data: b"details".to_vec(),
                ..Default::default()
            }],
        };
        let failure = DefaultFailureConverter.to_failure(
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
                ApplicationFailure::builder(anyhow::anyhow!("oops"))
                    .type_name("MyType".to_owned())
                    .non_retryable(true)
                    .next_retry_delay(Duration::from_secs(3))
                    .category(ApplicationErrorCategory::Benign)
                    .details(RawValue::new(payloads.payloads.clone()))
                    .build(),
            ))),
            &PayloadConverter::default(),
            &SerializationContextData::None,
        );
        let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
            panic!("expected application failure info");
        };
        assert_eq!(failure.message, "oops");
        assert_eq!(info.r#type, "MyType");
        assert!(info.non_retryable);
        assert_eq!(info.details, Some(payloads));
        assert_eq!(info.category(), ProtoApplicationErrorCategory::Benign);
        assert_eq!(info.next_retry_delay.unwrap().seconds, 3);
    }

    // Raw payloads given as details must come out of the conversion unchanged.
    #[test]
    fn builder_accepts_raw_payload_details() {
        let payload = Payload {
            data: b"details".to_vec(),
            ..Default::default()
        };
        let failure = DefaultFailureConverter.to_failure(
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
                ApplicationFailure::builder(anyhow::anyhow!("oops"))
                    .details(RawValue::new(vec![payload.clone()]))
                    .build(),
            ))),
            &PayloadConverter::default(),
            &SerializationContextData::None,
        );

        let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
            panic!("expected application failure info");
        };
        assert_eq!(info.details.unwrap().payloads, vec![payload]);
    }

    // A plain serializable value given as details must round-trip through the
    // default payload converter.
    #[test]
    fn builder_accepts_serializable_details() {
        let failure = DefaultFailureConverter.to_failure(
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
                ApplicationFailure::builder(anyhow::anyhow!("oops"))
                    .details("details".to_string())
                    .build(),
            ))),
            &PayloadConverter::default(),
            &SerializationContextData::None,
        );

        let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
            panic!("expected application failure info");
        };
        let payloads = info.details.expect("expected details").payloads;
        let converter = PayloadConverter::default();
        let details: String = converter
            .from_payloads(
                &SerializationContext {
                    data: &SerializationContextData::None,
                    converter: &converter,
                },
                payloads,
            )
            .unwrap();
        assert_eq!(details, "details");
    }

    // When details cannot be serialized, the converter must still produce an
    // application failure whose message reports the encoding error alongside
    // the original message.
    #[test]
    fn application_failure_encoding_surfaces_detail_encoding_errors() {
        let failure = DefaultFailureConverter.to_failure(
            OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
                ApplicationFailure::builder(anyhow::anyhow!("oops"))
                    .details(AlwaysFailsSerialize)
                    .build(),
            ))),
            &PayloadConverter::default(),
            &SerializationContextData::None,
        );

        assert_eq!(
            failure.message,
            "Failed converting error to failure: Encoding error: serialize boom, original error message: oops"
        );
        assert!(matches!(
            failure.failure_info,
            Some(FailureInfo::ApplicationFailureInfo(_))
        ));
    }

    // `anyhow` errors convert into the `Application` variant and keep their
    // message.
    #[test]
    fn anyhow_workflow_errors_default_to_application_outgoing_errors() {
        let outgoing: OutgoingWorkflowError = anyhow::anyhow!("workflow boom").into();

        let OutgoingWorkflowError::Application(app) = outgoing else {
            panic!("plain workflow errors should default to application failures");
        };
        assert_eq!(app.to_string(), "workflow boom");
    }

    // Payload conversion errors convert into the `Application` variant and
    // keep their message.
    #[test]
    fn payload_conversion_errors_default_to_application_outgoing_errors() {
        let outgoing: OutgoingWorkflowError =
            PayloadConversionError::EncodingError(anyhow::anyhow!("encode boom").into()).into();

        let OutgoingWorkflowError::Application(app) = outgoing else {
            panic!("payload conversion errors should default to application failures");
        };
        assert_eq!(app.to_string(), "Encoding error: encode boom");
    }
}