Skip to main content

temporalio_common/protos/
mod.rs

1//! Contains the protobuf definitions used as arguments to and return values from interactions with
2//! the Temporal Core SDK. Language SDK authors can generate structs using the proto definitions
3//! that will match the generated structs in this module.
4
5pub mod constants;
6/// Utility functions for working with protobuf types.
7pub mod utilities;
8
9#[cfg(feature = "test-utilities")]
10/// Pre-built test histories for common workflow patterns.
11pub mod canned_histories;
12#[cfg(feature = "history_builders")]
13mod history_builder;
14#[cfg(feature = "history_builders")]
15mod history_info;
16mod task_token;
17#[cfg(feature = "test-utilities")]
18pub mod test_utils;
19
20use std::time::Duration;
21
22#[cfg(feature = "history_builders")]
23pub use history_builder::{
24    DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE, TestHistoryBuilder, default_act_sched,
25    default_wes_attribs,
26};
27#[cfg(feature = "history_builders")]
28pub use history_info::HistoryInfo;
29pub use task_token::TaskToken;
30
31/// Payload metadata key that identifies the encoding format.
32pub static ENCODING_PAYLOAD_KEY: &str = "encoding";
33/// The metadata value for JSON-encoded payloads.
34pub static JSON_ENCODING_VAL: &str = "json/plain";
35/// The details key used in patched marker payloads.
36pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data";
/// The search attribute key used when registering change versions.
38pub static VERSION_SEARCH_ATTR_KEY: &str = "TemporalChangeVersion";
39
/// Includes the tonic-generated code for the proto package `$pkg`, followed by the file
/// `$OUT_DIR/<pkg>.serde.rs` for the same package.
macro_rules! include_proto_with_serde {
    ($pkg:tt) => {
        tonic::include_proto!($pkg);

        include!(concat!(env!("OUT_DIR"), "/", $pkg, ".serde.rs"));
    };
}
47
48#[allow(
49    clippy::large_enum_variant,
50    clippy::derive_partial_eq_without_eq,
51    clippy::reserve_after_initialization
52)]
53// I'd prefer not to do this, but there are some generated things that just don't need it.
54#[allow(missing_docs)]
55pub mod coresdk {
56    //! Contains all protobufs relating to communication between core and lang-specific SDKs
57
58    tonic::include_proto!("coresdk");
59
60    use crate::protos::{
61        ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL,
62        temporal::api::{
63            common::v1::{Payload, Payloads, RetryPolicy, WorkflowExecution},
64            enums::v1::{
65                ApplicationErrorCategory, TimeoutType, VersioningBehavior, WorkflowTaskFailedCause,
66            },
67            failure::v1::{
68                ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo,
69                failure::FailureInfo,
70            },
71            workflowservice::v1::PollActivityTaskQueueResponse,
72        },
73    };
74    use activity_task::ActivityTask;
75    use serde::{Deserialize, Serialize};
76    use std::{
77        collections::HashMap,
78        convert::TryFrom,
79        fmt::{Display, Formatter},
80        iter::FromIterator,
81    };
82    use workflow_activation::{WorkflowActivationJob, workflow_activation_job};
83    use workflow_commands::{WorkflowCommand, workflow_command, workflow_command::Variant};
84    use workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion};
85
86    #[allow(clippy::module_inception)]
87    pub mod activity_task {
88        use crate::protos::{coresdk::ActivityTaskCompletion, task_token::format_task_token};
89        use std::fmt::{Display, Formatter};
90        tonic::include_proto!("coresdk.activity_task");
91
92        impl ActivityTask {
93            pub fn cancel_from_ids(
94                task_token: Vec<u8>,
95                reason: ActivityCancelReason,
96                details: ActivityCancellationDetails,
97            ) -> Self {
98                Self {
99                    task_token,
100                    variant: Some(activity_task::Variant::Cancel(Cancel {
101                        reason: reason as i32,
102                        details: Some(details),
103                    })),
104                }
105            }
106
107            // Checks if both the primary reason or details have a timeout cancellation.
108            pub fn is_timeout(&self) -> bool {
109                match &self.variant {
110                    Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
111                        *reason == ActivityCancelReason::TimedOut as i32
112                            || details.as_ref().is_some_and(|d| d.is_timed_out)
113                    }
114                    _ => false,
115                }
116            }
117
118            pub fn primary_reason_to_cancellation_details(
119                reason: ActivityCancelReason,
120            ) -> ActivityCancellationDetails {
121                ActivityCancellationDetails {
122                    is_not_found: reason == ActivityCancelReason::NotFound,
123                    is_cancelled: reason == ActivityCancelReason::Cancelled,
124                    is_paused: reason == ActivityCancelReason::Paused,
125                    is_timed_out: reason == ActivityCancelReason::TimedOut,
126                    is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
127                    is_reset: reason == ActivityCancelReason::Reset,
128                }
129            }
130        }
131
132        impl Display for ActivityTaskCompletion {
133            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
134                write!(
135                    f,
136                    "ActivityTaskCompletion(token: {}",
137                    format_task_token(&self.task_token),
138                )?;
139                if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
140                    write!(f, ", {r}")?;
141                } else {
142                    write!(f, ", missing result")?;
143                }
144                write!(f, ")")
145            }
146        }
147    }
148    #[allow(clippy::module_inception)]
149    pub mod activity_result {
150        tonic::include_proto!("coresdk.activity_result");
151        use super::super::temporal::api::{
152            common::v1::Payload,
153            failure::v1::{CanceledFailureInfo, Failure as APIFailure, failure},
154        };
155        use crate::protos::{
156            coresdk::activity_result::activity_resolution::Status,
157            temporal::api::enums::v1::TimeoutType,
158        };
159        use activity_execution_result as aer;
160        use anyhow::anyhow;
161        use std::fmt::{Display, Formatter};
162
163        impl ActivityExecutionResult {
164            pub const fn ok(result: Payload) -> Self {
165                Self {
166                    status: Some(aer::Status::Completed(Success {
167                        result: Some(result),
168                    })),
169                }
170            }
171
172            pub fn fail(fail: APIFailure) -> Self {
173                Self {
174                    status: Some(aer::Status::Failed(Failure {
175                        failure: Some(fail),
176                    })),
177                }
178            }
179
180            pub fn cancel_from_details(payload: Option<Payload>) -> Self {
181                Self {
182                    status: Some(aer::Status::Cancelled(Cancellation::from_details(payload))),
183                }
184            }
185
186            pub const fn will_complete_async() -> Self {
187                Self {
188                    status: Some(aer::Status::WillCompleteAsync(WillCompleteAsync {})),
189                }
190            }
191
192            pub fn is_cancelled(&self) -> bool {
193                matches!(self.status, Some(aer::Status::Cancelled(_)))
194            }
195        }
196
197        impl Display for aer::Status {
198            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199                write!(f, "ActivityExecutionResult(")?;
200                match self {
201                    aer::Status::Completed(v) => {
202                        write!(f, "{v})")
203                    }
204                    aer::Status::Failed(v) => {
205                        write!(f, "{v})")
206                    }
207                    aer::Status::Cancelled(v) => {
208                        write!(f, "{v})")
209                    }
210                    aer::Status::WillCompleteAsync(_) => {
211                        write!(f, "Will complete async)")
212                    }
213                }
214            }
215        }
216
217        impl Display for Success {
218            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
219                write!(f, "Success(")?;
220                if let Some(ref v) = self.result {
221                    write!(f, "{v}")?;
222                }
223                write!(f, ")")
224            }
225        }
226
227        impl Display for Failure {
228            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229                write!(f, "Failure(")?;
230                if let Some(ref v) = self.failure {
231                    write!(f, "{v}")?;
232                }
233                write!(f, ")")
234            }
235        }
236
237        impl Display for Cancellation {
238            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239                write!(f, "Cancellation(")?;
240                if let Some(ref v) = self.failure {
241                    write!(f, "{v}")?;
242                }
243                write!(f, ")")
244            }
245        }
246
247        impl From<Result<Payload, APIFailure>> for ActivityExecutionResult {
248            fn from(r: Result<Payload, APIFailure>) -> Self {
249                Self {
250                    status: match r {
251                        Ok(p) => Some(aer::Status::Completed(Success { result: Some(p) })),
252                        Err(f) => Some(aer::Status::Failed(Failure { failure: Some(f) })),
253                    },
254                }
255            }
256        }
257
258        impl ActivityResolution {
259            /// Extract an activity's payload if it completed successfully, or return an error for all
260            /// other outcomes.
261            pub fn success_payload_or_error(self) -> Result<Option<Payload>, anyhow::Error> {
262                let Some(status) = self.status else {
263                    return Err(anyhow!("Activity completed without a status"));
264                };
265
266                match status {
267                    activity_resolution::Status::Completed(success) => Ok(success.result),
268                    e => Err(anyhow!("Activity was not successful: {e:?}")),
269                }
270            }
271
272            pub fn unwrap_ok_payload(self) -> Payload {
273                self.success_payload_or_error().unwrap().unwrap()
274            }
275
276            pub fn completed_ok(&self) -> bool {
277                matches!(self.status, Some(activity_resolution::Status::Completed(_)))
278            }
279
280            pub fn failed(&self) -> bool {
281                matches!(self.status, Some(activity_resolution::Status::Failed(_)))
282            }
283
284            pub fn timed_out(&self) -> Option<TimeoutType> {
285                match self.status {
286                    Some(activity_resolution::Status::Failed(Failure {
287                        failure: Some(ref f),
288                    })) => f.is_timeout(),
289                    _ => None,
290                }
291            }
292
293            pub fn cancelled(&self) -> bool {
294                matches!(self.status, Some(activity_resolution::Status::Cancelled(_)))
295            }
296
297            /// If this resolution is any kind of failure, return the inner failure details. Panics
298            /// if the activity succeeded, is in backoff, or this resolution is malformed.
299            pub fn unwrap_failure(self) -> APIFailure {
300                match self.status.unwrap() {
301                    Status::Failed(f) => f.failure.unwrap(),
302                    Status::Cancelled(c) => c.failure.unwrap(),
303                    _ => panic!("Actvity did not fail"),
304                }
305            }
306        }
307
308        impl Cancellation {
309            /// Create a cancellation result from some payload. This is to be used when telling Core
310            /// that an activity completed as cancelled.
311            pub fn from_details(details: Option<Payload>) -> Self {
312                Cancellation {
313                    failure: Some(APIFailure {
314                        message: "Activity cancelled".to_string(),
315                        failure_info: Some(failure::FailureInfo::CanceledFailureInfo(
316                            CanceledFailureInfo {
317                                details: details.map(Into::into),
318                            },
319                        )),
320                        ..Default::default()
321                    }),
322                }
323            }
324        }
325    }
326
327    pub mod common {
328        tonic::include_proto!("coresdk.common");
329        use super::external_data::LocalActivityMarkerData;
330        use crate::protos::{
331            PATCHED_MARKER_DETAILS_KEY,
332            coresdk::{
333                AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
334                external_data::PatchedMarkerData,
335            },
336            temporal::api::common::v1::{Payload, Payloads},
337        };
338        use std::collections::HashMap;
339
340        pub fn build_has_change_marker_details(
341            patch_id: impl Into<String>,
342            deprecated: bool,
343        ) -> anyhow::Result<HashMap<String, Payloads>> {
344            let mut hm = HashMap::new();
345            let encoded = PatchedMarkerData {
346                id: patch_id.into(),
347                deprecated,
348            }
349            .as_json_payload()?;
350            hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into());
351            Ok(hm)
352        }
353
354        pub fn decode_change_marker_details(
355            details: &HashMap<String, Payloads>,
356        ) -> Option<(String, bool)> {
357            // We used to write change markers with plain bytes, so try to decode if they are
358            // json first, then fall back to that.
359            if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) {
360                let decoded = PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?;
361                return Some((decoded.id, decoded.deprecated));
362            }
363
364            let id_entry = details.get("patch_id")?.payloads.first()?;
365            let deprecated_entry = details.get("deprecated")?.payloads.first()?;
366            let name = std::str::from_utf8(&id_entry.data).ok()?;
367            let deprecated = *deprecated_entry.data.first()? != 0;
368            Some((name.to_string(), deprecated))
369        }
370
371        pub fn build_local_activity_marker_details(
372            metadata: LocalActivityMarkerData,
373            result: Option<Payload>,
374        ) -> HashMap<String, Payloads> {
375            let mut hm = HashMap::new();
376            // It would be more efficient for this to be proto binary, but then it shows up as
377            // meaningless in the Temporal UI...
378            if let Some(jsonified) = metadata.as_json_payload().into_payloads() {
379                hm.insert("data".to_string(), jsonified);
380            }
381            if let Some(res) = result {
382                hm.insert("result".to_string(), res.into());
383            }
384            hm
385        }
386
387        /// Given a marker detail map, returns just the local activity info, but not the payload.
388        /// This is fairly inexpensive. Deserializing the whole payload may not be.
389        pub fn extract_local_activity_marker_data(
390            details: &HashMap<String, Payloads>,
391        ) -> Option<LocalActivityMarkerData> {
392            details
393                .get("data")
394                .and_then(|p| p.payloads.first())
395                .and_then(|p| std::str::from_utf8(&p.data).ok())
396                .and_then(|s| serde_json::from_str(s).ok())
397        }
398
399        /// Given a marker detail map, returns the local activity info and the result payload
400        /// if they are found and the marker data is well-formed. This removes the data from the
401        /// map.
402        pub fn extract_local_activity_marker_details(
403            details: &mut HashMap<String, Payloads>,
404        ) -> (Option<LocalActivityMarkerData>, Option<Payload>) {
405            let data = extract_local_activity_marker_data(details);
406            let result = details.remove("result").and_then(|mut p| p.payloads.pop());
407            (data, result)
408        }
409    }
410
411    pub mod external_data {
412        use prost_types::{Duration, Timestamp};
413        use serde::{Deserialize, Deserializer, Serialize, Serializer};
414        tonic::include_proto!("coresdk.external_data");
415
416        // Buncha hullaballoo because prost types aren't serde compat.
417        // See https://github.com/tokio-rs/prost/issues/75 which hilariously Chad opened ages ago
418
419        #[derive(Serialize, Deserialize)]
420        #[serde(remote = "Timestamp")]
421        struct TimestampDef {
422            seconds: i64,
423            nanos: i32,
424        }
425        mod opt_timestamp {
426            use super::*;
427
428            pub(super) fn serialize<S>(
429                value: &Option<Timestamp>,
430                serializer: S,
431            ) -> Result<S::Ok, S::Error>
432            where
433                S: Serializer,
434            {
435                #[derive(Serialize)]
436                struct Helper<'a>(#[serde(with = "TimestampDef")] &'a Timestamp);
437
438                value.as_ref().map(Helper).serialize(serializer)
439            }
440
441            pub(super) fn deserialize<'de, D>(
442                deserializer: D,
443            ) -> Result<Option<Timestamp>, D::Error>
444            where
445                D: Deserializer<'de>,
446            {
447                #[derive(Deserialize)]
448                struct Helper(#[serde(with = "TimestampDef")] Timestamp);
449
450                let helper = Option::deserialize(deserializer)?;
451                Ok(helper.map(|Helper(external)| external))
452            }
453        }
454
455        // Luckily Duration is also stored the exact same way
456        #[derive(Serialize, Deserialize)]
457        #[serde(remote = "Duration")]
458        struct DurationDef {
459            seconds: i64,
460            nanos: i32,
461        }
462        mod opt_duration {
463            use super::*;
464
465            pub(super) fn serialize<S>(
466                value: &Option<Duration>,
467                serializer: S,
468            ) -> Result<S::Ok, S::Error>
469            where
470                S: Serializer,
471            {
472                #[derive(Serialize)]
473                struct Helper<'a>(#[serde(with = "DurationDef")] &'a Duration);
474
475                value.as_ref().map(Helper).serialize(serializer)
476            }
477
478            pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
479            where
480                D: Deserializer<'de>,
481            {
482                #[derive(Deserialize)]
483                struct Helper(#[serde(with = "DurationDef")] Duration);
484
485                let helper = Option::deserialize(deserializer)?;
486                Ok(helper.map(|Helper(external)| external))
487            }
488        }
489    }
490
491    pub mod workflow_activation {
492        use crate::protos::{
493            coresdk::{
494                FromPayloadsExt,
495                activity_result::{ActivityResolution, activity_resolution},
496                common::NamespacedWorkflowExecution,
497                fix_retry_policy,
498                workflow_activation::remove_from_cache::EvictionReason,
499            },
500            temporal::api::{
501                enums::v1::WorkflowTaskFailedCause,
502                history::v1::{
503                    WorkflowExecutionCancelRequestedEventAttributes,
504                    WorkflowExecutionSignaledEventAttributes,
505                    WorkflowExecutionStartedEventAttributes,
506                },
507                query::v1::WorkflowQuery,
508            },
509        };
510        use prost_types::Timestamp;
511        use std::fmt::{Display, Formatter};
512
513        tonic::include_proto!("coresdk.workflow_activation");
514
515        pub fn create_evict_activation(
516            run_id: String,
517            message: String,
518            reason: EvictionReason,
519        ) -> WorkflowActivation {
520            WorkflowActivation {
521                timestamp: None,
522                run_id,
523                is_replaying: false,
524                history_length: 0,
525                jobs: vec![WorkflowActivationJob::from(
526                    workflow_activation_job::Variant::RemoveFromCache(RemoveFromCache {
527                        message,
528                        reason: reason as i32,
529                    }),
530                )],
531                available_internal_flags: vec![],
532                history_size_bytes: 0,
533                continue_as_new_suggested: false,
534                deployment_version_for_current_task: None,
535                last_sdk_version: String::new(),
536                suggest_continue_as_new_reasons: vec![],
537            }
538        }
539
540        pub fn query_to_job(id: String, q: WorkflowQuery) -> QueryWorkflow {
541            QueryWorkflow {
542                query_id: id,
543                query_type: q.query_type,
544                arguments: Vec::from_payloads(q.query_args),
545                headers: q.header.map(|h| h.into()).unwrap_or_default(),
546            }
547        }
548
549        impl WorkflowActivation {
550            /// Returns true if the only job in the activation is eviction
551            pub fn is_only_eviction(&self) -> bool {
552                matches!(
553                    self.jobs.as_slice(),
554                    [WorkflowActivationJob {
555                        variant: Some(workflow_activation_job::Variant::RemoveFromCache(_))
556                    }]
557                )
558            }
559
560            /// Returns eviction reason if this activation is an eviction
561            pub fn eviction_reason(&self) -> Option<EvictionReason> {
562                self.jobs.iter().find_map(|j| {
563                    if let Some(workflow_activation_job::Variant::RemoveFromCache(ref rj)) =
564                        j.variant
565                    {
566                        EvictionReason::try_from(rj.reason).ok()
567                    } else {
568                        None
569                    }
570                })
571            }
572        }
573
574        impl workflow_activation_job::Variant {
575            pub fn is_local_activity_resolution(&self) -> bool {
576                matches!(self, workflow_activation_job::Variant::ResolveActivity(ra) if ra.is_local)
577            }
578        }
579
580        impl Display for EvictionReason {
581            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
582                write!(f, "{self:?}")
583            }
584        }
585
586        impl From<EvictionReason> for WorkflowTaskFailedCause {
587            fn from(value: EvictionReason) -> Self {
588                match value {
589                    EvictionReason::Nondeterminism => {
590                        WorkflowTaskFailedCause::NonDeterministicError
591                    }
592                    _ => WorkflowTaskFailedCause::Unspecified,
593                }
594            }
595        }
596
597        impl Display for WorkflowActivation {
598            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
599                write!(f, "WorkflowActivation(")?;
600                write!(f, "run_id: {}, ", self.run_id)?;
601                write!(f, "is_replaying: {}, ", self.is_replaying)?;
602                write!(
603                    f,
604                    "jobs: {})",
605                    self.jobs
606                        .iter()
607                        .map(ToString::to_string)
608                        .collect::<Vec<_>>()
609                        .as_slice()
610                        .join(", ")
611                )
612            }
613        }
614
615        impl Display for WorkflowActivationJob {
616            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
617                match &self.variant {
618                    None => write!(f, "empty"),
619                    Some(v) => write!(f, "{v}"),
620                }
621            }
622        }
623
624        impl Display for workflow_activation_job::Variant {
625            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
626                match self {
627                    workflow_activation_job::Variant::InitializeWorkflow(_) => {
628                        write!(f, "InitializeWorkflow")
629                    }
630                    workflow_activation_job::Variant::FireTimer(t) => {
631                        write!(f, "FireTimer({})", t.seq)
632                    }
633                    workflow_activation_job::Variant::UpdateRandomSeed(_) => {
634                        write!(f, "UpdateRandomSeed")
635                    }
636                    workflow_activation_job::Variant::QueryWorkflow(_) => {
637                        write!(f, "QueryWorkflow")
638                    }
639                    workflow_activation_job::Variant::CancelWorkflow(_) => {
640                        write!(f, "CancelWorkflow")
641                    }
642                    workflow_activation_job::Variant::SignalWorkflow(_) => {
643                        write!(f, "SignalWorkflow")
644                    }
645                    workflow_activation_job::Variant::ResolveActivity(r) => {
646                        write!(
647                            f,
648                            "ResolveActivity({}, {})",
649                            r.seq,
650                            r.result
651                                .as_ref()
652                                .unwrap_or(&ActivityResolution { status: None })
653                        )
654                    }
655                    workflow_activation_job::Variant::NotifyHasPatch(_) => {
656                        write!(f, "NotifyHasPatch")
657                    }
658                    workflow_activation_job::Variant::ResolveChildWorkflowExecutionStart(_) => {
659                        write!(f, "ResolveChildWorkflowExecutionStart")
660                    }
661                    workflow_activation_job::Variant::ResolveChildWorkflowExecution(_) => {
662                        write!(f, "ResolveChildWorkflowExecution")
663                    }
664                    workflow_activation_job::Variant::ResolveSignalExternalWorkflow(_) => {
665                        write!(f, "ResolveSignalExternalWorkflow")
666                    }
667                    workflow_activation_job::Variant::RemoveFromCache(_) => {
668                        write!(f, "RemoveFromCache")
669                    }
670                    workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(_) => {
671                        write!(f, "ResolveRequestCancelExternalWorkflow")
672                    }
673                    workflow_activation_job::Variant::DoUpdate(u) => {
674                        write!(f, "DoUpdate({})", u.id)
675                    }
676                    workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
677                        write!(f, "ResolveNexusOperationStart")
678                    }
679                    workflow_activation_job::Variant::ResolveNexusOperation(_) => {
680                        write!(f, "ResolveNexusOperation")
681                    }
682                }
683            }
684        }
685
686        impl Display for ActivityResolution {
687            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
688                match self.status {
689                    None => {
690                        write!(f, "None")
691                    }
692                    Some(activity_resolution::Status::Failed(_)) => {
693                        write!(f, "Failed")
694                    }
695                    Some(activity_resolution::Status::Completed(_)) => {
696                        write!(f, "Completed")
697                    }
698                    Some(activity_resolution::Status::Cancelled(_)) => {
699                        write!(f, "Cancelled")
700                    }
701                    Some(activity_resolution::Status::Backoff(_)) => {
702                        write!(f, "Backoff")
703                    }
704                }
705            }
706        }
707
708        impl Display for QueryWorkflow {
709            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
710                write!(
711                    f,
712                    "QueryWorkflow(id: {}, type: {})",
713                    self.query_id, self.query_type
714                )
715            }
716        }
717
718        impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
719            fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
720                Self {
721                    signal_name: a.signal_name,
722                    input: Vec::from_payloads(a.input),
723                    identity: a.identity,
724                    headers: a.header.map(Into::into).unwrap_or_default(),
725                }
726            }
727        }
728
729        impl From<WorkflowExecutionCancelRequestedEventAttributes> for CancelWorkflow {
730            fn from(a: WorkflowExecutionCancelRequestedEventAttributes) -> Self {
731                Self { reason: a.cause }
732            }
733        }
734
735        /// Create a [InitializeWorkflow] job from corresponding event attributes
736        pub fn start_workflow_from_attribs(
737            attrs: WorkflowExecutionStartedEventAttributes,
738            workflow_id: String,
739            randomness_seed: u64,
740            start_time: Timestamp,
741        ) -> InitializeWorkflow {
742            InitializeWorkflow {
743                workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(),
744                workflow_id,
745                arguments: Vec::from_payloads(attrs.input),
746                randomness_seed,
747                headers: attrs.header.unwrap_or_default().fields,
748                identity: attrs.identity,
749                parent_workflow_info: attrs.parent_workflow_execution.map(|pe| {
750                    NamespacedWorkflowExecution {
751                        namespace: attrs.parent_workflow_namespace,
752                        run_id: pe.run_id,
753                        workflow_id: pe.workflow_id,
754                    }
755                }),
756                workflow_execution_timeout: attrs.workflow_execution_timeout,
757                workflow_run_timeout: attrs.workflow_run_timeout,
758                workflow_task_timeout: attrs.workflow_task_timeout,
759                continued_from_execution_run_id: attrs.continued_execution_run_id,
760                continued_initiator: attrs.initiator,
761                continued_failure: attrs.continued_failure,
762                last_completion_result: attrs.last_completion_result,
763                first_execution_run_id: attrs.first_execution_run_id,
764                retry_policy: attrs.retry_policy.map(fix_retry_policy),
765                attempt: attrs.attempt,
766                cron_schedule: attrs.cron_schedule,
767                workflow_execution_expiration_time: attrs.workflow_execution_expiration_time,
768                cron_schedule_to_schedule_interval: attrs.first_workflow_task_backoff,
769                memo: attrs.memo,
770                search_attributes: attrs.search_attributes,
771                start_time: Some(start_time),
772                root_workflow: attrs.root_workflow_execution,
773                priority: attrs.priority,
774            }
775        }
776    }
777
778    pub mod workflow_completion {
779        use crate::protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure};
780        tonic::include_proto!("coresdk.workflow_completion");
781
782        impl workflow_activation_completion::Status {
783            pub const fn is_success(&self) -> bool {
784                match &self {
785                    Self::Successful(_) => true,
786                    Self::Failed(_) => false,
787                }
788            }
789        }
790
791        impl From<failure::v1::Failure> for Failure {
792            fn from(f: failure::v1::Failure) -> Self {
793                Failure {
794                    failure: Some(f),
795                    force_cause: WorkflowTaskFailedCause::Unspecified as i32,
796                }
797            }
798        }
799    }
800
    /// Types generated from the `coresdk.child_workflow` protobuf package.
    pub mod child_workflow {
        tonic::include_proto!("coresdk.child_workflow");
    }
