1use crate::{
4 data_converters::{
5 DecodablePayloads, GenericPayloadConverter, PayloadConversionError, PayloadConverter,
6 RawValue, SerializationContext, SerializationContextData, TemporalDeserializable,
7 TemporalSerializable,
8 },
9 protos::{
10 coresdk::child_workflow::StartChildWorkflowExecutionFailedCause,
11 temporal::api::{
12 common::v1::{Payload, Payloads},
13 enums::v1::{ApplicationErrorCategory, TimeoutType},
14 failure::v1::Failure,
15 },
16 },
17};
18use std::time::Duration;
19
20trait SerializableFailurePayload: Send + Sync {
23 fn to_payloads(
24 &self,
25 payload_converter: &PayloadConverter,
26 context: &SerializationContextData,
27 ) -> Result<Vec<Payload>, PayloadConversionError>;
28}
29
30impl<T> SerializableFailurePayload for T
31where
32 T: TemporalSerializable + Send + Sync + 'static,
33{
34 fn to_payloads(
35 &self,
36 payload_converter: &PayloadConverter,
37 context: &SerializationContextData,
38 ) -> Result<Vec<Payload>, PayloadConversionError> {
39 payload_converter.to_payloads(
40 &SerializationContext {
41 data: context,
42 converter: payload_converter,
43 },
44 self,
45 )
46 }
47}
48
49#[derive(derive_more::Debug)]
51pub struct FailurePayloads {
52 repr: FailurePayloadsRepr,
53}
54
55#[derive(derive_more::Debug)]
56enum FailurePayloadsRepr {
57 #[debug("Serializable(...)")]
58 Serializable(#[debug(skip)] Box<dyn SerializableFailurePayload>),
59 Decoded(DecodablePayloads),
60}
61
62impl FailurePayloads {
63 pub(crate) fn encode(
64 &self,
65 payload_converter: &PayloadConverter,
66 context: &SerializationContextData,
67 ) -> Result<Payloads, PayloadConversionError> {
68 let payloads = match &self.repr {
69 FailurePayloadsRepr::Serializable(value) => {
70 value.to_payloads(payload_converter, context)?
71 }
72 FailurePayloadsRepr::Decoded(value) => value.raw().to_vec(),
73 };
74 Ok(Payloads { payloads })
75 }
76
77 pub fn deserialize<T: TemporalDeserializable + 'static>(
79 &self,
80 ) -> Result<T, PayloadConversionError> {
81 match &self.repr {
82 FailurePayloadsRepr::Decoded(value) => value.deserialize(),
83 FailurePayloadsRepr::Serializable(_) => Err(PayloadConversionError::WrongEncoding),
84 }
85 }
86
87 pub fn raw(&self) -> Option<&[Payload]> {
89 match &self.repr {
90 FailurePayloadsRepr::Decoded(value) => Some(value.raw()),
91 FailurePayloadsRepr::Serializable(_) => None,
92 }
93 }
94
95 pub fn into_raw(self) -> Option<RawValue> {
97 match self.repr {
98 FailurePayloadsRepr::Decoded(value) => Some(value.into_raw()),
99 FailurePayloadsRepr::Serializable(_) => None,
100 }
101 }
102}
103
104impl From<DecodablePayloads> for FailurePayloads {
105 fn from(value: DecodablePayloads) -> Self {
106 Self {
107 repr: FailurePayloadsRepr::Decoded(value),
108 }
109 }
110}
111
112impl<T> From<T> for FailurePayloads
113where
114 T: TemporalSerializable + Send + Sync + 'static,
115{
116 fn from(value: T) -> Self {
117 Self {
118 repr: FailurePayloadsRepr::Serializable(Box::new(value)),
119 }
120 }
121}
122
123#[derive(Debug, bon::Builder)]
125#[builder(start_fn = builder, state_mod(vis = "pub"))]
126pub struct ApplicationFailure {
127 #[builder(start_fn, into)]
128 source: anyhow::Error,
129 type_name: Option<String>,
130 #[builder(default)]
131 non_retryable: bool,
132 next_retry_delay: Option<Duration>,
133 #[builder(default = ApplicationErrorCategory::Unspecified)]
134 category: ApplicationErrorCategory,
135 #[builder(into)]
136 details: Option<FailurePayloads>,
137 failure: Option<Failure>,
138 cause: Option<Box<IncomingError>>,
139}
140
141impl ApplicationFailure {
142 pub fn new(source: impl Into<anyhow::Error>) -> Self {
144 Self {
145 source: source.into(),
146 type_name: None,
147 non_retryable: false,
148 next_retry_delay: None,
149 category: ApplicationErrorCategory::Unspecified,
150 details: None,
151 failure: None,
152 cause: None,
153 }
154 }
155
156 pub fn non_retryable(source: impl Into<anyhow::Error>) -> Self {
158 Self {
159 non_retryable: true,
160 ..Self::new(source)
161 }
162 }
163
164 pub fn source_error(&self) -> &anyhow::Error {
166 &self.source
167 }
168
169 pub fn type_name(&self) -> Option<&str> {
171 self.type_name.as_deref()
172 }
173
174 pub fn is_non_retryable(&self) -> bool {
176 self.non_retryable
177 }
178
179 pub fn next_retry_delay(&self) -> Option<Duration> {
181 self.next_retry_delay
182 }
183
184 pub fn category(&self) -> ApplicationErrorCategory {
186 self.category
187 }
188
189 pub fn details<T: TemporalDeserializable + 'static>(
191 &self,
192 ) -> Result<Option<T>, PayloadConversionError> {
193 self.details
194 .as_ref()
195 .map(FailurePayloads::deserialize)
196 .transpose()
197 }
198
199 pub fn raw_details(&self) -> Option<&[Payload]> {
201 self.details.as_ref().and_then(FailurePayloads::raw)
202 }
203
204 pub(crate) fn failure_payloads(&self) -> Option<&FailurePayloads> {
205 self.details.as_ref()
206 }
207
208 pub fn failure(&self) -> Option<&Failure> {
210 self.failure.as_ref()
211 }
212
213 pub fn into_failure(self) -> Option<Failure> {
215 self.failure
216 }
217
218 pub fn cause(&self) -> Option<&IncomingError> {
220 self.cause.as_deref()
221 }
222
223 pub fn as_timeout(&self) -> Option<&TimeoutError> {
226 self.cause().and_then(IncomingError::as_timeout)
227 }
228
229 pub fn as_cancelled(&self) -> Option<&CancelledError> {
232 self.cause().and_then(IncomingError::as_cancelled)
233 }
234
235 pub(crate) fn from_failure(
236 failure: Failure,
237 cause: Option<IncomingError>,
238 payload_converter: &PayloadConverter,
239 context: &SerializationContextData,
240 ) -> Self {
241 let app_info = failure
242 .maybe_application_failure()
243 .cloned()
244 .unwrap_or_default();
245 let type_name = (!app_info.r#type.is_empty()).then_some(app_info.r#type.clone());
246 Self {
247 source: anyhow::anyhow!(failure.message.clone()),
248 type_name,
249 non_retryable: app_info.non_retryable,
250 next_retry_delay: app_info.next_retry_delay.and_then(|d| d.try_into().ok()),
251 category: app_info.category(),
252 details: app_info.details.map(|details| {
253 FailurePayloads::from(DecodablePayloads::new(
254 details.payloads,
255 payload_converter.clone(),
256 *context,
257 ))
258 }),
259 failure: Some(failure),
260 cause: cause.map(Box::new),
261 }
262 }
263}
264
265impl std::fmt::Display for ApplicationFailure {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 write!(f, "{}", self.source)
268 }
269}
270
271impl std::error::Error for ApplicationFailure {
272 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
273 self.cause
274 .as_deref()
275 .map(|cause| cause as &(dyn std::error::Error + 'static))
276 .or_else(|| Some(self.source.as_ref()))
277 }
278}
279
280impl From<anyhow::Error> for ApplicationFailure {
281 fn from(value: anyhow::Error) -> Self {
282 Self::new(value)
283 }
284}
285
286impl From<PayloadConversionError> for ApplicationFailure {
287 fn from(value: PayloadConversionError) -> Self {
288 Self::new(value)
289 }
290}
291
292#[derive(Debug, thiserror::Error)]
294pub enum OutgoingError {
295 #[error(transparent)]
297 Activity(#[from] OutgoingActivityError),
298 #[error(transparent)]
300 Workflow(#[from] OutgoingWorkflowError),
301}
302
303#[derive(Debug, thiserror::Error)]
305pub enum OutgoingActivityError {
306 #[error(transparent)]
308 Application(#[from] Box<ApplicationFailure>),
309 #[error("Activity cancelled")]
311 Cancelled {
312 details: Option<FailurePayloads>,
314 },
315}
316
317#[derive(Debug, thiserror::Error)]
319pub enum OutgoingWorkflowError {
320 #[error(transparent)]
322 Application(#[from] Box<ApplicationFailure>),
323 #[error(transparent)]
325 ActivityExecution(#[from] Box<ActivityExecutionError>),
326 #[error(transparent)]
328 ChildWorkflowExecution(#[from] Box<ChildWorkflowExecutionError>),
329 #[error(transparent)]
331 ChildWorkflowStart(#[from] Box<ChildWorkflowStartError>),
332 #[error(transparent)]
334 ChildWorkflowSignal(#[from] Box<ChildWorkflowSignalError>),
335}
336
337impl From<anyhow::Error> for OutgoingWorkflowError {
338 fn from(value: anyhow::Error) -> Self {
339 Self::Application(Box::new(ApplicationFailure::new(value)))
340 }
341}
342
343impl From<PayloadConversionError> for OutgoingWorkflowError {
344 fn from(value: PayloadConversionError) -> Self {
345 Self::Application(Box::new(value.into()))
346 }
347}
348
349impl From<ApplicationFailure> for OutgoingWorkflowError {
350 fn from(value: ApplicationFailure) -> Self {
351 Self::Application(Box::new(value))
352 }
353}
354
355impl From<ActivityExecutionError> for OutgoingWorkflowError {
356 fn from(value: ActivityExecutionError) -> Self {
357 Self::ActivityExecution(Box::new(value))
358 }
359}
360
361impl From<ChildWorkflowExecutionError> for OutgoingWorkflowError {
362 fn from(value: ChildWorkflowExecutionError) -> Self {
363 Self::ChildWorkflowExecution(Box::new(value))
364 }
365}
366
367impl From<ChildWorkflowStartError> for OutgoingWorkflowError {
368 fn from(value: ChildWorkflowStartError) -> Self {
369 Self::ChildWorkflowStart(Box::new(value))
370 }
371}
372
373impl From<ChildWorkflowSignalError> for OutgoingWorkflowError {
374 fn from(value: ChildWorkflowSignalError) -> Self {
375 Self::ChildWorkflowSignal(Box::new(value))
376 }
377}
378
379#[derive(Debug)]
381pub enum IncomingError {
382 Application(ApplicationFailure),
384 Timeout(TimeoutError),
386 Cancelled(CancelledError),
388 Terminated(TerminatedError),
390 Server(ServerError),
392 ResetWorkflow(ResetWorkflowError),
394 Activity(ActivityFailureError),
396 ChildWorkflowExecution(ChildWorkflowFailureError),
398 NexusOperationExecution(IncomingNexusOperationExecutionError),
400 NexusHandler(IncomingNexusHandlerError),
402}
403
404impl IncomingError {
405 pub fn failure(&self) -> &Failure {
407 match self {
408 IncomingError::Application(err) => err
409 .failure()
410 .expect("decoded application failures retain their original proto"),
411 IncomingError::Timeout(err) => err.failure(),
412 IncomingError::Cancelled(err) => err.failure(),
413 IncomingError::Terminated(err) => err.failure(),
414 IncomingError::Server(err) => err.failure(),
415 IncomingError::ResetWorkflow(err) => err.failure(),
416 IncomingError::Activity(err) => err.failure(),
417 IncomingError::ChildWorkflowExecution(err) => err.failure(),
418 IncomingError::NexusOperationExecution(err) => err.failure(),
419 IncomingError::NexusHandler(err) => err.failure(),
420 }
421 }
422
423 pub fn cause(&self) -> Option<&IncomingError> {
425 match self {
426 IncomingError::Application(err) => err.cause(),
427 IncomingError::Timeout(err) => err.cause(),
428 IncomingError::Cancelled(err) => err.cause(),
429 IncomingError::Terminated(err) => err.cause(),
430 IncomingError::Server(err) => err.cause(),
431 IncomingError::ResetWorkflow(err) => err.cause(),
432 IncomingError::Activity(err) => err.cause(),
433 IncomingError::ChildWorkflowExecution(err) => err.cause(),
434 IncomingError::NexusOperationExecution(err) => err.cause(),
435 IncomingError::NexusHandler(err) => err.cause(),
436 }
437 }
438
439 pub fn into_failure(self) -> Failure {
441 match self {
442 IncomingError::Application(err) => err
443 .into_failure()
444 .expect("decoded application failures retain their original proto"),
445 IncomingError::Timeout(err) => err.into_failure(),
446 IncomingError::Cancelled(err) => err.into_failure(),
447 IncomingError::Terminated(err) => err.into_failure(),
448 IncomingError::Server(err) => err.into_failure(),
449 IncomingError::ResetWorkflow(err) => err.into_failure(),
450 IncomingError::Activity(err) => err.into_failure(),
451 IncomingError::ChildWorkflowExecution(err) => err.into_failure(),
452 IncomingError::NexusOperationExecution(err) => err.into_failure(),
453 IncomingError::NexusHandler(err) => err.into_failure(),
454 }
455 }
456
457 pub fn as_timeout(&self) -> Option<&TimeoutError> {
459 match self {
460 IncomingError::Timeout(err) => Some(err),
461 _ => None,
462 }
463 }
464
465 pub fn as_cancelled(&self) -> Option<&CancelledError> {
467 match self {
468 IncomingError::Cancelled(err) => Some(err),
469 _ => None,
470 }
471 }
472}
473
474impl std::fmt::Display for IncomingError {
475 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476 match self {
477 IncomingError::Application(err) => err.fmt(f),
478 IncomingError::Timeout(err) => err.fmt(f),
479 IncomingError::Cancelled(err) => err.fmt(f),
480 IncomingError::Terminated(err) => err.fmt(f),
481 IncomingError::Server(err) => err.fmt(f),
482 IncomingError::ResetWorkflow(err) => err.fmt(f),
483 IncomingError::Activity(err) => err.fmt(f),
484 IncomingError::ChildWorkflowExecution(err) => err.fmt(f),
485 IncomingError::NexusOperationExecution(err) => err.fmt(f),
486 IncomingError::NexusHandler(err) => err.fmt(f),
487 }
488 }
489}
490
491impl std::error::Error for IncomingError {
492 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
493 match self {
494 IncomingError::Application(err) => Some(err),
495 IncomingError::Timeout(err) => Some(err),
496 IncomingError::Cancelled(err) => Some(err),
497 IncomingError::Terminated(err) => Some(err),
498 IncomingError::Server(err) => Some(err),
499 IncomingError::ResetWorkflow(err) => Some(err),
500 IncomingError::Activity(err) => Some(err),
501 IncomingError::ChildWorkflowExecution(err) => Some(err),
502 IncomingError::NexusOperationExecution(err) => Some(err),
503 IncomingError::NexusHandler(err) => Some(err),
504 }
505 }
506}
507
508macro_rules! impl_incoming_failure_wrapper {
509 ($name:ident) => {
510 impl $name {
511 pub fn failure(&self) -> &Failure {
513 &self.failure
514 }
515
516 pub fn cause(&self) -> Option<&IncomingError> {
518 self.cause.as_deref()
519 }
520
521 pub fn into_failure(self) -> Failure {
523 self.failure
524 }
525
526 pub fn into_parts(self) -> (Failure, Option<IncomingError>) {
528 (self.failure, self.cause.map(|cause| *cause))
529 }
530 }
531
532 impl std::fmt::Display for $name {
533 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534 self.failure.fmt(f)
535 }
536 }
537
538 impl std::error::Error for $name {
539 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
540 self.cause
541 .as_deref()
542 .map(|cause| cause as &(dyn std::error::Error + 'static))
543 }
544 }
545 };
546}
547
548macro_rules! incoming_failure_wrapper {
549 ($name:ident, $doc:literal) => {
550 #[doc = $doc]
551 #[derive(Debug)]
552 pub struct $name {
553 failure: Failure,
554 cause: Option<Box<IncomingError>>,
555 }
556
557 impl $name {
558 pub(crate) fn new(failure: Failure, cause: Option<IncomingError>) -> Self {
560 Self {
561 failure,
562 cause: cause.map(Box::new),
563 }
564 }
565 }
566
567 impl_incoming_failure_wrapper!($name);
568 };
569}
570
571#[derive(Debug)]
573pub struct TimeoutError {
574 failure: Failure,
575 cause: Option<Box<IncomingError>>,
576 timeout_type: TimeoutType,
577 last_heartbeat_details: Option<DecodablePayloads>,
578}
579
580impl TimeoutError {
581 pub(crate) fn new(
583 failure: Failure,
584 failure_info: crate::protos::temporal::api::failure::v1::TimeoutFailureInfo,
585 cause: Option<IncomingError>,
586 payload_converter: &PayloadConverter,
587 context: &SerializationContextData,
588 ) -> Self {
589 Self {
590 failure,
591 cause: cause.map(Box::new),
592 timeout_type: failure_info.timeout_type(),
593 last_heartbeat_details: failure_info.last_heartbeat_details.map(|details| {
594 DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
595 }),
596 }
597 }
598
599 pub fn timeout_type(&self) -> TimeoutType {
601 self.timeout_type
602 }
603
604 pub fn last_heartbeat_details<T: TemporalDeserializable + 'static>(
606 &self,
607 ) -> Result<Option<T>, PayloadConversionError> {
608 self.last_heartbeat_details
609 .as_ref()
610 .map(DecodablePayloads::deserialize)
611 .transpose()
612 }
613
614 pub fn raw_last_heartbeat_details(&self) -> Option<&[Payload]> {
616 self.last_heartbeat_details
617 .as_ref()
618 .map(DecodablePayloads::raw)
619 }
620}
621
622impl_incoming_failure_wrapper!(TimeoutError);
623
624#[derive(Debug)]
626pub struct CancelledError {
627 failure: Failure,
628 cause: Option<Box<IncomingError>>,
629 details: Option<DecodablePayloads>,
630}
631
632impl CancelledError {
633 pub(crate) fn new(
635 failure: Failure,
636 failure_info: crate::protos::temporal::api::failure::v1::CanceledFailureInfo,
637 cause: Option<IncomingError>,
638 payload_converter: &PayloadConverter,
639 context: &SerializationContextData,
640 ) -> Self {
641 Self {
642 failure,
643 cause: cause.map(Box::new),
644 details: failure_info.details.map(|details| {
645 DecodablePayloads::new(details.payloads, payload_converter.clone(), *context)
646 }),
647 }
648 }
649
650 pub fn details<T: TemporalDeserializable + 'static>(
653 &self,
654 ) -> Result<Option<T>, PayloadConversionError> {
655 self.details
656 .as_ref()
657 .map(DecodablePayloads::deserialize)
658 .transpose()
659 }
660
661 pub fn raw_details(&self) -> Option<&[Payload]> {
663 self.details.as_ref().map(DecodablePayloads::raw)
664 }
665}
666
667impl_incoming_failure_wrapper!(CancelledError);
668incoming_failure_wrapper!(TerminatedError, "A normalized terminated failure.");
669incoming_failure_wrapper!(ServerError, "A normalized server failure.");
670incoming_failure_wrapper!(ResetWorkflowError, "A normalized reset-workflow failure.");
671
672#[derive(Debug)]
674pub struct ActivityFailureError {
675 failure: Failure,
676 cause: Option<Box<IncomingError>>,
677 activity_id: String,
678 activity_type: Option<crate::protos::temporal::api::common::v1::ActivityType>,
679 scheduled_event_id: i64,
680 started_event_id: i64,
681 identity: String,
682 retry_state: crate::protos::temporal::api::enums::v1::RetryState,
683}
684
685impl ActivityFailureError {
686 pub(crate) fn new(
688 failure: Failure,
689 failure_info: crate::protos::temporal::api::failure::v1::ActivityFailureInfo,
690 cause: Option<IncomingError>,
691 ) -> Self {
692 let retry_state = failure_info.retry_state();
693 Self {
694 failure,
695 cause: cause.map(Box::new),
696 activity_id: failure_info.activity_id,
697 activity_type: failure_info.activity_type,
698 scheduled_event_id: failure_info.scheduled_event_id,
699 started_event_id: failure_info.started_event_id,
700 identity: failure_info.identity,
701 retry_state,
702 }
703 }
704
705 pub fn activity_id(&self) -> &str {
707 &self.activity_id
708 }
709
710 pub fn activity_type(&self) -> Option<&crate::protos::temporal::api::common::v1::ActivityType> {
712 self.activity_type.as_ref()
713 }
714
715 pub fn scheduled_event_id(&self) -> i64 {
717 self.scheduled_event_id
718 }
719
720 pub fn started_event_id(&self) -> i64 {
722 self.started_event_id
723 }
724
725 pub fn identity(&self) -> &str {
727 &self.identity
728 }
729
730 pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
732 self.retry_state
733 }
734
735 pub fn as_timeout(&self) -> Option<&TimeoutError> {
738 self.cause().and_then(IncomingError::as_timeout)
739 }
740
741 pub fn as_cancelled(&self) -> Option<&CancelledError> {
744 self.cause().and_then(IncomingError::as_cancelled)
745 }
746}
747
748impl_incoming_failure_wrapper!(ActivityFailureError);
749#[derive(Debug)]
751pub struct ChildWorkflowFailureError {
752 failure: Failure,
753 cause: Option<Box<IncomingError>>,
754 namespace: String,
755 workflow_execution: Option<crate::protos::temporal::api::common::v1::WorkflowExecution>,
756 workflow_type: Option<crate::protos::temporal::api::common::v1::WorkflowType>,
757 initiated_event_id: i64,
758 started_event_id: i64,
759 retry_state: crate::protos::temporal::api::enums::v1::RetryState,
760}
761
762impl ChildWorkflowFailureError {
763 pub(crate) fn new(
765 failure: Failure,
766 failure_info: crate::protos::temporal::api::failure::v1::ChildWorkflowExecutionFailureInfo,
767 cause: Option<IncomingError>,
768 ) -> Self {
769 let retry_state = failure_info.retry_state();
770 Self {
771 failure,
772 cause: cause.map(Box::new),
773 namespace: failure_info.namespace,
774 workflow_execution: failure_info.workflow_execution,
775 workflow_type: failure_info.workflow_type,
776 initiated_event_id: failure_info.initiated_event_id,
777 started_event_id: failure_info.started_event_id,
778 retry_state,
779 }
780 }
781
782 pub fn namespace(&self) -> &str {
784 &self.namespace
785 }
786
787 pub fn workflow_execution(
789 &self,
790 ) -> Option<&crate::protos::temporal::api::common::v1::WorkflowExecution> {
791 self.workflow_execution.as_ref()
792 }
793
794 pub fn workflow_type(&self) -> Option<&crate::protos::temporal::api::common::v1::WorkflowType> {
796 self.workflow_type.as_ref()
797 }
798
799 pub fn initiated_event_id(&self) -> i64 {
801 self.initiated_event_id
802 }
803
804 pub fn started_event_id(&self) -> i64 {
806 self.started_event_id
807 }
808
809 pub fn retry_state(&self) -> crate::protos::temporal::api::enums::v1::RetryState {
811 self.retry_state
812 }
813
814 pub fn as_timeout(&self) -> Option<&TimeoutError> {
817 self.cause().and_then(IncomingError::as_timeout)
818 }
819
820 pub fn as_cancelled(&self) -> Option<&CancelledError> {
823 self.cause().and_then(IncomingError::as_cancelled)
824 }
825}
826
827impl_incoming_failure_wrapper!(ChildWorkflowFailureError);
828incoming_failure_wrapper!(
829 IncomingNexusOperationExecutionError,
830 "A normalized nexus operation failure wrapper."
831);
832incoming_failure_wrapper!(
833 IncomingNexusHandlerError,
834 "A normalized nexus handler failure wrapper."
835);
836
837#[derive(Debug, thiserror::Error)]
839pub enum ActivityExecutionError {
840 #[error("Activity failed: {}", .0.failure().message)]
842 Failed(#[source] ActivityFailureError),
843 #[error("Activity cancelled: {}", .0.failure().message)]
845 Cancelled(#[source] CancelledError),
846 #[error("Payload conversion failed: {0}")]
848 Serialization(#[from] PayloadConversionError),
849}
850
851impl ActivityExecutionError {
852 pub fn failure(&self) -> Option<&Failure> {
854 match self {
855 ActivityExecutionError::Failed(err) => Some(err.failure()),
856 ActivityExecutionError::Cancelled(err) => Some(err.failure()),
857 ActivityExecutionError::Serialization(_) => None,
858 }
859 }
860
861 pub fn cause(&self) -> Option<&IncomingError> {
863 match self {
864 ActivityExecutionError::Failed(err) => err.cause(),
865 ActivityExecutionError::Cancelled(err) => err.cause(),
866 ActivityExecutionError::Serialization(_) => None,
867 }
868 }
869
870 pub fn reason(&self) -> Option<&IncomingError> {
872 match self {
873 ActivityExecutionError::Failed(err) => err.cause(),
874 ActivityExecutionError::Cancelled(_) | ActivityExecutionError::Serialization(_) => None,
875 }
876 }
877
878 pub fn as_timeout(&self) -> Option<&TimeoutError> {
881 match self {
882 ActivityExecutionError::Failed(err) => err.as_timeout(),
883 ActivityExecutionError::Serialization(_) | ActivityExecutionError::Cancelled(_) => None,
884 }
885 }
886
887 pub fn as_cancelled(&self) -> Option<&CancelledError> {
890 match self {
891 ActivityExecutionError::Failed(err) => err.as_cancelled(),
892 ActivityExecutionError::Cancelled(err) => Some(err),
893 ActivityExecutionError::Serialization(_) => None,
894 }
895 }
896}
897
898#[derive(Debug, thiserror::Error)]
900pub enum ChildWorkflowStartError {
901 #[error("Child workflow start cancelled: {}", .0.failure().message)]
903 Cancelled(#[source] Box<CancelledError>),
904 #[error(
906 "Child workflow start failed: workflow_id={workflow_id}, workflow_type={workflow_type}, cause={cause:?}"
907 )]
908 StartFailed {
909 workflow_id: String,
911 workflow_type: String,
913 cause: StartChildWorkflowExecutionFailedCause,
915 },
916 #[error("Payload conversion failed: {0}")]
918 Serialization(#[from] PayloadConversionError),
919}
920
921impl ChildWorkflowStartError {
922 pub fn failure(&self) -> Option<&Failure> {
924 match self {
925 ChildWorkflowStartError::Cancelled(err) => Some(err.failure()),
926 ChildWorkflowStartError::StartFailed { .. }
927 | ChildWorkflowStartError::Serialization(_) => None,
928 }
929 }
930
931 pub fn cause(&self) -> Option<&IncomingError> {
933 match self {
934 ChildWorkflowStartError::Cancelled(err) => err.cause(),
935 ChildWorkflowStartError::StartFailed { .. }
936 | ChildWorkflowStartError::Serialization(_) => None,
937 }
938 }
939}
940
941#[derive(Debug, thiserror::Error)]
943pub enum ChildWorkflowExecutionError {
944 #[error("Child workflow failed: {}", .0.failure().message)]
946 Failed(#[source] Box<ChildWorkflowFailureError>),
947 #[error("Payload conversion failed: {0}")]
949 Serialization(#[from] PayloadConversionError),
950}
951
952impl ChildWorkflowExecutionError {
953 pub fn failure(&self) -> Option<&Failure> {
955 match self {
956 ChildWorkflowExecutionError::Failed(err) => Some(err.failure()),
957 ChildWorkflowExecutionError::Serialization(_) => None,
958 }
959 }
960
961 pub fn cause(&self) -> Option<&IncomingError> {
963 match self {
964 ChildWorkflowExecutionError::Failed(err) => err.cause(),
965 ChildWorkflowExecutionError::Serialization(_) => None,
966 }
967 }
968
969 pub fn reason(&self) -> Option<&IncomingError> {
971 match self {
972 ChildWorkflowExecutionError::Failed(err) => err.cause(),
973 ChildWorkflowExecutionError::Serialization(_) => None,
974 }
975 }
976
977 pub fn as_timeout(&self) -> Option<&TimeoutError> {
980 match self {
981 ChildWorkflowExecutionError::Failed(err) => err.as_timeout(),
982 ChildWorkflowExecutionError::Serialization(_) => None,
983 }
984 }
985
986 pub fn as_cancelled(&self) -> Option<&CancelledError> {
989 match self {
990 ChildWorkflowExecutionError::Failed(err) => err.as_cancelled(),
991 ChildWorkflowExecutionError::Serialization(_) => None,
992 }
993 }
994}
995
996#[derive(Debug, thiserror::Error)]
998pub enum ChildWorkflowSignalError {
999 #[error("Child workflow signal failed: {}", .0.failure().message)]
1001 Failed(#[source] Box<ChildWorkflowSignalFailureError>),
1002 #[error("Signal payload conversion failed: {0}")]
1004 Serialization(#[from] PayloadConversionError),
1005}
1006
1007impl ChildWorkflowSignalError {
1008 pub fn failure(&self) -> Option<&Failure> {
1010 match self {
1011 ChildWorkflowSignalError::Failed(err) => Some(err.failure()),
1012 ChildWorkflowSignalError::Serialization(_) => None,
1013 }
1014 }
1015
1016 pub fn cause(&self) -> Option<&IncomingError> {
1018 match self {
1019 ChildWorkflowSignalError::Failed(err) => err.cause(),
1020 ChildWorkflowSignalError::Serialization(_) => None,
1021 }
1022 }
1023
1024 pub fn reason(&self) -> Option<&IncomingError> {
1026 match self {
1027 ChildWorkflowSignalError::Failed(err) => Some(err.error()),
1028 ChildWorkflowSignalError::Serialization(_) => None,
1029 }
1030 }
1031}
1032
1033#[derive(Debug)]
1035pub struct ChildWorkflowSignalFailureError {
1036 failure: Failure,
1037 error: Box<IncomingError>,
1038}
1039
1040impl ChildWorkflowSignalFailureError {
1041 pub(crate) fn new(failure: Failure, error: IncomingError) -> Self {
1043 Self {
1044 failure,
1045 error: Box::new(error),
1046 }
1047 }
1048
1049 pub fn failure(&self) -> &Failure {
1051 &self.failure
1052 }
1053
1054 pub fn cause(&self) -> Option<&IncomingError> {
1056 self.error.cause()
1057 }
1058
1059 pub fn error(&self) -> &IncomingError {
1061 &self.error
1062 }
1063}
1064
1065impl std::fmt::Display for ChildWorkflowSignalFailureError {
1066 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1067 self.failure.fmt(f)
1068 }
1069}
1070
1071impl std::error::Error for ChildWorkflowSignalFailureError {
1072 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1073 self.cause()
1074 .map(|cause| cause as &(dyn std::error::Error + 'static))
1075 }
1076}
1077
1078#[cfg(test)]
1079mod tests {
1080 use super::*;
1081 use crate::{
1082 data_converters::{
1083 DefaultFailureConverter, FailureConverter, GenericPayloadConverter, PayloadConverter,
1084 SerializationContext, SerializationContextData,
1085 },
1086 protos::temporal::api::{common::v1::Payload, failure::v1::failure::FailureInfo},
1087 };
1088
1089 struct AlwaysFailsSerialize;
1090
1091 impl serde::Serialize for AlwaysFailsSerialize {
1092 fn serialize<S: serde::Serializer>(&self, _serializer: S) -> Result<S::Ok, S::Error> {
1093 Err(serde::ser::Error::custom("serialize boom"))
1094 }
1095 }
1096
1097 #[test]
1098 fn constructors_set_retryability_defaults() {
1099 assert!(!ApplicationFailure::new(anyhow::anyhow!("retryable")).is_non_retryable());
1100 assert!(
1101 ApplicationFailure::non_retryable(anyhow::anyhow!("non-retryable")).is_non_retryable()
1102 );
1103 }
1104
1105 #[test]
1106 fn conversion_preserves_application_metadata() {
1107 let payloads = Payloads {
1108 payloads: vec![Payload {
1109 data: b"details".to_vec(),
1110 ..Default::default()
1111 }],
1112 };
1113 let failure = DefaultFailureConverter.to_failure(
1114 OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1115 ApplicationFailure::builder(anyhow::anyhow!("oops"))
1116 .type_name("MyType".to_owned())
1117 .non_retryable(true)
1118 .next_retry_delay(Duration::from_secs(3))
1119 .category(ApplicationErrorCategory::Benign)
1120 .details(RawValue::new(payloads.payloads.clone()))
1121 .build(),
1122 ))),
1123 &PayloadConverter::default(),
1124 &SerializationContextData::None,
1125 );
1126 let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
1127 panic!("expected application failure info");
1128 };
1129 assert_eq!(failure.message, "oops");
1130 assert_eq!(info.r#type, "MyType");
1131 assert!(info.non_retryable);
1132 assert_eq!(info.details, Some(payloads));
1133 assert_eq!(info.category(), ApplicationErrorCategory::Benign);
1134 assert_eq!(info.next_retry_delay.unwrap().seconds, 3);
1135 }
1136
1137 #[test]
1138 fn builder_accepts_raw_payload_details() {
1139 let payload = Payload {
1140 data: b"details".to_vec(),
1141 ..Default::default()
1142 };
1143 let failure = DefaultFailureConverter.to_failure(
1144 OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1145 ApplicationFailure::builder(anyhow::anyhow!("oops"))
1146 .details(RawValue::new(vec![payload.clone()]))
1147 .build(),
1148 ))),
1149 &PayloadConverter::default(),
1150 &SerializationContextData::None,
1151 );
1152
1153 let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
1154 panic!("expected application failure info");
1155 };
1156 assert_eq!(info.details.unwrap().payloads, vec![payload]);
1157 }
1158
1159 #[test]
1160 fn builder_accepts_serializable_details() {
1161 let failure = DefaultFailureConverter.to_failure(
1162 OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1163 ApplicationFailure::builder(anyhow::anyhow!("oops"))
1164 .details("details".to_string())
1165 .build(),
1166 ))),
1167 &PayloadConverter::default(),
1168 &SerializationContextData::None,
1169 );
1170
1171 let Some(FailureInfo::ApplicationFailureInfo(info)) = failure.failure_info else {
1172 panic!("expected application failure info");
1173 };
1174 let payloads = info.details.expect("expected details").payloads;
1175 let converter = PayloadConverter::default();
1176 let details: String = converter
1177 .from_payloads(
1178 &SerializationContext {
1179 data: &SerializationContextData::None,
1180 converter: &converter,
1181 },
1182 payloads,
1183 )
1184 .unwrap();
1185 assert_eq!(details, "details");
1186 }
1187
1188 #[test]
1189 fn application_failure_encoding_surfaces_detail_encoding_errors() {
1190 let failure = DefaultFailureConverter.to_failure(
1191 OutgoingError::Workflow(OutgoingWorkflowError::Application(Box::new(
1192 ApplicationFailure::builder(anyhow::anyhow!("oops"))
1193 .details(AlwaysFailsSerialize)
1194 .build(),
1195 ))),
1196 &PayloadConverter::default(),
1197 &SerializationContextData::None,
1198 );
1199
1200 assert_eq!(
1201 failure.message,
1202 "Failed converting error to failure: Encoding error: serialize boom, original error message: oops"
1203 );
1204 assert!(matches!(
1205 failure.failure_info,
1206 Some(FailureInfo::ApplicationFailureInfo(_))
1207 ));
1208 }
1209
1210 #[test]
1211 fn anyhow_workflow_errors_default_to_application_outgoing_errors() {
1212 let outgoing: OutgoingWorkflowError = anyhow::anyhow!("workflow boom").into();
1213
1214 let OutgoingWorkflowError::Application(app) = outgoing else {
1215 panic!("plain workflow errors should default to application failures");
1216 };
1217 assert_eq!(app.to_string(), "workflow boom");
1218 }
1219
1220 #[test]
1221 fn payload_conversion_errors_default_to_application_outgoing_errors() {
1222 let outgoing: OutgoingWorkflowError =
1223 PayloadConversionError::EncodingError(anyhow::anyhow!("encode boom").into()).into();
1224
1225 let OutgoingWorkflowError::Application(app) = outgoing else {
1226 panic!("payload conversion errors should default to application failures");
1227 };
1228 assert_eq!(app.to_string(), "Encoding error: encode boom");
1229 }
1230}