804
805    pub mod nexus {
806        use crate::protos::temporal::api::workflowservice::v1::PollNexusTaskQueueResponse;
807        use std::fmt::{Display, Formatter};
808
809        tonic::include_proto!("coresdk.nexus");
810
811        impl NexusTask {
812            /// Unwrap the inner server-delivered nexus task if that's what this is, else panic.
813            pub fn unwrap_task(self) -> PollNexusTaskQueueResponse {
814                if let Some(nexus_task::Variant::Task(t)) = self.variant {
815                    return t;
816                }
817                panic!("Nexus task did not contain a server task");
818            }
819
820            /// Get the task token
821            pub fn task_token(&self) -> &[u8] {
822                match &self.variant {
823                    Some(nexus_task::Variant::Task(t)) => t.task_token.as_slice(),
824                    Some(nexus_task::Variant::CancelTask(c)) => c.task_token.as_slice(),
825                    None => panic!("Nexus task did not contain a task token"),
826                }
827            }
828        }
829
830        impl Display for nexus_task_completion::Status {
831            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
832                write!(f, "NexusTaskCompletion(")?;
833                match self {
834                    nexus_task_completion::Status::Completed(c) => {
835                        write!(f, "{c}")
836                    }
837                    nexus_task_completion::Status::AckCancel(_) => {
838                        write!(f, "AckCancel")
839                    }
840                    #[allow(deprecated)]
841                    nexus_task_completion::Status::Error(error) => {
842                        write!(f, "Error({error:?})")
843                    }
844                    nexus_task_completion::Status::Failure(failure) => {
845                        write!(f, "{failure}")
846                    }
847                }?;
848                write!(f, ")")
849            }
850        }
851
852        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
853        pub enum NexusOperationErrorState {
854            Failed,
855            Canceled,
856        }
857
858        impl Display for NexusOperationErrorState {
859            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
860                match self {
861                    Self::Failed => write!(f, "failed"),
862                    Self::Canceled => write!(f, "canceled"),
863                }
864            }
865        }
866    }
867
868    pub mod workflow_commands {
869        tonic::include_proto!("coresdk.workflow_commands");
870
871        use crate::protos::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType};
872        use std::fmt::{Display, Formatter};
873
874        impl Display for WorkflowCommand {
875            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
876                match &self.variant {
877                    None => write!(f, "Empty"),
878                    Some(v) => write!(f, "{v}"),
879                }
880            }
881        }
882
883        impl Display for StartTimer {
884            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
885                write!(f, "StartTimer({})", self.seq)
886            }
887        }
888
889        impl Display for ScheduleActivity {
890            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
891                write!(f, "ScheduleActivity({}, {})", self.seq, self.activity_type)
892            }
893        }
894
895        impl Display for ScheduleLocalActivity {
896            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
897                write!(
898                    f,
899                    "ScheduleLocalActivity({}, {})",
900                    self.seq, self.activity_type
901                )
902            }
903        }
904
905        impl Display for QueryResult {
906            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
907                write!(f, "RespondToQuery({})", self.query_id)
908            }
909        }
910
911        impl Display for RequestCancelActivity {
912            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
913                write!(f, "RequestCancelActivity({})", self.seq)
914            }
915        }
916
917        impl Display for RequestCancelLocalActivity {
918            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
919                write!(f, "RequestCancelLocalActivity({})", self.seq)
920            }
921        }
922
923        impl Display for CancelTimer {
924            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
925                write!(f, "CancelTimer({})", self.seq)
926            }
927        }
928
929        impl Display for CompleteWorkflowExecution {
930            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
931                write!(f, "CompleteWorkflowExecution")
932            }
933        }
934
935        impl Display for FailWorkflowExecution {
936            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
937                write!(f, "FailWorkflowExecution")
938            }
939        }
940
941        impl Display for ContinueAsNewWorkflowExecution {
942            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
943                write!(f, "ContinueAsNewWorkflowExecution")
944            }
945        }
946
947        impl Display for CancelWorkflowExecution {
948            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
949                write!(f, "CancelWorkflowExecution")
950            }
951        }
952
953        impl Display for SetPatchMarker {
954            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
955                write!(f, "SetPatchMarker({})", self.patch_id)
956            }
957        }
958
959        impl Display for StartChildWorkflowExecution {
960            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
961                write!(
962                    f,
963                    "StartChildWorkflowExecution({}, {})",
964                    self.seq, self.workflow_type
965                )
966            }
967        }
968
969        impl Display for RequestCancelExternalWorkflowExecution {
970            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
971                write!(f, "RequestCancelExternalWorkflowExecution({})", self.seq)
972            }
973        }
974
975        impl Display for UpsertWorkflowSearchAttributes {
976            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
977                let keys: Vec<_> = self
978                    .search_attributes
979                    .as_ref()
980                    .map(|sa| sa.indexed_fields.keys().collect())
981                    .unwrap_or_default();
982                write!(f, "UpsertWorkflowSearchAttributes({:?})", keys)
983            }
984        }
985
986        impl Display for SignalExternalWorkflowExecution {
987            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
988                write!(f, "SignalExternalWorkflowExecution({})", self.seq)
989            }
990        }
991
992        impl Display for CancelSignalWorkflow {
993            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
994                write!(f, "CancelSignalWorkflow({})", self.seq)
995            }
996        }
997
998        impl Display for CancelChildWorkflowExecution {
999            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1000                write!(
1001                    f,
1002                    "CancelChildWorkflowExecution({})",
1003                    self.child_workflow_seq
1004                )
1005            }
1006        }
1007
1008        impl Display for ModifyWorkflowProperties {
1009            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1010                write!(
1011                    f,
1012                    "ModifyWorkflowProperties(upserted memo keys: {:?})",
1013                    self.upserted_memo.as_ref().map(|m| m.fields.keys())
1014                )
1015            }
1016        }
1017
1018        impl Display for UpdateResponse {
1019            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1020                write!(
1021                    f,
1022                    "UpdateResponse(protocol_instance_id: {}, response: {:?})",
1023                    self.protocol_instance_id, self.response
1024                )
1025            }
1026        }
1027
1028        impl Display for ScheduleNexusOperation {
1029            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1030                write!(f, "ScheduleNexusOperation({})", self.seq)
1031            }
1032        }
1033
1034        impl Display for RequestCancelNexusOperation {
1035            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1036                write!(f, "RequestCancelNexusOperation({})", self.seq)
1037            }
1038        }
1039
1040        impl QueryResult {
1041            /// Helper to construct the Temporal API query result types.
1042            pub fn into_components(self) -> (String, QueryResultType, Option<Payloads>, String) {
1043                match self {
1044                    QueryResult {
1045                        variant: Some(query_result::Variant::Succeeded(qs)),
1046                        query_id,
1047                    } => (
1048                        query_id,
1049                        QueryResultType::Answered,
1050                        qs.response.map(Into::into),
1051                        "".to_string(),
1052                    ),
1053                    QueryResult {
1054                        variant: Some(query_result::Variant::Failed(err)),
1055                        query_id,
1056                    } => (query_id, QueryResultType::Failed, None, err.message),
1057                    QueryResult {
1058                        variant: None,
1059                        query_id,
1060                    } => (
1061                        query_id,
1062                        QueryResultType::Failed,
1063                        None,
1064                        "Query response was empty".to_string(),
1065                    ),
1066                }
1067            }
1068        }
1069    }
1070
    /// Type alias for history event ids, which are `i64` values.
    pub type HistoryEventId = i64;
1072
1073    impl From<workflow_activation_job::Variant> for WorkflowActivationJob {
1074        fn from(a: workflow_activation_job::Variant) -> Self {
1075            Self { variant: Some(a) }
1076        }
1077    }
1078
1079    impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
1080        fn from(v: Vec<WorkflowCommand>) -> Self {
1081            Self {
1082                commands: v,
1083                used_internal_flags: vec![],
1084                versioning_behavior: VersioningBehavior::Unspecified.into(),
1085            }
1086        }
1087    }
1088
1089    impl From<workflow_command::Variant> for WorkflowCommand {
1090        fn from(v: workflow_command::Variant) -> Self {
1091            Self {
1092                variant: Some(v),
1093                user_metadata: None,
1094            }
1095        }
1096    }
1097
1098    impl workflow_completion::Success {
1099        pub fn from_variants(cmds: Vec<Variant>) -> Self {
1100            let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect();
1101            cmds.into()
1102        }
1103    }
1104
1105    impl WorkflowActivationCompletion {
1106        /// Create a successful activation with no commands in it
1107        pub fn empty(run_id: impl Into<String>) -> Self {
1108            let success = workflow_completion::Success::from_variants(vec![]);
1109            Self {
1110                run_id: run_id.into(),
1111                status: Some(workflow_activation_completion::Status::Successful(success)),
1112            }
1113        }
1114
1115        /// Create a successful activation from a list of command variants
1116        pub fn from_cmds(run_id: impl Into<String>, cmds: Vec<workflow_command::Variant>) -> Self {
1117            let success = workflow_completion::Success::from_variants(cmds);
1118            Self {
1119                run_id: run_id.into(),
1120                status: Some(workflow_activation_completion::Status::Successful(success)),
1121            }
1122        }
1123
1124        /// Create a successful activation from just one command variant
1125        pub fn from_cmd(run_id: impl Into<String>, cmd: workflow_command::Variant) -> Self {
1126            let success = workflow_completion::Success::from_variants(vec![cmd]);
1127            Self {
1128                run_id: run_id.into(),
1129                status: Some(workflow_activation_completion::Status::Successful(success)),
1130            }
1131        }
1132
1133        pub fn fail(
1134            run_id: impl Into<String>,
1135            failure: Failure,
1136            cause: Option<WorkflowTaskFailedCause>,
1137        ) -> Self {
1138            Self {
1139                run_id: run_id.into(),
1140                status: Some(workflow_activation_completion::Status::Failed(
1141                    workflow_completion::Failure {
1142                        failure: Some(failure),
1143                        force_cause: cause.unwrap_or(WorkflowTaskFailedCause::Unspecified) as i32,
1144                    },
1145                )),
1146            }
1147        }
1148
1149        /// Returns true if the activation has either a fail, continue, cancel, or complete workflow
1150        /// execution command in it.
1151        pub fn has_execution_ending(&self) -> bool {
1152            self.has_complete_workflow_execution()
1153                || self.has_fail_execution()
1154                || self.has_continue_as_new()
1155                || self.has_cancel_workflow_execution()
1156        }
1157
1158        /// Returns true if the activation contains a fail workflow execution command
1159        pub fn has_fail_execution(&self) -> bool {
1160            if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1161                return s.commands.iter().any(|wfc| {
1162                    matches!(
1163                        wfc,
1164                        WorkflowCommand {
1165                            variant: Some(workflow_command::Variant::FailWorkflowExecution(_)),
1166                            ..
1167                        }
1168                    )
1169                });
1170            }
1171            false
1172        }
1173
1174        /// Returns true if the activation contains a cancel workflow execution command
1175        pub fn has_cancel_workflow_execution(&self) -> bool {
1176            if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1177                return s.commands.iter().any(|wfc| {
1178                    matches!(
1179                        wfc,
1180                        WorkflowCommand {
1181                            variant: Some(workflow_command::Variant::CancelWorkflowExecution(_)),
1182                            ..
1183                        }
1184                    )
1185                });
1186            }
1187            false
1188        }
1189
1190        /// Returns true if the activation contains a continue as new workflow execution command
1191        pub fn has_continue_as_new(&self) -> bool {
1192            if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1193                return s.commands.iter().any(|wfc| {
1194                    matches!(
1195                        wfc,
1196                        WorkflowCommand {
1197                            variant: Some(
1198                                workflow_command::Variant::ContinueAsNewWorkflowExecution(_)
1199                            ),
1200                            ..
1201                        }
1202                    )
1203                });
1204            }
1205            false
1206        }
1207
1208        /// Returns true if the activation contains a complete workflow execution command
1209        pub fn has_complete_workflow_execution(&self) -> bool {
1210            self.complete_workflow_execution_value().is_some()
1211        }
1212
1213        /// Returns the completed execution result value, if any
1214        pub fn complete_workflow_execution_value(&self) -> Option<&Payload> {
1215            if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1216                s.commands.iter().find_map(|wfc| match wfc {
1217                    WorkflowCommand {
1218                        variant: Some(workflow_command::Variant::CompleteWorkflowExecution(v)),
1219                        ..
1220                    } => v.result.as_ref(),
1221                    _ => None,
1222                })
1223            } else {
1224                None
1225            }
1226        }
1227
1228        /// Returns true if the activation completion is a success with no commands
1229        pub fn is_empty(&self) -> bool {
1230            if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1231                return s.commands.is_empty();
1232            }
1233            false
1234        }
1235
1236        pub fn add_internal_flags(&mut self, patch: u32) {
1237            if let Some(workflow_activation_completion::Status::Successful(s)) = &mut self.status {
1238                s.used_internal_flags.push(patch);
1239            }
1240        }
1241    }
1242
1243    /// Makes converting outgoing lang commands into [WorkflowActivationCompletion]s easier
1244    pub trait IntoCompletion {
1245        /// The conversion function
1246        fn into_completion(self, run_id: String) -> WorkflowActivationCompletion;
1247    }
1248
1249    impl IntoCompletion for workflow_command::Variant {
1250        fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1251            WorkflowActivationCompletion::from_cmd(run_id, self)
1252        }
1253    }
1254
1255    impl<I, V> IntoCompletion for I
1256    where
1257        I: IntoIterator<Item = V>,
1258        V: Into<WorkflowCommand>,
1259    {
1260        fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1261            let success = self.into_iter().map(Into::into).collect::<Vec<_>>().into();
1262            WorkflowActivationCompletion {
1263                run_id,
1264                status: Some(workflow_activation_completion::Status::Successful(success)),
1265            }
1266        }
1267    }
1268
1269    impl Display for WorkflowActivationCompletion {
1270        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1271            write!(
1272                f,
1273                "WorkflowActivationCompletion(run_id: {}, status: ",
1274                &self.run_id
1275            )?;
1276            match &self.status {
1277                None => write!(f, "empty")?,
1278                Some(s) => write!(f, "{s}")?,
1279            };
1280            write!(f, ")")
1281        }
1282    }
1283
1284    impl Display for workflow_activation_completion::Status {
1285        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1286            match self {
1287                workflow_activation_completion::Status::Successful(
1288                    workflow_completion::Success { commands, .. },
1289                ) => {
1290                    write!(f, "Success(")?;
1291                    let mut written = 0;
1292                    for c in commands {
1293                        write!(f, "{c} ")?;
1294                        written += 1;
1295                        if written >= 10 && written < commands.len() {
1296                            write!(f, "... {} more", commands.len() - written)?;
1297                            break;
1298                        }
1299                    }
1300                    write!(f, ")")
1301                }
1302                workflow_activation_completion::Status::Failed(_) => {
1303                    write!(f, "Failed")
1304                }
1305            }
1306        }
1307    }
1308
1309    impl ActivityTask {
1310        pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse) -> Self {
1311            let (workflow_id, run_id) = r
1312                .workflow_execution
1313                .map(|we| (we.workflow_id, we.run_id))
1314                .unwrap_or_default();
1315            Self {
1316                task_token: r.task_token,
1317                variant: Some(activity_task::activity_task::Variant::Start(
1318                    activity_task::Start {
1319                        workflow_namespace: r.workflow_namespace,
1320                        workflow_type: r.workflow_type.map_or_else(|| "".to_string(), |wt| wt.name),
1321                        workflow_execution: Some(WorkflowExecution {
1322                            workflow_id,
1323                            run_id,
1324                        }),
1325                        activity_id: r.activity_id,
1326                        activity_type: r.activity_type.map_or_else(|| "".to_string(), |at| at.name),
1327                        header_fields: r.header.map(Into::into).unwrap_or_default(),
1328                        input: Vec::from_payloads(r.input),
1329                        heartbeat_details: Vec::from_payloads(r.heartbeat_details),
1330                        scheduled_time: r.scheduled_time,
1331                        current_attempt_scheduled_time: r.current_attempt_scheduled_time,
1332                        started_time: r.started_time,
1333                        attempt: r.attempt as u32,
1334                        schedule_to_close_timeout: r.schedule_to_close_timeout,
1335                        start_to_close_timeout: r.start_to_close_timeout,
1336                        heartbeat_timeout: r.heartbeat_timeout,
1337                        retry_policy: r.retry_policy.map(fix_retry_policy),
1338                        priority: r.priority,
1339                        is_local: false,
1340                    },
1341                )),
1342            }
1343        }
1344    }
1345
1346    impl Failure {
1347        pub fn is_timeout(&self) -> Option<crate::protos::temporal::api::enums::v1::TimeoutType> {
1348            match &self.failure_info {
1349                Some(FailureInfo::TimeoutFailureInfo(ti)) => Some(ti.timeout_type()),
1350                _ => {
1351                    if let Some(c) = &self.cause {
1352                        c.is_timeout()
1353                    } else {
1354                        None
1355                    }
1356                }
1357            }
1358        }
1359
1360        pub fn application_failure(message: String, non_retryable: bool) -> Self {
1361            Self {
1362                message,
1363                failure_info: Some(FailureInfo::ApplicationFailureInfo(
1364                    ApplicationFailureInfo {
1365                        non_retryable,
1366                        ..Default::default()
1367                    },
1368                )),
1369                ..Default::default()
1370            }
1371        }
1372
1373        pub fn application_failure_from_error(ae: anyhow::Error, non_retryable: bool) -> Self {
1374            Self {
1375                failure_info: Some(FailureInfo::ApplicationFailureInfo(
1376                    ApplicationFailureInfo {
1377                        non_retryable,
1378                        ..Default::default()
1379                    },
1380                )),
1381                ..ae.chain()
1382                    .rfold(None, |cause, e| {
1383                        Some(Self {
1384                            message: e.to_string(),
1385                            cause: cause.map(Box::new),
1386                            ..Default::default()
1387                        })
1388                    })
1389                    .unwrap_or_default()
1390            }
1391        }
1392
1393        pub fn timeout(timeout_type: TimeoutType) -> Self {
1394            Self {
1395                message: "Activity timed out".to_string(),
1396                cause: Some(Box::new(Failure {
1397                    message: "Activity timed out".to_string(),
1398                    failure_info: Some(FailureInfo::TimeoutFailureInfo(TimeoutFailureInfo {
1399                        timeout_type: timeout_type.into(),
1400                        ..Default::default()
1401                    })),
1402                    ..Default::default()
1403                })),
1404                failure_info: Some(FailureInfo::ActivityFailureInfo(
1405                    ActivityFailureInfo::default(),
1406                )),
1407                ..Default::default()
1408            }
1409        }
1410
1411        /// Extracts an ApplicationFailureInfo from a Failure instance if it exists
1412        pub fn maybe_application_failure(&self) -> Option<&ApplicationFailureInfo> {
1413            if let Failure {
1414                failure_info: Some(FailureInfo::ApplicationFailureInfo(f)),
1415                ..
1416            } = self
1417            {
1418                Some(f)
1419            } else {
1420                None
1421            }
1422        }
1423
1424        // Checks if a failure is an ApplicationFailure with Benign category.
1425        pub fn is_benign_application_failure(&self) -> bool {
1426            self.maybe_application_failure()
1427                .is_some_and(|app_info| app_info.category() == ApplicationErrorCategory::Benign)
1428        }
1429    }
1430
1431    impl Display for Failure {
1432        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1433            write!(f, "Failure({}, ", self.message)?;
1434            match self.failure_info.as_ref() {
1435                None => write!(f, "missing info")?,
1436                Some(FailureInfo::TimeoutFailureInfo(v)) => {
1437                    write!(f, "Timeout: {:?}", v.timeout_type())?;
1438                }
1439                Some(FailureInfo::ApplicationFailureInfo(v)) => {
1440                    write!(f, "Application Failure: {}", v.r#type)?;
1441                }
1442                Some(FailureInfo::CanceledFailureInfo(_)) => {
1443                    write!(f, "Cancelled")?;
1444                }
1445                Some(FailureInfo::TerminatedFailureInfo(_)) => {
1446                    write!(f, "Terminated")?;
1447                }
1448                Some(FailureInfo::ServerFailureInfo(_)) => {
1449                    write!(f, "Server Failure")?;
1450                }
1451                Some(FailureInfo::ResetWorkflowFailureInfo(_)) => {
1452                    write!(f, "Reset Workflow")?;
1453                }
1454                Some(FailureInfo::ActivityFailureInfo(v)) => {
1455                    write!(
1456                        f,
1457                        "Activity Failure: scheduled_event_id: {}",
1458                        v.scheduled_event_id
1459                    )?;
1460                }
1461                Some(FailureInfo::ChildWorkflowExecutionFailureInfo(v)) => {
1462                    write!(
1463                        f,
1464                        "Child Workflow: started_event_id: {}",
1465                        v.started_event_id
1466                    )?;
1467                }
1468                Some(FailureInfo::NexusOperationExecutionFailureInfo(v)) => {
1469                    write!(
1470                        f,
1471                        "Nexus Operation Failure: scheduled_event_id: {}",
1472                        v.scheduled_event_id
1473                    )?;
1474                }
1475                Some(FailureInfo::NexusHandlerFailureInfo(v)) => {
1476                    write!(f, "Nexus Handler Failure: {}", v.r#type)?;
1477                }
1478            }
1479            write!(f, ")")
1480        }
1481    }
1482
1483    impl From<&str> for Failure {
1484        fn from(v: &str) -> Self {
1485            Failure::application_failure(v.to_string(), false)
1486        }
1487    }
1488
1489    impl From<String> for Failure {
1490        fn from(v: String) -> Self {
1491            Failure::application_failure(v, false)
1492        }
1493    }
1494
1495    impl From<anyhow::Error> for Failure {
1496        fn from(ae: anyhow::Error) -> Self {
1497            Failure::application_failure_from_error(ae, false)
1498        }
1499    }
1500
1501    pub trait FromPayloadsExt {
1502        fn from_payloads(p: Option<Payloads>) -> Self;
1503    }
1504    impl<T> FromPayloadsExt for T
1505    where
1506        T: FromIterator<Payload>,
1507    {
1508        fn from_payloads(p: Option<Payloads>) -> Self {
1509            match p {
1510                None => std::iter::empty().collect(),
1511                Some(p) => p.payloads.into_iter().collect(),
1512            }
1513        }
1514    }
1515
1516    pub trait IntoPayloadsExt {
1517        fn into_payloads(self) -> Option<Payloads>;
1518    }
1519    impl<T> IntoPayloadsExt for T
1520    where
1521        T: IntoIterator<Item = Payload>,
1522    {
1523        fn into_payloads(self) -> Option<Payloads> {
1524            let mut iterd = self.into_iter().peekable();
1525            if iterd.peek().is_none() {
1526                None
1527            } else {
1528                Some(Payloads {
1529                    payloads: iterd.collect(),
1530                })
1531            }
1532        }
1533    }
1534
1535    impl From<Payload> for Payloads {
1536        fn from(p: Payload) -> Self {
1537            Self { payloads: vec![p] }
1538        }
1539    }
1540
1541    impl<T> From<T> for Payloads
1542    where
1543        T: AsRef<[u8]>,
1544    {
1545        fn from(v: T) -> Self {
1546            Self {
1547                payloads: vec![v.into()],
1548            }
1549        }
1550    }
1551
    /// Errors that can be returned when deserializing a payload.
    #[derive(thiserror::Error, Debug)]
    pub enum PayloadDeserializeErr {
        /// This deserializer does not handle this type of payload. Allows composing multiple
        /// deserializers.
        #[error("This deserializer does not understand this payload")]
        DeserializerDoesNotHandle,
        /// Deserialization was attempted but failed; wraps the underlying error.
        #[error("Error during deserialization: {0}")]
        DeserializeErr(#[from] anyhow::Error),
    }
1561
1562    // TODO: Once the prototype SDK is un-prototyped this serialization will need to be compat with
1563    //   other SDKs (given they might execute an activity).
1564    pub trait AsJsonPayloadExt {
1565        fn as_json_payload(&self) -> anyhow::Result<Payload>;
1566    }
1567    impl<T> AsJsonPayloadExt for T
1568    where
1569        T: Serialize,
1570    {
1571        fn as_json_payload(&self) -> anyhow::Result<Payload> {
1572            let as_json = serde_json::to_string(self)?;
1573            let mut metadata = HashMap::new();
1574            metadata.insert(
1575                ENCODING_PAYLOAD_KEY.to_string(),
1576                JSON_ENCODING_VAL.as_bytes().to_vec(),
1577            );
1578            Ok(Payload {
1579                metadata,
1580                data: as_json.into_bytes(),
1581                external_payloads: Default::default(),
1582            })
1583        }
1584    }
1585
1586    pub trait FromJsonPayloadExt: Sized {
1587        fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr>;
1588    }
1589    impl<T> FromJsonPayloadExt for T
1590    where
1591        T: for<'de> Deserialize<'de>,
1592    {
1593        fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr> {
1594            if !payload.is_json_payload() {
1595                return Err(PayloadDeserializeErr::DeserializerDoesNotHandle);
1596            }
1597            let payload_str = std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?;
1598            Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?)
1599        }
1600    }
1601
    /// Errors when converting from a [Payloads] api proto to our internal [Payload]
    #[derive(derive_more::Display, Debug)]
    pub enum PayloadsToPayloadError {
        /// The [Payloads] held more than one payload, so no single one could be chosen.
        MoreThanOnePayload,
        /// The [Payloads] held no payload at all.
        NoPayload,
    }
1608    impl TryFrom<Payloads> for Payload {
1609        type Error = PayloadsToPayloadError;
1610
1611        fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
1612            match v.payloads.pop() {
1613                None => Err(PayloadsToPayloadError::NoPayload),
1614                Some(p) => {
1615                    if v.payloads.is_empty() {
1616                        Ok(p)
1617                    } else {
1618                        Err(PayloadsToPayloadError::MoreThanOnePayload)
1619                    }
1620                }
1621            }
1622        }
1623    }
1624
1625    /// If initial_interval is missing, fills it with zero value to prevent crashes
1626    /// (lang assumes that RetryPolicy always has initial_interval set).
1627    fn fix_retry_policy(mut retry_policy: RetryPolicy) -> RetryPolicy {
1628        if retry_policy.initial_interval.is_none() {
1629            retry_policy.initial_interval = Default::default();
1630        }
1631        retry_policy
1632    }
1633}
1634
1635// No need to lint these
1636#[allow(
1637    clippy::all,
1638    missing_docs,
1639    rustdoc::broken_intra_doc_links,
1640    rustdoc::bare_urls
1641)]
1642// This is disgusting, but unclear to me how to avoid it. TODO: Discuss w/ prost maintainer
1643pub mod temporal {
1644    pub mod api {
        /// Types generated from the `temporal.api.activity.v1` protobuf package.
        pub mod activity {
            pub mod v1 {
                tonic::include_proto!("temporal.api.activity.v1");
            }
        }
        /// Types generated from the `temporal.api.batch.v1` protobuf package.
        pub mod batch {
            pub mod v1 {
                tonic::include_proto!("temporal.api.batch.v1");
            }
        }
1655        pub mod command {
1656            pub mod v1 {
1657                tonic::include_proto!("temporal.api.command.v1");
1658
1659                use crate::protos::{
1660                    coresdk::{IntoPayloadsExt, workflow_commands},
1661                    temporal::api::{
1662                        common::v1::{ActivityType, WorkflowType},
1663                        enums::v1::CommandType,
1664                    },
1665                };
1666                use command::Attributes;
1667                use std::fmt::{Display, Formatter};
1668
1669                impl From<command::Attributes> for Command {
1670                    fn from(c: command::Attributes) -> Self {
1671                        match c {
1672                            a @ Attributes::StartTimerCommandAttributes(_) => Self {
1673                                command_type: CommandType::StartTimer as i32,
1674                                attributes: Some(a),
1675                                user_metadata: Default::default(),
1676                            },
1677                            a @ Attributes::CancelTimerCommandAttributes(_) => Self {
1678                                command_type: CommandType::CancelTimer as i32,
1679                                attributes: Some(a),
1680                                user_metadata: Default::default(),
1681                            },
1682                            a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
1683                                command_type: CommandType::CompleteWorkflowExecution as i32,
1684                                attributes: Some(a),
1685                                user_metadata: Default::default(),
1686                            },
1687                            a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
1688                                command_type: CommandType::FailWorkflowExecution as i32,
1689                                attributes: Some(a),
1690                                user_metadata: Default::default(),
1691                            },
1692                            a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
1693                                command_type: CommandType::ScheduleActivityTask as i32,
1694                                attributes: Some(a),
1695                                user_metadata: Default::default(),
1696                            },
1697                            a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self {
1698                                command_type: CommandType::RequestCancelActivityTask as i32,
1699                                attributes: Some(a),
1700                                user_metadata: Default::default(),
1701                            },
1702                            a @ Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1703                                Self {
1704                                    command_type: CommandType::ContinueAsNewWorkflowExecution
1705                                        as i32,
1706                                    attributes: Some(a),
1707                                    user_metadata: Default::default(),
1708                                }
1709                            }
1710                            a @ Attributes::CancelWorkflowExecutionCommandAttributes(_) => Self {
1711                                command_type: CommandType::CancelWorkflowExecution as i32,
1712                                attributes: Some(a),
1713                                user_metadata: Default::default(),
1714                            },
1715                            a @ Attributes::RecordMarkerCommandAttributes(_) => Self {
1716                                command_type: CommandType::RecordMarker as i32,
1717                                attributes: Some(a),
1718                                user_metadata: Default::default(),
1719                            },
1720                            a @ Attributes::ProtocolMessageCommandAttributes(_) => Self {
1721                                command_type: CommandType::ProtocolMessage as i32,
1722                                attributes: Some(a),
1723                                user_metadata: Default::default(),
1724                            },
1725                            a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1726                                Self {
1727                                    command_type: CommandType::RequestCancelNexusOperation as i32,
1728                                    attributes: Some(a),
1729                                    user_metadata: Default::default(),
1730                                }
1731                            }
1732                            _ => unimplemented!(),
1733                        }
1734                    }
1735                }
1736
1737                impl Display for Command {
1738                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1739                        let ct = CommandType::try_from(self.command_type)
1740                            .unwrap_or(CommandType::Unspecified);
1741                        write!(f, "{:?}", ct)
1742                    }
1743                }
1744
1745                pub trait CommandAttributesExt {
1746                    fn as_type(&self) -> CommandType;
1747                }
1748
1749                impl CommandAttributesExt for command::Attributes {
1750                    fn as_type(&self) -> CommandType {
1751                        match self {
1752                            Attributes::ScheduleActivityTaskCommandAttributes(_) => {
1753                                CommandType::ScheduleActivityTask
1754                            }
1755                            Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer,
1756                            Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
1757                                CommandType::CompleteWorkflowExecution
1758                            }
1759                            Attributes::FailWorkflowExecutionCommandAttributes(_) => {
1760                                CommandType::FailWorkflowExecution
1761                            }
1762                            Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
1763                                CommandType::RequestCancelActivityTask
1764                            }
1765                            Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer,
1766                            Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
1767                                CommandType::CancelWorkflowExecution
1768                            }
1769                            Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes(
1770                                _,
1771                            ) => CommandType::RequestCancelExternalWorkflowExecution,
1772                            Attributes::RecordMarkerCommandAttributes(_) => {
1773                                CommandType::RecordMarker
1774                            }
1775                            Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1776                                CommandType::ContinueAsNewWorkflowExecution
1777                            }
1778                            Attributes::StartChildWorkflowExecutionCommandAttributes(_) => {
1779                                CommandType::StartChildWorkflowExecution
1780                            }
1781                            Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => {
1782                                CommandType::SignalExternalWorkflowExecution
1783                            }
1784                            Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => {
1785                                CommandType::UpsertWorkflowSearchAttributes
1786                            }
1787                            Attributes::ProtocolMessageCommandAttributes(_) => {
1788                                CommandType::ProtocolMessage
1789                            }
1790                            Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => {
1791                                CommandType::ModifyWorkflowProperties
1792                            }
1793                            Attributes::ScheduleNexusOperationCommandAttributes(_) => {
1794                                CommandType::ScheduleNexusOperation
1795                            }
1796                            Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1797                                CommandType::RequestCancelNexusOperation
1798                            }
1799                        }
1800                    }
1801                }
1802
1803                impl Display for command::Attributes {
1804                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1805                        write!(f, "{:?}", self.as_type())
1806                    }
1807                }
1808
1809                impl From<workflow_commands::StartTimer> for command::Attributes {
1810                    fn from(s: workflow_commands::StartTimer) -> Self {
1811                        Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
1812                            timer_id: s.seq.to_string(),
1813                            start_to_fire_timeout: s.start_to_fire_timeout,
1814                        })
1815                    }
1816                }
1817
1818                impl From<workflow_commands::UpsertWorkflowSearchAttributes> for command::Attributes {
1819                    fn from(s: workflow_commands::UpsertWorkflowSearchAttributes) -> Self {
1820                        Self::UpsertWorkflowSearchAttributesCommandAttributes(
1821                            UpsertWorkflowSearchAttributesCommandAttributes {
1822                                search_attributes: s.search_attributes,
1823                            },
1824                        )
1825                    }
1826                }
1827
1828                impl From<workflow_commands::ModifyWorkflowProperties> for command::Attributes {
1829                    fn from(s: workflow_commands::ModifyWorkflowProperties) -> Self {
1830                        Self::ModifyWorkflowPropertiesCommandAttributes(
1831                            ModifyWorkflowPropertiesCommandAttributes {
1832                                upserted_memo: s.upserted_memo.map(Into::into),
1833                            },
1834                        )
1835                    }
1836                }
1837
1838                impl From<workflow_commands::CancelTimer> for command::Attributes {
1839                    fn from(s: workflow_commands::CancelTimer) -> Self {
1840                        Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
1841                            timer_id: s.seq.to_string(),
1842                        })
1843                    }
1844                }
1845
1846                pub fn schedule_activity_cmd_to_api(
1847                    s: workflow_commands::ScheduleActivity,
1848                    use_workflow_build_id: bool,
1849                ) -> command::Attributes {
1850                    command::Attributes::ScheduleActivityTaskCommandAttributes(
1851                        ScheduleActivityTaskCommandAttributes {
1852                            activity_id: s.activity_id,
1853                            activity_type: Some(ActivityType {
1854                                name: s.activity_type,
1855                            }),
1856                            task_queue: Some(s.task_queue.into()),
1857                            header: Some(s.headers.into()),
1858                            input: s.arguments.into_payloads(),
1859                            schedule_to_close_timeout: s.schedule_to_close_timeout,
1860                            schedule_to_start_timeout: s.schedule_to_start_timeout,
1861                            start_to_close_timeout: s.start_to_close_timeout,
1862                            heartbeat_timeout: s.heartbeat_timeout,
1863                            retry_policy: s.retry_policy.map(Into::into),
1864                            request_eager_execution: !s.do_not_eagerly_execute,
1865                            use_workflow_build_id,
1866                            priority: s.priority,
1867                        },
1868                    )
1869                }
1870
1871                #[allow(deprecated)]
1872                pub fn start_child_workflow_cmd_to_api(
1873                    s: workflow_commands::StartChildWorkflowExecution,
1874                    inherit_build_id: bool,
1875                ) -> command::Attributes {
1876                    command::Attributes::StartChildWorkflowExecutionCommandAttributes(
1877                        StartChildWorkflowExecutionCommandAttributes {
1878                            workflow_id: s.workflow_id,
1879                            workflow_type: Some(WorkflowType {
1880                                name: s.workflow_type,
1881                            }),
1882                            control: "".into(),
1883                            namespace: s.namespace,
1884                            task_queue: Some(s.task_queue.into()),
1885                            header: Some(s.headers.into()),
1886                            memo: Some(s.memo.into()),
1887                            search_attributes: s.search_attributes,
1888                            input: s.input.into_payloads(),
1889                            workflow_id_reuse_policy: s.workflow_id_reuse_policy,
1890                            workflow_execution_timeout: s.workflow_execution_timeout,
1891                            workflow_run_timeout: s.workflow_run_timeout,
1892                            workflow_task_timeout: s.workflow_task_timeout,
1893                            retry_policy: s.retry_policy.map(Into::into),
1894                            cron_schedule: s.cron_schedule.clone(),
1895                            parent_close_policy: s.parent_close_policy,
1896                            inherit_build_id,
1897                            priority: s.priority,
1898                        },
1899                    )
1900                }
1901
1902                impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
1903                    fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
1904                        Self::CompleteWorkflowExecutionCommandAttributes(
1905                            CompleteWorkflowExecutionCommandAttributes {
1906                                result: c.result.map(Into::into),
1907                            },
1908                        )
1909                    }
1910                }
1911
1912                impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
1913                    fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
1914                        Self::FailWorkflowExecutionCommandAttributes(
1915                            FailWorkflowExecutionCommandAttributes {
1916                                failure: c.failure.map(Into::into),
1917                            },
1918                        )
1919                    }
1920                }
1921
1922                #[allow(deprecated)]
1923                pub fn continue_as_new_cmd_to_api(
1924                    c: workflow_commands::ContinueAsNewWorkflowExecution,
1925                    inherit_build_id: bool,
1926                ) -> command::Attributes {
1927                    command::Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1928                        ContinueAsNewWorkflowExecutionCommandAttributes {
1929                            workflow_type: Some(c.workflow_type.into()),
1930                            task_queue: Some(c.task_queue.into()),
1931                            input: c.arguments.into_payloads(),
1932                            workflow_run_timeout: c.workflow_run_timeout,
1933                            workflow_task_timeout: c.workflow_task_timeout,
1934                            memo: if c.memo.is_empty() {
1935                                None
1936                            } else {
1937                                Some(c.memo.into())
1938                            },
1939                            header: if c.headers.is_empty() {
1940                                None
1941                            } else {
1942                                Some(c.headers.into())
1943                            },
1944                            retry_policy: c.retry_policy,
1945                            search_attributes: c.search_attributes,
1946                            inherit_build_id,
1947                            initial_versioning_behavior: c.initial_versioning_behavior,
1948                            ..Default::default()
1949                        },
1950                    )
1951                }
1952
1953                impl From<workflow_commands::CancelWorkflowExecution> for command::Attributes {
1954                    fn from(_c: workflow_commands::CancelWorkflowExecution) -> Self {
1955                        Self::CancelWorkflowExecutionCommandAttributes(
1956                            CancelWorkflowExecutionCommandAttributes { details: None },
1957                        )
1958                    }
1959                }
1960
1961                impl From<workflow_commands::ScheduleNexusOperation> for command::Attributes {
1962                    fn from(c: workflow_commands::ScheduleNexusOperation) -> Self {
1963                        Self::ScheduleNexusOperationCommandAttributes(
1964                            ScheduleNexusOperationCommandAttributes {
1965                                endpoint: c.endpoint,
1966                                service: c.service,
1967                                operation: c.operation,
1968                                input: c.input,
1969                                schedule_to_close_timeout: c.schedule_to_close_timeout,
1970                                schedule_to_start_timeout: c.schedule_to_start_timeout,
1971                                start_to_close_timeout: c.start_to_close_timeout,
1972                                nexus_header: c.nexus_header,
1973                            },
1974                        )
1975                    }
1976                }
1977            }
1978        }
        /// Types generated from the `temporal.api.cloud.*` protobuf packages, one submodule per
        /// package.
        #[allow(rustdoc::invalid_html_tags)]
        pub mod cloud {
            /// Generated from `temporal.api.cloud.account.v1`.
            pub mod account {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.account.v1");
                }
            }
            /// Generated from `temporal.api.cloud.cloudservice.v1`.
            pub mod cloudservice {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.cloudservice.v1");
                }
            }
            /// Generated from `temporal.api.cloud.connectivityrule.v1`.
            pub mod connectivityrule {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.connectivityrule.v1");
                }
            }
            /// Generated from `temporal.api.cloud.identity.v1`.
            pub mod identity {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.identity.v1");
                }
            }
            /// Generated from `temporal.api.cloud.namespace.v1`.
            pub mod namespace {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.namespace.v1");
                }
            }
            /// Generated from `temporal.api.cloud.nexus.v1`.
            pub mod nexus {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.nexus.v1");
                }
            }
            /// Generated from `temporal.api.cloud.operation.v1`.
            pub mod operation {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.operation.v1");
                }
            }
            /// Generated from `temporal.api.cloud.region.v1`.
            pub mod region {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.region.v1");
                }
            }
            /// Generated from `temporal.api.cloud.resource.v1`.
            pub mod resource {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.resource.v1");
                }
            }
            /// Generated from `temporal.api.cloud.sink.v1`.
            pub mod sink {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.sink.v1");
                }
            }
            /// Generated from `temporal.api.cloud.usage.v1`.
            pub mod usage {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.usage.v1");
                }
            }
        }
2037        pub mod common {
2038            pub mod v1 {
2039                use crate::protos::{ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL};
2040                use base64::{Engine, prelude::BASE64_STANDARD};
2041                use std::{
2042                    collections::HashMap,
2043                    fmt::{Display, Formatter},
2044                };
2045                include_proto_with_serde!("temporal.api.common.v1");
2046
2047                impl<T> From<T> for Payload
2048                where
2049                    T: AsRef<[u8]>,
2050                {
2051                    fn from(v: T) -> Self {
2052                        // TODO: Set better encodings, whole data converter deal. Setting anything
2053                        //  for now at least makes it show up in the web UI.
2054                        let mut metadata = HashMap::new();
2055                        metadata.insert(ENCODING_PAYLOAD_KEY.to_string(), b"binary/plain".to_vec());
2056                        Self {
2057                            metadata,
2058                            data: v.as_ref().to_vec(),
2059                            external_payloads: Default::default(),
2060                        }
2061                    }
2062                }
2063
2064                impl Payload {
2065                    // Is its own function b/c asref causes implementation conflicts
2066                    pub fn as_slice(&self) -> &[u8] {
2067                        self.data.as_slice()
2068                    }
2069
2070                    pub fn is_json_payload(&self) -> bool {
2071                        self.metadata
2072                            .get(ENCODING_PAYLOAD_KEY)
2073                            .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes())
2074                            .unwrap_or_default()
2075                    }
2076                }
2077
2078                impl std::fmt::Debug for Payload {
2079                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2080                        if std::env::var("TEMPORAL_PRINT_FULL_PAYLOADS").is_err()
2081                            && self.data.len() > 64
2082                        {
2083                            let mut windows = self.data.as_slice().windows(32);
2084                            write!(
2085                                f,
2086                                "[{}..{}]",
2087                                BASE64_STANDARD.encode(windows.next().unwrap_or_default()),
2088                                BASE64_STANDARD.encode(windows.next_back().unwrap_or_default())
2089                            )
2090                        } else {
2091                            write!(f, "[{}]", BASE64_STANDARD.encode(&self.data))
2092                        }
2093                    }
2094                }
2095
2096                impl Display for Payload {
2097                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2098                        write!(f, "{:?}", self)
2099                    }
2100                }
2101
2102                impl Display for Header {
2103                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2104                        write!(f, "Header(")?;
2105                        for kv in &self.fields {
2106                            write!(f, "{}: ", kv.0)?;
2107                            write!(f, "{}, ", kv.1)?;
2108                        }
2109                        write!(f, ")")
2110                    }
2111                }
2112
2113                impl From<Header> for HashMap<String, Payload> {
2114                    fn from(h: Header) -> Self {
2115                        h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2116                    }
2117                }
2118
2119                impl From<Memo> for HashMap<String, Payload> {
2120                    fn from(h: Memo) -> Self {
2121                        h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2122                    }
2123                }
2124
2125                impl From<SearchAttributes> for HashMap<String, Payload> {
2126                    fn from(h: SearchAttributes) -> Self {
2127                        h.indexed_fields
2128                            .into_iter()
2129                            .map(|(k, v)| (k, v.into()))
2130                            .collect()
2131                    }
2132                }
2133
2134                impl From<HashMap<String, Payload>> for SearchAttributes {
2135                    fn from(h: HashMap<String, Payload>) -> Self {
2136                        Self {
2137                            indexed_fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
2138                        }
2139                    }
2140                }
2141
2142                impl From<String> for ActivityType {
2143                    fn from(name: String) -> Self {
2144                        Self { name }
2145                    }
2146                }
2147
2148                impl From<&str> for ActivityType {
2149                    fn from(name: &str) -> Self {
2150                        Self {
2151                            name: name.to_string(),
2152                        }
2153                    }
2154                }
2155
2156                impl From<ActivityType> for String {
2157                    fn from(at: ActivityType) -> Self {
2158                        at.name
2159                    }
2160                }
2161
2162                impl From<&str> for WorkflowType {
2163                    fn from(v: &str) -> Self {
2164                        Self {
2165                            name: v.to_string(),
2166                        }
2167                    }
2168                }
2169            }
2170        }
        /// Types generated from the `temporal.api.deployment.v1` protobuf package.
        pub mod deployment {
            pub mod v1 {
                tonic::include_proto!("temporal.api.deployment.v1");
            }
        }
        /// Types generated from the `temporal.api.enums.v1` protobuf package, together with the
        /// generated `.serde.rs` code that `include_proto_with_serde!` pulls in for it.
        pub mod enums {
            pub mod v1 {
                include_proto_with_serde!("temporal.api.enums.v1");
            }
        }
        /// Types generated from the `temporal.api.errordetails.v1` protobuf package.
        pub mod errordetails {
            pub mod v1 {
                tonic::include_proto!("temporal.api.errordetails.v1");
            }
        }
        /// Types generated from the `temporal.api.failure.v1` protobuf package, together with
        /// the generated `.serde.rs` code that `include_proto_with_serde!` pulls in for it.
        pub mod failure {
            pub mod v1 {
                include_proto_with_serde!("temporal.api.failure.v1");
            }
        }
        /// Types generated from the `temporal.api.filter.v1` protobuf package.
        pub mod filter {
            pub mod v1 {
                tonic::include_proto!("temporal.api.filter.v1");
            }
        }
2196        pub mod history {
2197            pub mod v1 {
2198                use crate::protos::temporal::api::{
2199                    enums::v1::EventType, history::v1::history_event::Attributes,
2200                };
2201                use anyhow::bail;
2202                use std::fmt::{Display, Formatter};
2203
2204                tonic::include_proto!("temporal.api.history.v1");
2205
2206                impl History {
2207                    pub fn extract_run_id_from_start(&self) -> Result<&str, anyhow::Error> {
2208                        extract_original_run_id_from_events(&self.events)
2209                    }
2210
2211                    /// Returns the event id of the final event in the history. Will return 0 if
2212                    /// there are no events.
2213                    pub fn last_event_id(&self) -> i64 {
2214                        self.events.last().map(|e| e.event_id).unwrap_or_default()
2215                    }
2216                }
2217
2218                pub fn extract_original_run_id_from_events(
2219                    events: &[HistoryEvent],
2220                ) -> Result<&str, anyhow::Error> {
2221                    if let Some(Attributes::WorkflowExecutionStartedEventAttributes(wes)) =
2222                        events.get(0).and_then(|x| x.attributes.as_ref())
2223                    {
2224                        Ok(&wes.original_execution_run_id)
2225                    } else {
2226                        bail!("First event is not WorkflowExecutionStarted?!?")
2227                    }
2228                }
2229
2230                impl HistoryEvent {
2231                    /// Returns true if this is an event created to mirror a command
2232                    pub fn is_command_event(&self) -> bool {
2233                        EventType::try_from(self.event_type).map_or(false, |et| match et {
2234                            EventType::ActivityTaskScheduled
2235                            | EventType::ActivityTaskCancelRequested
2236                            | EventType::MarkerRecorded
2237                            | EventType::RequestCancelExternalWorkflowExecutionInitiated
2238                            | EventType::SignalExternalWorkflowExecutionInitiated
2239                            | EventType::StartChildWorkflowExecutionInitiated
2240                            | EventType::TimerCanceled
2241                            | EventType::TimerStarted
2242                            | EventType::UpsertWorkflowSearchAttributes
2243                            | EventType::WorkflowPropertiesModified
2244                            | EventType::NexusOperationScheduled
2245                            | EventType::NexusOperationCancelRequested
2246                            | EventType::WorkflowExecutionCanceled
2247                            | EventType::WorkflowExecutionCompleted
2248                            | EventType::WorkflowExecutionContinuedAsNew
2249                            | EventType::WorkflowExecutionFailed
2250                            | EventType::WorkflowExecutionUpdateAccepted
2251                            | EventType::WorkflowExecutionUpdateRejected
2252                            | EventType::WorkflowExecutionUpdateCompleted => true,
2253                            _ => false,
2254                        })
2255                    }
2256
2257                    /// Returns the command's initiating event id, if present. This is the id of the
2258                    /// event which "started" the command. Usually, the "scheduled" event for the
2259                    /// command.
2260                    pub fn get_initial_command_event_id(&self) -> Option<i64> {
2261                        self.attributes.as_ref().and_then(|a| {
2262                            // Fun! Not really any way to make this better w/o incompatibly changing
2263                            // protos.
2264                            match a {
2265                                Attributes::ActivityTaskStartedEventAttributes(a) =>
2266                                    Some(a.scheduled_event_id),
2267                                Attributes::ActivityTaskCompletedEventAttributes(a) =>
2268                                    Some(a.scheduled_event_id),
2269                                Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2270                                Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2271                                Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2272                                Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2273                                Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
2274                                Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
2275                                Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2276                                Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
2277                                Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2278                                Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
2279                                Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
2280                                Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2281                                Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
2282                                Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
2283                                Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
2284                                Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2285                                Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
2286                                Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
2287                                Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2288                                Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2289                                Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2290                                Attributes::NexusOperationStartedEventAttributes(a) => Some(a.scheduled_event_id),
2291                                Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2292                                Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id),
2293                                Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2294                                Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2295                                Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2296                                Attributes::NexusOperationCancelRequestCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2297                                Attributes::NexusOperationCancelRequestFailedEventAttributes(a) => Some(a.scheduled_event_id),
2298                                _ => None
2299                            }
2300                        })
2301                    }
2302
2303                    /// Return the event's associated protocol instance, if one exists.
2304                    pub fn get_protocol_instance_id(&self) -> Option<&str> {
2305                        self.attributes.as_ref().and_then(|a| match a {
2306                            Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(a) => {
2307                                Some(a.protocol_instance_id.as_str())
2308                            }
2309                            _ => None,
2310                        })
2311                    }
2312
2313                    /// Returns true if the event is one which would end a workflow
2314                    pub fn is_final_wf_execution_event(&self) -> bool {
2315                        match self.event_type() {
2316                            EventType::WorkflowExecutionCompleted => true,
2317                            EventType::WorkflowExecutionCanceled => true,
2318                            EventType::WorkflowExecutionFailed => true,
2319                            EventType::WorkflowExecutionTimedOut => true,
2320                            EventType::WorkflowExecutionContinuedAsNew => true,
2321                            EventType::WorkflowExecutionTerminated => true,
2322                            _ => false,
2323                        }
2324                    }
2325
2326                    pub fn is_wft_closed_event(&self) -> bool {
2327                        match self.event_type() {
2328                            EventType::WorkflowTaskCompleted => true,
2329                            EventType::WorkflowTaskFailed => true,
2330                            EventType::WorkflowTaskTimedOut => true,
2331                            _ => false,
2332                        }
2333                    }
2334
                    /// Returns true if this event may be ignored.
                    ///
                    /// That requires both that the event's `worker_may_ignore` flag is set and that
                    /// its attributes are one of the variants explicitly marked ignorable below.
                    /// Events without attributes are never ignorable.
                    pub fn is_ignorable(&self) -> bool {
                        // Without the flag the event is never ignorable, whatever its type.
                        if !self.worker_may_ignore {
                            return false;
                        }
                        // Never add a catch-all case to this match statement. We need to explicitly
                        // mark any new event types as ignorable or not.
                        if let Some(a) = self.attributes.as_ref() {
                            match a {
                                Attributes::WorkflowExecutionStartedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionCompletedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionFailedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionTimedOutEventAttributes(_) => false,
                                Attributes::WorkflowTaskScheduledEventAttributes(_) => false,
                                Attributes::WorkflowTaskStartedEventAttributes(_) => false,
                                Attributes::WorkflowTaskCompletedEventAttributes(_) => false,
                                Attributes::WorkflowTaskTimedOutEventAttributes(_) => false,
                                Attributes::WorkflowTaskFailedEventAttributes(_) => false,
                                Attributes::ActivityTaskScheduledEventAttributes(_) => false,
                                Attributes::ActivityTaskStartedEventAttributes(_) => false,
                                Attributes::ActivityTaskCompletedEventAttributes(_) => false,
                                Attributes::ActivityTaskFailedEventAttributes(_) => false,
                                Attributes::ActivityTaskTimedOutEventAttributes(_) => false,
                                Attributes::TimerStartedEventAttributes(_) => false,
                                Attributes::TimerFiredEventAttributes(_) => false,
                                Attributes::ActivityTaskCancelRequestedEventAttributes(_) => false,
                                Attributes::ActivityTaskCanceledEventAttributes(_) => false,
                                Attributes::TimerCanceledEventAttributes(_) => false,
                                Attributes::MarkerRecordedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionSignaledEventAttributes(_) => false,
                                Attributes::WorkflowExecutionTerminatedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionCanceledEventAttributes(_) => false,
                                Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
                                Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => false,
                                Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => false,
                                Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => false,
                                Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => false,
                                Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => false,
                                Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => false,
                                Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => false,
                                Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => false,
                                Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => false,
                                Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => false,
                                Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
                                Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => false,
                                Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => false,
                                Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => false,
                                Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => false,
                                Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => false,
                                Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => false,
                                Attributes::WorkflowPropertiesModifiedEventAttributes(_) => false,
                                Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => false,
                                Attributes::NexusOperationScheduledEventAttributes(_) => false,
                                Attributes::NexusOperationStartedEventAttributes(_) => false,
                                Attributes::NexusOperationCompletedEventAttributes(_) => false,
                                Attributes::NexusOperationFailedEventAttributes(_) => false,
                                Attributes::NexusOperationCanceledEventAttributes(_) => false,
                                Attributes::NexusOperationTimedOutEventAttributes(_) => false,
                                Attributes::NexusOperationCancelRequestedEventAttributes(_) => false,
                                // !! Ignorable !!
                                Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => true,
                                Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => false,
                                Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => false,
                                // !! Ignorable !!
                                Attributes::WorkflowExecutionPausedEventAttributes(_) => true,
                                // !! Ignorable !!
                                Attributes::WorkflowExecutionUnpausedEventAttributes(_) => true,
                            }
                        } else {
                            // No attributes to classify, so the event is not ignorable.
                            false
                        }
                    }
2410                }
2411
2412                impl Display for HistoryEvent {
2413                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2414                        write!(
2415                            f,
2416                            "HistoryEvent(id: {}, {:?})",
2417                            self.event_id,
2418                            EventType::try_from(self.event_type).unwrap_or_default()
2419                        )
2420                    }
2421                }
2422
2423                impl Attributes {
2424                    pub fn event_type(&self) -> EventType {
2425                        // I just absolutely _love_ this
2426                        match self {
2427                            Attributes::WorkflowExecutionStartedEventAttributes(_) => { EventType::WorkflowExecutionStarted }
2428                            Attributes::WorkflowExecutionCompletedEventAttributes(_) => { EventType::WorkflowExecutionCompleted }
2429                            Attributes::WorkflowExecutionFailedEventAttributes(_) => { EventType::WorkflowExecutionFailed }
2430                            Attributes::WorkflowExecutionTimedOutEventAttributes(_) => { EventType::WorkflowExecutionTimedOut }
2431                            Attributes::WorkflowTaskScheduledEventAttributes(_) => { EventType::WorkflowTaskScheduled }
2432                            Attributes::WorkflowTaskStartedEventAttributes(_) => { EventType::WorkflowTaskStarted }
2433                            Attributes::WorkflowTaskCompletedEventAttributes(_) => { EventType::WorkflowTaskCompleted }
2434                            Attributes::WorkflowTaskTimedOutEventAttributes(_) => { EventType::WorkflowTaskTimedOut }
2435                            Attributes::WorkflowTaskFailedEventAttributes(_) => { EventType::WorkflowTaskFailed }
2436                            Attributes::ActivityTaskScheduledEventAttributes(_) => { EventType::ActivityTaskScheduled }
2437                            Attributes::ActivityTaskStartedEventAttributes(_) => { EventType::ActivityTaskStarted }
2438                            Attributes::ActivityTaskCompletedEventAttributes(_) => { EventType::ActivityTaskCompleted }
2439                            Attributes::ActivityTaskFailedEventAttributes(_) => { EventType::ActivityTaskFailed }
2440                            Attributes::ActivityTaskTimedOutEventAttributes(_) => { EventType::ActivityTaskTimedOut }
2441                            Attributes::TimerStartedEventAttributes(_) => { EventType::TimerStarted }
2442                            Attributes::TimerFiredEventAttributes(_) => { EventType::TimerFired }
2443                            Attributes::ActivityTaskCancelRequestedEventAttributes(_) => { EventType::ActivityTaskCancelRequested }
2444                            Attributes::ActivityTaskCanceledEventAttributes(_) => { EventType::ActivityTaskCanceled }
2445                            Attributes::TimerCanceledEventAttributes(_) => { EventType::TimerCanceled }
2446                            Attributes::MarkerRecordedEventAttributes(_) => { EventType::MarkerRecorded }
2447                            Attributes::WorkflowExecutionSignaledEventAttributes(_) => { EventType::WorkflowExecutionSignaled }
2448                            Attributes::WorkflowExecutionTerminatedEventAttributes(_) => { EventType::WorkflowExecutionTerminated }
2449                            Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::WorkflowExecutionCancelRequested }
2450                            Attributes::WorkflowExecutionCanceledEventAttributes(_) => { EventType::WorkflowExecutionCanceled }
2451                            Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionInitiated }
2452                            Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionFailed }
2453                            Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::ExternalWorkflowExecutionCancelRequested }
2454                            Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => { EventType::WorkflowExecutionContinuedAsNew }
2455                            Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => { EventType::StartChildWorkflowExecutionInitiated }
2456                            Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => { EventType::StartChildWorkflowExecutionFailed }
2457                            Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => { EventType::ChildWorkflowExecutionStarted }
2458                            Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => { EventType::ChildWorkflowExecutionCompleted }
2459                            Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => { EventType::ChildWorkflowExecutionFailed }
2460                            Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => { EventType::ChildWorkflowExecutionCanceled }
2461                            Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => { EventType::ChildWorkflowExecutionTimedOut }
2462                            Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => { EventType::ChildWorkflowExecutionTerminated }
2463                            Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionInitiated }
2464                            Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionFailed }
2465                            Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => { EventType::ExternalWorkflowExecutionSignaled }
2466                            Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => { EventType::UpsertWorkflowSearchAttributes }
2467                            Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAdmitted }
2468                            Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => { EventType::WorkflowExecutionUpdateRejected }
2469                            Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAccepted }
2470                            Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => { EventType::WorkflowExecutionUpdateCompleted }
2471                            Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => { EventType::WorkflowPropertiesModifiedExternally }
2472                            Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => { EventType::ActivityPropertiesModifiedExternally }
2473                            Attributes::WorkflowPropertiesModifiedEventAttributes(_) => { EventType::WorkflowPropertiesModified }
2474                            Attributes::NexusOperationScheduledEventAttributes(_) => { EventType::NexusOperationScheduled }
2475                            Attributes::NexusOperationStartedEventAttributes(_) => { EventType::NexusOperationStarted }
2476                            Attributes::NexusOperationCompletedEventAttributes(_) => { EventType::NexusOperationCompleted }
2477                            Attributes::NexusOperationFailedEventAttributes(_) => { EventType::NexusOperationFailed }
2478                            Attributes::NexusOperationCanceledEventAttributes(_) => { EventType::NexusOperationCanceled }
2479                            Attributes::NexusOperationTimedOutEventAttributes(_) => { EventType::NexusOperationTimedOut }
2480                            Attributes::NexusOperationCancelRequestedEventAttributes(_) => { EventType::NexusOperationCancelRequested }
2481                            Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => { EventType::WorkflowExecutionOptionsUpdated }
2482                            Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => { EventType::NexusOperationCancelRequestCompleted }
2483                            Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => { EventType::NexusOperationCancelRequestFailed }
2484                            Attributes::WorkflowExecutionPausedEventAttributes(_) => { EventType::WorkflowExecutionPaused }
2485                            Attributes::WorkflowExecutionUnpausedEventAttributes(_) => { EventType::WorkflowExecutionUnpaused }
2486                        }
2487                    }
2488                }
2489            }
2490        }
        /// Items generated from the `temporal.api.namespace.v1` protobuf package.
        pub mod namespace {
            pub mod v1 {
                tonic::include_proto!("temporal.api.namespace.v1");
            }
        }
        /// Items generated from the `temporal.api.operatorservice.v1` protobuf package.
        pub mod operatorservice {
            pub mod v1 {
                tonic::include_proto!("temporal.api.operatorservice.v1");
            }
        }
2501        pub mod protocol {
2502            pub mod v1 {
2503                use std::fmt::{Display, Formatter};
2504                tonic::include_proto!("temporal.api.protocol.v1");
2505
2506                impl Display for Message {
2507                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2508                        write!(f, "ProtocolMessage({})", self.id)
2509                    }
2510                }
2511            }
2512        }
        /// Items generated from the `temporal.api.query.v1` protobuf package.
        pub mod query {
            pub mod v1 {
                tonic::include_proto!("temporal.api.query.v1");
            }
        }
        /// Items generated from the `temporal.api.replication.v1` protobuf package.
        pub mod replication {
            pub mod v1 {
                tonic::include_proto!("temporal.api.replication.v1");
            }
        }
        /// Items generated from the `temporal.api.rules.v1` protobuf package.
        pub mod rules {
            pub mod v1 {
                tonic::include_proto!("temporal.api.rules.v1");
            }
        }
        /// Items generated from the `temporal.api.schedule.v1` protobuf package.
        pub mod schedule {
            // NOTE(review): presumably needed because the generated doc comments contain text
            // that rustdoc would otherwise flag as invalid HTML tags — confirm.
            #[allow(rustdoc::invalid_html_tags)]
            pub mod v1 {
                tonic::include_proto!("temporal.api.schedule.v1");
            }
        }
        /// Items generated from the `temporal.api.sdk.v1` protobuf package.
        pub mod sdk {
            pub mod v1 {
                tonic::include_proto!("temporal.api.sdk.v1");
            }
        }
2539        pub mod taskqueue {
2540            pub mod v1 {
2541                use crate::protos::temporal::api::enums::v1::TaskQueueKind;
2542                tonic::include_proto!("temporal.api.taskqueue.v1");
2543
2544                impl From<String> for TaskQueue {
2545                    fn from(name: String) -> Self {
2546                        Self {
2547                            name,
2548                            kind: TaskQueueKind::Normal as i32,
2549                            normal_name: "".to_string(),
2550                        }
2551                    }
2552                }
2553            }
2554        }
        /// Items generated from the `temporal.api.testservice.v1` protobuf package.
        pub mod testservice {
            pub mod v1 {
                tonic::include_proto!("temporal.api.testservice.v1");
            }
        }
2560        pub mod update {
2561            pub mod v1 {
2562                use crate::protos::temporal::api::update::v1::outcome::Value;
2563                tonic::include_proto!("temporal.api.update.v1");
2564
2565                impl Outcome {
2566                    pub fn is_success(&self) -> bool {
2567                        match self.value {
2568                            Some(Value::Success(_)) => true,
2569                            _ => false,
2570                        }
2571                    }
2572                }
2573            }
2574        }
        /// Items generated from the `temporal.api.version.v1` protobuf package.
        pub mod version {
            pub mod v1 {
                tonic::include_proto!("temporal.api.version.v1");
            }
        }
        /// Items generated from the `temporal.api.worker.v1` protobuf package.
        pub mod worker {
            pub mod v1 {
                tonic::include_proto!("temporal.api.worker.v1");
            }
        }
        /// Items generated from the `temporal.api.workflow.v1` protobuf package.
        pub mod workflow {
            pub mod v1 {
                tonic::include_proto!("temporal.api.workflow.v1");
            }
        }
2590        pub mod nexus {
2591            pub mod v1 {
2592                use crate::protos::{
2593                    camel_case_to_screaming_snake,
2594                    temporal::api::{
2595                        common::{
2596                            self,
2597                            v1::link::{WorkflowEvent, workflow_event},
2598                        },
2599                        enums::v1::EventType,
2600                        failure,
2601                    },
2602                };
2603                use anyhow::{anyhow, bail};
2604                use prost::Name;
2605                use std::{
2606                    collections::HashMap,
2607                    fmt::{Display, Formatter},
2608                };
2609                use tonic::transport::Uri;
2610
2611                tonic::include_proto!("temporal.api.nexus.v1");
2612
2613                impl Display for Response {
2614                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2615                        write!(f, "NexusResponse(",)?;
2616                        match &self.variant {
2617                            None => {}
2618                            Some(v) => {
2619                                write!(f, "{v}")?;
2620                            }
2621                        }
2622                        write!(f, ")")
2623                    }
2624                }
2625
2626                impl Display for response::Variant {
2627                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2628                        match self {
2629                            response::Variant::StartOperation(_) => {
2630                                write!(f, "StartOperation")
2631                            }
2632                            response::Variant::CancelOperation(_) => {
2633                                write!(f, "CancelOperation")
2634                            }
2635                        }
2636                    }
2637                }
2638
2639                impl Display for HandlerError {
2640                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2641                        write!(f, "HandlerError")
2642                    }
2643                }
2644
                /// A nexus task failure, carried in one of two payload types.
                ///
                /// NOTE(review): how callers choose between the variants is not visible here —
                /// confirm against the code that constructs this enum.
                pub enum NexusTaskFailure {
                    /// The failure as a nexus `HandlerError`.
                    Legacy(HandlerError),
                    /// The failure as a Temporal API `failure::v1::Failure`.
                    Temporal(failure::v1::Failure),
                }
2649
2650                static SCHEME_PREFIX: &str = "temporal://";
2651
                /// Attempt to parse a nexus link into a workflow event link
2653                pub fn workflow_event_link_from_nexus(
2654                    l: &Link,
2655                ) -> Result<common::v1::Link, anyhow::Error> {
2656                    if !l.url.starts_with(SCHEME_PREFIX) {
2657                        bail!("Invalid scheme for nexus link: {:?}", l.url);
2658                    }
2659                    // We strip the scheme/authority portion because of
2660                    // https://github.com/hyperium/http/issues/696
2661                    let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap();
2662                    let uri = Uri::try_from(no_authority_url)?;
2663                    let parts = uri.into_parts();
2664                    let path = parts.path_and_query.ok_or_else(|| {
2665                        anyhow!("Failed to parse nexus link, invalid path: {:?}", l)
2666                    })?;
2667                    let path_parts = path.path().split('/').collect::<Vec<_>>();
2668                    if path_parts.get(1) != Some(&"namespaces") {
2669                        bail!("Invalid path for nexus link: {:?}", l);
2670                    }
2671                    let namespace = path_parts.get(2).ok_or_else(|| {
2672                        anyhow!("Failed to parse nexus link, no namespace: {:?}", l)
2673                    })?;
2674                    if path_parts.get(3) != Some(&"workflows") {
2675                        bail!("Invalid path for nexus link, no workflows segment: {:?}", l);
2676                    }
2677                    let workflow_id = path_parts.get(4).ok_or_else(|| {
2678                        anyhow!("Failed to parse nexus link, no workflow id: {:?}", l)
2679                    })?;
2680                    let run_id = path_parts
2681                        .get(5)
2682                        .ok_or_else(|| anyhow!("Failed to parse nexus link, no run id: {:?}", l))?;
2683                    if path_parts.get(6) != Some(&"history") {
2684                        bail!("Invalid path for nexus link, no history segment: {:?}", l);
2685                    }
2686                    let reference = if let Some(query) = path.query() {
2687                        let mut eventref = workflow_event::EventReference::default();
2688                        let query_parts = query.split('&').collect::<Vec<_>>();
2689                        for qp in query_parts {
2690                            let mut kv = qp.split('=');
2691                            let key = kv.next().ok_or_else(|| {
2692                                anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2693                            })?;
2694                            let val = kv.next().ok_or_else(|| {
2695                                anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2696                            })?;
2697                            match key {
2698                                "eventID" => {
2699                                    eventref.event_id = val.parse().map_err(|_| {
2700                                        anyhow!("Failed to parse nexus link event id: {:?}", l)
2701                                    })?;
2702                                }
2703                                "eventType" => {
2704                                    eventref.event_type = EventType::from_str_name(val)
2705                                        .unwrap_or_else(|| {
2706                                            EventType::from_str_name(
2707                                                &("EVENT_TYPE_".to_string()
2708                                                    + &camel_case_to_screaming_snake(val)),
2709                                            )
2710                                            .unwrap_or_default()
2711                                        })
2712                                        .into()
2713                                }
2714                                _ => continue,
2715                            }
2716                        }
2717                        Some(workflow_event::Reference::EventRef(eventref))
2718                    } else {
2719                        None
2720                    };
2721
2722                    Ok(common::v1::Link {
2723                        variant: Some(common::v1::link::Variant::WorkflowEvent(WorkflowEvent {
2724                            namespace: namespace.to_string(),
2725                            workflow_id: workflow_id.to_string(),
2726                            run_id: run_id.to_string(),
2727                            reference,
2728                        })),
2729                    })
2730                }
2731
2732                impl TryFrom<failure::v1::Failure> for Failure {
2733                    type Error = serde_json::Error;
2734
2735                    fn try_from(mut f: failure::v1::Failure) -> Result<Self, Self::Error> {
2736                        // 1. Remove message from failure
2737                        let message = std::mem::take(&mut f.message);
2738
2739                        // 2. Serialize Failure as JSON
2740                        let details = serde_json::to_vec(&f)?;
2741
2742                        // 3. Package Temporal Failure as Nexus Failure
2743                        Ok(Failure {
2744                            message,
2745                            stack_trace: f.stack_trace,
2746                            metadata: HashMap::from([(
2747                                "type".to_string(),
2748                                failure::v1::Failure::full_name().into(),
2749                            )]),
2750                            details,
2751                            cause: None,
2752                        })
2753                    }
2754                }
2755            }
2756        }
2757        pub mod workflowservice {
2758            pub mod v1 {
2759                use std::{
2760                    convert::TryInto,
2761                    fmt::{Display, Formatter},
2762                    time::{Duration, SystemTime},
2763                };
2764
2765                tonic::include_proto!("temporal.api.workflowservice.v1");
2766
2767                macro_rules! sched_to_start_impl {
2768                    ($sched_field:ident) => {
2769                        /// Return the duration of the task schedule time (current attempt) to its
2770                        /// start time if both are set and time went forward.
2771                        pub fn sched_to_start(&self) -> Option<Duration> {
2772                            if let Some((sch, st)) =
2773                                self.$sched_field.clone().zip(self.started_time.clone())
2774                            {
2775                                if let Some(value) = elapsed_between_prost_times(sch, st) {
2776                                    return value;
2777                                }
2778                            }
2779                            None
2780                        }
2781                    };
2782                }
2783
2784                fn elapsed_between_prost_times(
2785                    from: prost_types::Timestamp,
2786                    to: prost_types::Timestamp,
2787                ) -> Option<Option<Duration>> {
2788                    let from: Result<SystemTime, _> = from.try_into();
2789                    let to: Result<SystemTime, _> = to.try_into();
2790                    if let (Ok(from), Ok(to)) = (from, to) {
2791                        return Some(to.duration_since(from).ok());
2792                    }
2793                    None
2794                }
2795
                impl PollWorkflowTaskQueueResponse {
                    // Generates `sched_to_start`, measured from `scheduled_time` to `started_time`.
                    sched_to_start_impl!(scheduled_time);
                }
2799
2800                impl Display for PollWorkflowTaskQueueResponse {
2801                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2802                        let last_event = self
2803                            .history
2804                            .as_ref()
2805                            .and_then(|h| h.events.last().map(|he| he.event_id))
2806                            .unwrap_or(0);
2807                        write!(
2808                            f,
2809                            "PollWFTQResp(run_id: {}, attempt: {}, last_event: {})",
2810                            self.workflow_execution
2811                                .as_ref()
2812                                .map_or("", |we| we.run_id.as_str()),
2813                            self.attempt,
2814                            last_event
2815                        )
2816                    }
2817                }
2818
2819                /// Can be used while debugging to avoid filling up a whole screen with poll resps
2820                pub struct CompactHist<'a>(pub &'a PollWorkflowTaskQueueResponse);
2821                impl Display for CompactHist<'_> {
2822                    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2823                        writeln!(
2824                            f,
2825                            "PollWorkflowTaskQueueResponse (prev_started: {}, started: {})",
2826                            self.0.previous_started_event_id, self.0.started_event_id
2827                        )?;
2828                        if let Some(h) = self.0.history.as_ref() {
2829                            for event in &h.events {
2830                                writeln!(f, "{}", event)?;
2831                            }
2832                        }
2833                        writeln!(f, "query: {:#?}", self.0.query)?;
2834                        writeln!(f, "queries: {:#?}", self.0.queries)
2835                    }
2836                }
2837
                impl PollActivityTaskQueueResponse {
                    // Generates `sched_to_start`, measured from `current_attempt_scheduled_time`
                    // to `started_time`.
                    sched_to_start_impl!(current_attempt_scheduled_time);
                }
2841
2842                impl PollNexusTaskQueueResponse {
2843                    pub fn sched_to_start(&self) -> Option<Duration> {
2844                        if let Some((sch, st)) = self
2845                            .request
2846                            .as_ref()
2847                            .and_then(|r| r.scheduled_time)
2848                            .clone()
2849                            .zip(SystemTime::now().try_into().ok())
2850                        {
2851                            if let Some(value) = elapsed_between_prost_times(sch, st) {
2852                                return value;
2853                            }
2854                        }
2855                        None
2856                    }
2857                }
2858
2859                impl QueryWorkflowResponse {
2860                    /// Unwrap a successful response as vec of payloads
2861                    pub fn unwrap(self) -> Vec<crate::protos::temporal::api::common::v1::Payload> {
2862                        self.query_result.unwrap().payloads
2863                    }
2864                }
2865            }
2866        }
2867    }
2868}
2869
// Types for the `google.rpc` protobuf package, pulled in via `tonic::include_proto!`.
// Clippy, missing-docs and rustdoc link lints are silenced for everything in this module.
#[allow(
    clippy::all,
    missing_docs,
    rustdoc::broken_intra_doc_links,
    rustdoc::bare_urls
)]
pub mod google {
    pub mod rpc {
        tonic::include_proto!("google.rpc");
    }
}
2881
// Types for the `grpc.health.v1` protobuf package, pulled in via `tonic::include_proto!`.
// Clippy, missing-docs and rustdoc link lints are silenced for everything in this module.
#[allow(
    clippy::all,
    missing_docs,
    rustdoc::broken_intra_doc_links,
    rustdoc::bare_urls
)]
pub mod grpc {
    pub mod health {
        pub mod v1 {
            tonic::include_proto!("grpc.health.v1");
        }
    }
}
2895
/// Converts a camel-case identifier to `SCREAMING_SNAKE_CASE`, used for json -> proto enum
/// string conversion.
///
/// An underscore is inserted before each uppercase character that follows a non-uppercase
/// character; runs of uppercase characters and the first character never get one. Every
/// character is then ASCII-uppercased.
pub fn camel_case_to_screaming_snake(val: &str) -> String {
    let mut screaming = String::new();
    // Starts as `true` so a leading uppercase character is not preceded by an underscore.
    let mut prev_upper = true;
    for ch in val.chars() {
        let is_upper = ch.is_uppercase();
        if is_upper && !prev_upper {
            screaming.push('_');
        }
        screaming.push(ch.to_ascii_uppercase());
        prev_upper = is_upper;
    }
    screaming
}
2914
2915/// Convert a protobuf [`prost_types::Timestamp`] to a [`std::time::SystemTime`].
2916pub fn proto_ts_to_system_time(ts: &prost_types::Timestamp) -> Option<std::time::SystemTime> {
2917    std::time::SystemTime::UNIX_EPOCH
2918        .checked_add(Duration::from_secs(ts.seconds as u64) + Duration::from_nanos(ts.nanos as u64))
2919}
2920
#[cfg(test)]
mod tests {
    use crate::protos::temporal::api::failure::v1::Failure;
    use anyhow::anyhow;

    /// Converting an `anyhow` error yields a `Failure` whose cause chain mirrors the error's
    /// context chain, outermost context first.
    #[test]
    fn anyhow_to_failure_conversion() {
        let standalone: Failure = anyhow!("no causes").into();
        assert_eq!(standalone.cause, None);
        assert_eq!(standalone.message, "no causes");

        let layered: Failure = anyhow!("fail 1")
            .context("fail 2")
            .context("fail 3")
            .into();
        assert_eq!(layered.message, "fail 3");
        let middle = layered.cause.unwrap();
        assert_eq!(middle.message, "fail 2");
        let innermost = middle.cause.unwrap();
        assert_eq!(innermost.message, "fail 1");
    }
}