1pub mod constants;
6pub mod utilities;
8
9#[cfg(feature = "test-utilities")]
10pub mod canned_histories;
12#[cfg(feature = "history_builders")]
13mod history_builder;
14#[cfg(feature = "history_builders")]
15mod history_info;
16mod task_token;
17#[cfg(feature = "test-utilities")]
18pub mod test_utils;
19
20use std::time::Duration;
21
22#[cfg(feature = "history_builders")]
23pub use history_builder::{
24 DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE, TestHistoryBuilder, default_act_sched,
25 default_wes_attribs,
26};
27#[cfg(feature = "history_builders")]
28pub use history_info::HistoryInfo;
29pub use task_token::TaskToken;
30
/// String constant `"encoding"`.
/// NOTE(review): presumably the payload-metadata key naming a payload's encoding — confirm at use sites.
31pub static ENCODING_PAYLOAD_KEY: &str = "encoding";
/// String constant `"json/plain"`.
/// NOTE(review): presumably the value stored under [`ENCODING_PAYLOAD_KEY`] for JSON payloads — confirm.
33pub static JSON_ENCODING_VAL: &str = "json/plain";
/// Map key (`"patch-data"`) under which `coresdk::common::build_has_change_marker_details`
/// stores, and `decode_change_marker_details` looks up, the JSON-encoded patch marker data.
35pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data";
/// String constant `"TemporalChangeVersion"`.
/// NOTE(review): by its name this is a search-attribute key for versioning info — confirm at use sites.
37pub static VERSION_SEARCH_ATTR_KEY: &str = "TemporalChangeVersion";
39
/// Includes the tonic-generated code for the protobuf package `$pkg`, followed
/// by the companion `<pkg>.serde.rs` file located in `OUT_DIR`.
macro_rules! include_proto_with_serde {
    ($pkg:tt) => {
        tonic::include_proto!($pkg);

        // A single flat `concat!` yields the same path as the nested form.
        include!(concat!(env!("OUT_DIR"), "/", $pkg, ".serde.rs"));
    };
}
47
48#[allow(
49 clippy::large_enum_variant,
50 clippy::derive_partial_eq_without_eq,
51 clippy::reserve_after_initialization
52)]
53#[allow(missing_docs)]
55pub mod coresdk {
56 tonic::include_proto!("coresdk");
59
60 use crate::protos::{
61 ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL,
62 temporal::api::{
63 common::v1::{Payload, Payloads, RetryPolicy, WorkflowExecution},
64 enums::v1::{
65 ApplicationErrorCategory, TimeoutType, VersioningBehavior, WorkflowTaskFailedCause,
66 },
67 failure::v1::{
68 ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo,
69 failure::FailureInfo,
70 },
71 workflowservice::v1::PollActivityTaskQueueResponse,
72 },
73 };
74 use activity_task::ActivityTask;
75 use serde::{Deserialize, Serialize};
76 use std::{
77 collections::HashMap,
78 convert::TryFrom,
79 fmt::{Display, Formatter},
80 iter::FromIterator,
81 };
82 use workflow_activation::{WorkflowActivationJob, workflow_activation_job};
83 use workflow_commands::{WorkflowCommand, workflow_command, workflow_command::Variant};
84 use workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion};
85
86 #[allow(clippy::module_inception)]
87 pub mod activity_task {
88 use crate::protos::{coresdk::ActivityTaskCompletion, task_token::format_task_token};
89 use std::fmt::{Display, Formatter};
90 tonic::include_proto!("coresdk.activity_task");
91
92 impl ActivityTask {
93 pub fn cancel_from_ids(
94 task_token: Vec<u8>,
95 reason: ActivityCancelReason,
96 details: ActivityCancellationDetails,
97 ) -> Self {
98 Self {
99 task_token,
100 variant: Some(activity_task::Variant::Cancel(Cancel {
101 reason: reason as i32,
102 details: Some(details),
103 })),
104 }
105 }
106
107 pub fn is_timeout(&self) -> bool {
109 match &self.variant {
110 Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
111 *reason == ActivityCancelReason::TimedOut as i32
112 || details.as_ref().is_some_and(|d| d.is_timed_out)
113 }
114 _ => false,
115 }
116 }
117
118 pub fn primary_reason_to_cancellation_details(
119 reason: ActivityCancelReason,
120 ) -> ActivityCancellationDetails {
121 ActivityCancellationDetails {
122 is_not_found: reason == ActivityCancelReason::NotFound,
123 is_cancelled: reason == ActivityCancelReason::Cancelled,
124 is_paused: reason == ActivityCancelReason::Paused,
125 is_timed_out: reason == ActivityCancelReason::TimedOut,
126 is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
127 is_reset: reason == ActivityCancelReason::Reset,
128 }
129 }
130 }
131
132 impl Display for ActivityTaskCompletion {
133 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
134 write!(
135 f,
136 "ActivityTaskCompletion(token: {}",
137 format_task_token(&self.task_token),
138 )?;
139 if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
140 write!(f, ", {r}")?;
141 } else {
142 write!(f, ", missing result")?;
143 }
144 write!(f, ")")
145 }
146 }
147 }
148 #[allow(clippy::module_inception)]
149 pub mod activity_result {
150 tonic::include_proto!("coresdk.activity_result");
151 use super::super::temporal::api::{
152 common::v1::Payload,
153 failure::v1::{CanceledFailureInfo, Failure as APIFailure, failure},
154 };
155 use crate::protos::{
156 coresdk::activity_result::activity_resolution::Status,
157 temporal::api::enums::v1::TimeoutType,
158 };
159 use activity_execution_result as aer;
160 use anyhow::anyhow;
161 use std::fmt::{Display, Formatter};
162
163 impl ActivityExecutionResult {
164 pub const fn ok(result: Payload) -> Self {
165 Self {
166 status: Some(aer::Status::Completed(Success {
167 result: Some(result),
168 })),
169 }
170 }
171
172 pub fn fail(fail: APIFailure) -> Self {
173 Self {
174 status: Some(aer::Status::Failed(Failure {
175 failure: Some(fail),
176 })),
177 }
178 }
179
180 pub fn cancel_from_details(payload: Option<Payload>) -> Self {
181 Self {
182 status: Some(aer::Status::Cancelled(Cancellation::from_details(payload))),
183 }
184 }
185
186 pub const fn will_complete_async() -> Self {
187 Self {
188 status: Some(aer::Status::WillCompleteAsync(WillCompleteAsync {})),
189 }
190 }
191
192 pub fn is_cancelled(&self) -> bool {
193 matches!(self.status, Some(aer::Status::Cancelled(_)))
194 }
195 }
196
197 impl Display for aer::Status {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 write!(f, "ActivityExecutionResult(")?;
200 match self {
201 aer::Status::Completed(v) => {
202 write!(f, "{v})")
203 }
204 aer::Status::Failed(v) => {
205 write!(f, "{v})")
206 }
207 aer::Status::Cancelled(v) => {
208 write!(f, "{v})")
209 }
210 aer::Status::WillCompleteAsync(_) => {
211 write!(f, "Will complete async)")
212 }
213 }
214 }
215 }
216
217 impl Display for Success {
218 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
219 write!(f, "Success(")?;
220 if let Some(ref v) = self.result {
221 write!(f, "{v}")?;
222 }
223 write!(f, ")")
224 }
225 }
226
227 impl Display for Failure {
228 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229 write!(f, "Failure(")?;
230 if let Some(ref v) = self.failure {
231 write!(f, "{v}")?;
232 }
233 write!(f, ")")
234 }
235 }
236
237 impl Display for Cancellation {
238 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239 write!(f, "Cancellation(")?;
240 if let Some(ref v) = self.failure {
241 write!(f, "{v}")?;
242 }
243 write!(f, ")")
244 }
245 }
246
247 impl From<Result<Payload, APIFailure>> for ActivityExecutionResult {
248 fn from(r: Result<Payload, APIFailure>) -> Self {
249 Self {
250 status: match r {
251 Ok(p) => Some(aer::Status::Completed(Success { result: Some(p) })),
252 Err(f) => Some(aer::Status::Failed(Failure { failure: Some(f) })),
253 },
254 }
255 }
256 }
257
258 impl ActivityResolution {
259 pub fn success_payload_or_error(self) -> Result<Option<Payload>, anyhow::Error> {
262 let Some(status) = self.status else {
263 return Err(anyhow!("Activity completed without a status"));
264 };
265
266 match status {
267 activity_resolution::Status::Completed(success) => Ok(success.result),
268 e => Err(anyhow!("Activity was not successful: {e:?}")),
269 }
270 }
271
272 pub fn unwrap_ok_payload(self) -> Payload {
273 self.success_payload_or_error().unwrap().unwrap()
274 }
275
276 pub fn completed_ok(&self) -> bool {
277 matches!(self.status, Some(activity_resolution::Status::Completed(_)))
278 }
279
280 pub fn failed(&self) -> bool {
281 matches!(self.status, Some(activity_resolution::Status::Failed(_)))
282 }
283
284 pub fn timed_out(&self) -> Option<TimeoutType> {
285 match self.status {
286 Some(activity_resolution::Status::Failed(Failure {
287 failure: Some(ref f),
288 })) => f.is_timeout(),
289 _ => None,
290 }
291 }
292
293 pub fn cancelled(&self) -> bool {
294 matches!(self.status, Some(activity_resolution::Status::Cancelled(_)))
295 }
296
297 pub fn unwrap_failure(self) -> APIFailure {
300 match self.status.unwrap() {
301 Status::Failed(f) => f.failure.unwrap(),
302 Status::Cancelled(c) => c.failure.unwrap(),
303 _ => panic!("Actvity did not fail"),
304 }
305 }
306 }
307
308 impl Cancellation {
309 pub fn from_details(details: Option<Payload>) -> Self {
312 Cancellation {
313 failure: Some(APIFailure {
314 message: "Activity cancelled".to_string(),
315 failure_info: Some(failure::FailureInfo::CanceledFailureInfo(
316 CanceledFailureInfo {
317 details: details.map(Into::into),
318 },
319 )),
320 ..Default::default()
321 }),
322 }
323 }
324 }
325 }
326
327 pub mod common {
328 tonic::include_proto!("coresdk.common");
329 use super::external_data::LocalActivityMarkerData;
330 use crate::protos::{
331 PATCHED_MARKER_DETAILS_KEY,
332 coresdk::{
333 AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
334 external_data::PatchedMarkerData,
335 },
336 temporal::api::common::v1::{Payload, Payloads},
337 };
338 use std::collections::HashMap;
339
340 pub fn build_has_change_marker_details(
341 patch_id: impl Into<String>,
342 deprecated: bool,
343 ) -> anyhow::Result<HashMap<String, Payloads>> {
344 let mut hm = HashMap::new();
345 let encoded = PatchedMarkerData {
346 id: patch_id.into(),
347 deprecated,
348 }
349 .as_json_payload()?;
350 hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into());
351 Ok(hm)
352 }
353
354 pub fn decode_change_marker_details(
355 details: &HashMap<String, Payloads>,
356 ) -> Option<(String, bool)> {
357 if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) {
360 let decoded = PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?;
361 return Some((decoded.id, decoded.deprecated));
362 }
363
364 let id_entry = details.get("patch_id")?.payloads.first()?;
365 let deprecated_entry = details.get("deprecated")?.payloads.first()?;
366 let name = std::str::from_utf8(&id_entry.data).ok()?;
367 let deprecated = *deprecated_entry.data.first()? != 0;
368 Some((name.to_string(), deprecated))
369 }
370
371 pub fn build_local_activity_marker_details(
372 metadata: LocalActivityMarkerData,
373 result: Option<Payload>,
374 ) -> HashMap<String, Payloads> {
375 let mut hm = HashMap::new();
376 if let Some(jsonified) = metadata.as_json_payload().into_payloads() {
379 hm.insert("data".to_string(), jsonified);
380 }
381 if let Some(res) = result {
382 hm.insert("result".to_string(), res.into());
383 }
384 hm
385 }
386
387 pub fn extract_local_activity_marker_data(
390 details: &HashMap<String, Payloads>,
391 ) -> Option<LocalActivityMarkerData> {
392 details
393 .get("data")
394 .and_then(|p| p.payloads.first())
395 .and_then(|p| std::str::from_utf8(&p.data).ok())
396 .and_then(|s| serde_json::from_str(s).ok())
397 }
398
399 pub fn extract_local_activity_marker_details(
403 details: &mut HashMap<String, Payloads>,
404 ) -> (Option<LocalActivityMarkerData>, Option<Payload>) {
405 let data = extract_local_activity_marker_data(details);
406 let result = details.remove("result").and_then(|mut p| p.payloads.pop());
407 (data, result)
408 }
409 }
410
411 pub mod external_data {
412 use prost_types::{Duration, Timestamp};
413 use serde::{Deserialize, Deserializer, Serialize, Serializer};
414 tonic::include_proto!("coresdk.external_data");
415
416 #[derive(Serialize, Deserialize)]
420 #[serde(remote = "Timestamp")]
421 struct TimestampDef {
422 seconds: i64,
423 nanos: i32,
424 }
425 mod opt_timestamp {
426 use super::*;
427
428 pub(super) fn serialize<S>(
429 value: &Option<Timestamp>,
430 serializer: S,
431 ) -> Result<S::Ok, S::Error>
432 where
433 S: Serializer,
434 {
435 #[derive(Serialize)]
436 struct Helper<'a>(#[serde(with = "TimestampDef")] &'a Timestamp);
437
438 value.as_ref().map(Helper).serialize(serializer)
439 }
440
441 pub(super) fn deserialize<'de, D>(
442 deserializer: D,
443 ) -> Result<Option<Timestamp>, D::Error>
444 where
445 D: Deserializer<'de>,
446 {
447 #[derive(Deserialize)]
448 struct Helper(#[serde(with = "TimestampDef")] Timestamp);
449
450 let helper = Option::deserialize(deserializer)?;
451 Ok(helper.map(|Helper(external)| external))
452 }
453 }
454
455 #[derive(Serialize, Deserialize)]
457 #[serde(remote = "Duration")]
458 struct DurationDef {
459 seconds: i64,
460 nanos: i32,
461 }
462 mod opt_duration {
463 use super::*;
464
465 pub(super) fn serialize<S>(
466 value: &Option<Duration>,
467 serializer: S,
468 ) -> Result<S::Ok, S::Error>
469 where
470 S: Serializer,
471 {
472 #[derive(Serialize)]
473 struct Helper<'a>(#[serde(with = "DurationDef")] &'a Duration);
474
475 value.as_ref().map(Helper).serialize(serializer)
476 }
477
478 pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
479 where
480 D: Deserializer<'de>,
481 {
482 #[derive(Deserialize)]
483 struct Helper(#[serde(with = "DurationDef")] Duration);
484
485 let helper = Option::deserialize(deserializer)?;
486 Ok(helper.map(|Helper(external)| external))
487 }
488 }
489 }
490
491 pub mod workflow_activation {
492 use crate::protos::{
493 coresdk::{
494 FromPayloadsExt,
495 activity_result::{ActivityResolution, activity_resolution},
496 common::NamespacedWorkflowExecution,
497 fix_retry_policy,
498 workflow_activation::remove_from_cache::EvictionReason,
499 },
500 temporal::api::{
501 enums::v1::WorkflowTaskFailedCause,
502 history::v1::{
503 WorkflowExecutionCancelRequestedEventAttributes,
504 WorkflowExecutionSignaledEventAttributes,
505 WorkflowExecutionStartedEventAttributes,
506 },
507 query::v1::WorkflowQuery,
508 },
509 };
510 use prost_types::Timestamp;
511 use std::fmt::{Display, Formatter};
512
513 tonic::include_proto!("coresdk.workflow_activation");
514
515 pub fn create_evict_activation(
516 run_id: String,
517 message: String,
518 reason: EvictionReason,
519 ) -> WorkflowActivation {
520 WorkflowActivation {
521 timestamp: None,
522 run_id,
523 is_replaying: false,
524 history_length: 0,
525 jobs: vec![WorkflowActivationJob::from(
526 workflow_activation_job::Variant::RemoveFromCache(RemoveFromCache {
527 message,
528 reason: reason as i32,
529 }),
530 )],
531 available_internal_flags: vec![],
532 history_size_bytes: 0,
533 continue_as_new_suggested: false,
534 deployment_version_for_current_task: None,
535 last_sdk_version: String::new(),
536 suggest_continue_as_new_reasons: vec![],
537 target_worker_deployment_version_changed: false,
538 }
539 }
540
541 pub fn query_to_job(id: String, q: WorkflowQuery) -> QueryWorkflow {
542 QueryWorkflow {
543 query_id: id,
544 query_type: q.query_type,
545 arguments: Vec::from_payloads(q.query_args),
546 headers: q.header.map(|h| h.into()).unwrap_or_default(),
547 }
548 }
549
550 impl WorkflowActivation {
551 pub fn is_only_eviction(&self) -> bool {
553 matches!(
554 self.jobs.as_slice(),
555 [WorkflowActivationJob {
556 variant: Some(workflow_activation_job::Variant::RemoveFromCache(_))
557 }]
558 )
559 }
560
561 pub fn eviction_reason(&self) -> Option<EvictionReason> {
563 self.jobs.iter().find_map(|j| {
564 if let Some(workflow_activation_job::Variant::RemoveFromCache(ref rj)) =
565 j.variant
566 {
567 EvictionReason::try_from(rj.reason).ok()
568 } else {
569 None
570 }
571 })
572 }
573 }
574
575 impl workflow_activation_job::Variant {
576 pub fn is_local_activity_resolution(&self) -> bool {
577 matches!(self, workflow_activation_job::Variant::ResolveActivity(ra) if ra.is_local)
578 }
579 }
580
581 impl Display for EvictionReason {
582 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
583 write!(f, "{self:?}")
584 }
585 }
586
587 impl From<EvictionReason> for WorkflowTaskFailedCause {
588 fn from(value: EvictionReason) -> Self {
589 match value {
590 EvictionReason::Nondeterminism => {
591 WorkflowTaskFailedCause::NonDeterministicError
592 }
593 _ => WorkflowTaskFailedCause::Unspecified,
594 }
595 }
596 }
597
598 impl Display for WorkflowActivation {
599 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
600 write!(f, "WorkflowActivation(")?;
601 write!(f, "run_id: {}, ", self.run_id)?;
602 write!(f, "is_replaying: {}, ", self.is_replaying)?;
603 write!(
604 f,
605 "jobs: {})",
606 self.jobs
607 .iter()
608 .map(ToString::to_string)
609 .collect::<Vec<_>>()
610 .as_slice()
611 .join(", ")
612 )
613 }
614 }
615
616 impl Display for WorkflowActivationJob {
617 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
618 match &self.variant {
619 None => write!(f, "empty"),
620 Some(v) => write!(f, "{v}"),
621 }
622 }
623 }
624
625 impl Display for workflow_activation_job::Variant {
626 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
627 match self {
628 workflow_activation_job::Variant::InitializeWorkflow(_) => {
629 write!(f, "InitializeWorkflow")
630 }
631 workflow_activation_job::Variant::FireTimer(t) => {
632 write!(f, "FireTimer({})", t.seq)
633 }
634 workflow_activation_job::Variant::UpdateRandomSeed(_) => {
635 write!(f, "UpdateRandomSeed")
636 }
637 workflow_activation_job::Variant::QueryWorkflow(_) => {
638 write!(f, "QueryWorkflow")
639 }
640 workflow_activation_job::Variant::CancelWorkflow(_) => {
641 write!(f, "CancelWorkflow")
642 }
643 workflow_activation_job::Variant::SignalWorkflow(_) => {
644 write!(f, "SignalWorkflow")
645 }
646 workflow_activation_job::Variant::ResolveActivity(r) => {
647 write!(
648 f,
649 "ResolveActivity({}, {})",
650 r.seq,
651 r.result
652 .as_ref()
653 .unwrap_or(&ActivityResolution { status: None })
654 )
655 }
656 workflow_activation_job::Variant::NotifyHasPatch(_) => {
657 write!(f, "NotifyHasPatch")
658 }
659 workflow_activation_job::Variant::ResolveChildWorkflowExecutionStart(_) => {
660 write!(f, "ResolveChildWorkflowExecutionStart")
661 }
662 workflow_activation_job::Variant::ResolveChildWorkflowExecution(_) => {
663 write!(f, "ResolveChildWorkflowExecution")
664 }
665 workflow_activation_job::Variant::ResolveSignalExternalWorkflow(_) => {
666 write!(f, "ResolveSignalExternalWorkflow")
667 }
668 workflow_activation_job::Variant::RemoveFromCache(_) => {
669 write!(f, "RemoveFromCache")
670 }
671 workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(_) => {
672 write!(f, "ResolveRequestCancelExternalWorkflow")
673 }
674 workflow_activation_job::Variant::DoUpdate(u) => {
675 write!(f, "DoUpdate({})", u.id)
676 }
677 workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
678 write!(f, "ResolveNexusOperationStart")
679 }
680 workflow_activation_job::Variant::ResolveNexusOperation(_) => {
681 write!(f, "ResolveNexusOperation")
682 }
683 }
684 }
685 }
686
687 impl Display for ActivityResolution {
688 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
689 match self.status {
690 None => {
691 write!(f, "None")
692 }
693 Some(activity_resolution::Status::Failed(_)) => {
694 write!(f, "Failed")
695 }
696 Some(activity_resolution::Status::Completed(_)) => {
697 write!(f, "Completed")
698 }
699 Some(activity_resolution::Status::Cancelled(_)) => {
700 write!(f, "Cancelled")
701 }
702 Some(activity_resolution::Status::Backoff(_)) => {
703 write!(f, "Backoff")
704 }
705 }
706 }
707 }
708
709 impl Display for QueryWorkflow {
710 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
711 write!(
712 f,
713 "QueryWorkflow(id: {}, type: {})",
714 self.query_id, self.query_type
715 )
716 }
717 }
718
719 impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
720 fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
721 Self {
722 signal_name: a.signal_name,
723 input: Vec::from_payloads(a.input),
724 identity: a.identity,
725 headers: a.header.map(Into::into).unwrap_or_default(),
726 }
727 }
728 }
729
730 impl From<WorkflowExecutionCancelRequestedEventAttributes> for CancelWorkflow {
731 fn from(a: WorkflowExecutionCancelRequestedEventAttributes) -> Self {
732 Self { reason: a.cause }
733 }
734 }
735
736 pub fn start_workflow_from_attribs(
738 attrs: WorkflowExecutionStartedEventAttributes,
739 workflow_id: String,
740 randomness_seed: u64,
741 start_time: Timestamp,
742 ) -> InitializeWorkflow {
743 InitializeWorkflow {
744 workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(),
745 workflow_id,
746 arguments: Vec::from_payloads(attrs.input),
747 randomness_seed,
748 headers: attrs.header.unwrap_or_default().fields,
749 identity: attrs.identity,
750 parent_workflow_info: attrs.parent_workflow_execution.map(|pe| {
751 NamespacedWorkflowExecution {
752 namespace: attrs.parent_workflow_namespace,
753 run_id: pe.run_id,
754 workflow_id: pe.workflow_id,
755 }
756 }),
757 workflow_execution_timeout: attrs.workflow_execution_timeout,
758 workflow_run_timeout: attrs.workflow_run_timeout,
759 workflow_task_timeout: attrs.workflow_task_timeout,
760 continued_from_execution_run_id: attrs.continued_execution_run_id,
761 continued_initiator: attrs.initiator,
762 continued_failure: attrs.continued_failure,
763 last_completion_result: attrs.last_completion_result,
764 first_execution_run_id: attrs.first_execution_run_id,
765 retry_policy: attrs.retry_policy.map(fix_retry_policy),
766 attempt: attrs.attempt,
767 cron_schedule: attrs.cron_schedule,
768 workflow_execution_expiration_time: attrs.workflow_execution_expiration_time,
769 cron_schedule_to_schedule_interval: attrs.first_workflow_task_backoff,
770 memo: attrs.memo,
771 search_attributes: attrs.search_attributes,
772 start_time: Some(start_time),
773 root_workflow: attrs.root_workflow_execution,
774 priority: attrs.priority,
775 }
776 }
777 }
778
779 pub mod workflow_completion {
780 use crate::protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure};
781 tonic::include_proto!("coresdk.workflow_completion");
782
783 impl workflow_activation_completion::Status {
784 pub const fn is_success(&self) -> bool {
785 match &self {
786 Self::Successful(_) => true,
787 Self::Failed(_) => false,
788 }
789 }
790 }
791
792 impl From<failure::v1::Failure> for Failure {
793 fn from(f: failure::v1::Failure) -> Self {
794 Failure {
795 failure: Some(f),
796 force_cause: WorkflowTaskFailedCause::Unspecified as i32,
797 }
798 }
799 }
800 }
801
/// Module holding the code that `tonic::include_proto!` pulls in for the
/// `coresdk.child_workflow` protobuf package; it adds nothing hand-written.
802 pub mod child_workflow {
803 tonic::include_proto!("coresdk.child_workflow");
804 }
805
806 pub mod nexus {
807 use crate::protos::temporal::api::workflowservice::v1::PollNexusTaskQueueResponse;
808 use std::fmt::{Display, Formatter};
809
810 tonic::include_proto!("coresdk.nexus");
811
812 impl NexusTask {
813 pub fn unwrap_task(self) -> PollNexusTaskQueueResponse {
815 if let Some(nexus_task::Variant::Task(t)) = self.variant {
816 return t;
817 }
818 panic!("Nexus task did not contain a server task");
819 }
820
821 pub fn task_token(&self) -> &[u8] {
823 match &self.variant {
824 Some(nexus_task::Variant::Task(t)) => t.task_token.as_slice(),
825 Some(nexus_task::Variant::CancelTask(c)) => c.task_token.as_slice(),
826 None => panic!("Nexus task did not contain a task token"),
827 }
828 }
829 }
830
831 impl Display for nexus_task_completion::Status {
832 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
833 write!(f, "NexusTaskCompletion(")?;
834 match self {
835 nexus_task_completion::Status::Completed(c) => {
836 write!(f, "{c}")
837 }
838 nexus_task_completion::Status::AckCancel(_) => {
839 write!(f, "AckCancel")
840 }
841 #[allow(deprecated)]
842 nexus_task_completion::Status::Error(error) => {
843 write!(f, "Error({error:?})")
844 }
845 nexus_task_completion::Status::Failure(failure) => {
846 write!(f, "{failure}")
847 }
848 }?;
849 write!(f, ")")
850 }
851 }
852
/// The two error states distinguished here for a Nexus operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NexusOperationErrorState {
    Failed,
    Canceled,
}

/// Displays the state in lowercase: `failed` or `canceled`.
impl Display for NexusOperationErrorState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Failed => "failed",
            Self::Canceled => "canceled",
        };
        f.write_str(text)
    }
}
867 }
868
869 pub mod workflow_commands {
870 tonic::include_proto!("coresdk.workflow_commands");
871
872 use crate::protos::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType};
873 use std::fmt::{Display, Formatter};
874
875 impl Display for WorkflowCommand {
876 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
877 match &self.variant {
878 None => write!(f, "Empty"),
879 Some(v) => write!(f, "{v}"),
880 }
881 }
882 }
883
884 impl Display for StartTimer {
885 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
886 write!(f, "StartTimer({})", self.seq)
887 }
888 }
889
890 impl Display for ScheduleActivity {
891 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
892 write!(f, "ScheduleActivity({}, {})", self.seq, self.activity_type)
893 }
894 }
895
896 impl Display for ScheduleLocalActivity {
897 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898 write!(
899 f,
900 "ScheduleLocalActivity({}, {})",
901 self.seq, self.activity_type
902 )
903 }
904 }
905
906 impl Display for QueryResult {
907 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
908 write!(f, "RespondToQuery({})", self.query_id)
909 }
910 }
911
912 impl Display for RequestCancelActivity {
913 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
914 write!(f, "RequestCancelActivity({})", self.seq)
915 }
916 }
917
918 impl Display for RequestCancelLocalActivity {
919 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
920 write!(f, "RequestCancelLocalActivity({})", self.seq)
921 }
922 }
923
924 impl Display for CancelTimer {
925 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
926 write!(f, "CancelTimer({})", self.seq)
927 }
928 }
929
930 impl Display for CompleteWorkflowExecution {
931 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
932 write!(f, "CompleteWorkflowExecution")
933 }
934 }
935
936 impl Display for FailWorkflowExecution {
937 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
938 write!(f, "FailWorkflowExecution")
939 }
940 }
941
942 impl Display for ContinueAsNewWorkflowExecution {
943 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
944 write!(f, "ContinueAsNewWorkflowExecution")
945 }
946 }
947
948 impl Display for CancelWorkflowExecution {
949 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
950 write!(f, "CancelWorkflowExecution")
951 }
952 }
953
954 impl Display for SetPatchMarker {
955 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
956 write!(f, "SetPatchMarker({})", self.patch_id)
957 }
958 }
959
960 impl Display for StartChildWorkflowExecution {
961 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
962 write!(
963 f,
964 "StartChildWorkflowExecution({}, {})",
965 self.seq, self.workflow_type
966 )
967 }
968 }
969
970 impl Display for RequestCancelExternalWorkflowExecution {
971 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
972 write!(f, "RequestCancelExternalWorkflowExecution({})", self.seq)
973 }
974 }
975
976 impl Display for UpsertWorkflowSearchAttributes {
977 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
978 let keys: Vec<_> = self
979 .search_attributes
980 .as_ref()
981 .map(|sa| sa.indexed_fields.keys().collect())
982 .unwrap_or_default();
983 write!(f, "UpsertWorkflowSearchAttributes({:?})", keys)
984 }
985 }
986
987 impl Display for SignalExternalWorkflowExecution {
988 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
989 write!(f, "SignalExternalWorkflowExecution({})", self.seq)
990 }
991 }
992
993 impl Display for CancelSignalWorkflow {
994 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
995 write!(f, "CancelSignalWorkflow({})", self.seq)
996 }
997 }
998
999 impl Display for CancelChildWorkflowExecution {
1000 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1001 write!(
1002 f,
1003 "CancelChildWorkflowExecution({})",
1004 self.child_workflow_seq
1005 )
1006 }
1007 }
1008
1009 impl Display for ModifyWorkflowProperties {
1010 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1011 write!(
1012 f,
1013 "ModifyWorkflowProperties(upserted memo keys: {:?})",
1014 self.upserted_memo.as_ref().map(|m| m.fields.keys())
1015 )
1016 }
1017 }
1018
1019 impl Display for UpdateResponse {
1020 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1021 write!(
1022 f,
1023 "UpdateResponse(protocol_instance_id: {}, response: {:?})",
1024 self.protocol_instance_id, self.response
1025 )
1026 }
1027 }
1028
1029 impl Display for ScheduleNexusOperation {
1030 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1031 write!(f, "ScheduleNexusOperation({})", self.seq)
1032 }
1033 }
1034
1035 impl Display for RequestCancelNexusOperation {
1036 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1037 write!(f, "RequestCancelNexusOperation({})", self.seq)
1038 }
1039 }
1040
1041 impl QueryResult {
1042 pub fn into_components(self) -> (String, QueryResultType, Option<Payloads>, String) {
1044 match self {
1045 QueryResult {
1046 variant: Some(query_result::Variant::Succeeded(qs)),
1047 query_id,
1048 } => (
1049 query_id,
1050 QueryResultType::Answered,
1051 qs.response.map(Into::into),
1052 "".to_string(),
1053 ),
1054 QueryResult {
1055 variant: Some(query_result::Variant::Failed(err)),
1056 query_id,
1057 } => (query_id, QueryResultType::Failed, None, err.message),
1058 QueryResult {
1059 variant: None,
1060 query_id,
1061 } => (
1062 query_id,
1063 QueryResultType::Failed,
1064 None,
1065 "Query response was empty".to_string(),
1066 ),
1067 }
1068 }
1069 }
1070 }
1071
/// Alias for `i64`.
/// NOTE(review): by its name this identifies an event within a workflow history — confirm at use sites.
1072 pub type HistoryEventId = i64;
1073
1074 impl From<workflow_activation_job::Variant> for WorkflowActivationJob {
1075 fn from(a: workflow_activation_job::Variant) -> Self {
1076 Self { variant: Some(a) }
1077 }
1078 }
1079
1080 impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
1081 fn from(v: Vec<WorkflowCommand>) -> Self {
1082 Self {
1083 commands: v,
1084 used_internal_flags: vec![],
1085 versioning_behavior: VersioningBehavior::Unspecified.into(),
1086 }
1087 }
1088 }
1089
1090 impl From<workflow_command::Variant> for WorkflowCommand {
1091 fn from(v: workflow_command::Variant) -> Self {
1092 Self {
1093 variant: Some(v),
1094 user_metadata: None,
1095 }
1096 }
1097 }
1098
1099 impl workflow_completion::Success {
1100 pub fn from_variants(cmds: Vec<Variant>) -> Self {
1101 let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect();
1102 cmds.into()
1103 }
1104 }
1105
1106 impl WorkflowActivationCompletion {
1107 pub fn empty(run_id: impl Into<String>) -> Self {
1109 let success = workflow_completion::Success::from_variants(vec![]);
1110 Self {
1111 run_id: run_id.into(),
1112 status: Some(workflow_activation_completion::Status::Successful(success)),
1113 }
1114 }
1115
1116 pub fn from_cmds(run_id: impl Into<String>, cmds: Vec<workflow_command::Variant>) -> Self {
1118 let success = workflow_completion::Success::from_variants(cmds);
1119 Self {
1120 run_id: run_id.into(),
1121 status: Some(workflow_activation_completion::Status::Successful(success)),
1122 }
1123 }
1124
1125 pub fn from_cmd(run_id: impl Into<String>, cmd: workflow_command::Variant) -> Self {
1127 let success = workflow_completion::Success::from_variants(vec![cmd]);
1128 Self {
1129 run_id: run_id.into(),
1130 status: Some(workflow_activation_completion::Status::Successful(success)),
1131 }
1132 }
1133
1134 pub fn fail(
1135 run_id: impl Into<String>,
1136 failure: Failure,
1137 cause: Option<WorkflowTaskFailedCause>,
1138 ) -> Self {
1139 Self {
1140 run_id: run_id.into(),
1141 status: Some(workflow_activation_completion::Status::Failed(
1142 workflow_completion::Failure {
1143 failure: Some(failure),
1144 force_cause: cause.unwrap_or(WorkflowTaskFailedCause::Unspecified) as i32,
1145 },
1146 )),
1147 }
1148 }
1149
1150 pub fn has_execution_ending(&self) -> bool {
1153 self.has_complete_workflow_execution()
1154 || self.has_fail_execution()
1155 || self.has_continue_as_new()
1156 || self.has_cancel_workflow_execution()
1157 }
1158
1159 pub fn has_fail_execution(&self) -> bool {
1161 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1162 return s.commands.iter().any(|wfc| {
1163 matches!(
1164 wfc,
1165 WorkflowCommand {
1166 variant: Some(workflow_command::Variant::FailWorkflowExecution(_)),
1167 ..
1168 }
1169 )
1170 });
1171 }
1172 false
1173 }
1174
1175 pub fn has_cancel_workflow_execution(&self) -> bool {
1177 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1178 return s.commands.iter().any(|wfc| {
1179 matches!(
1180 wfc,
1181 WorkflowCommand {
1182 variant: Some(workflow_command::Variant::CancelWorkflowExecution(_)),
1183 ..
1184 }
1185 )
1186 });
1187 }
1188 false
1189 }
1190
1191 pub fn has_continue_as_new(&self) -> bool {
1193 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1194 return s.commands.iter().any(|wfc| {
1195 matches!(
1196 wfc,
1197 WorkflowCommand {
1198 variant: Some(
1199 workflow_command::Variant::ContinueAsNewWorkflowExecution(_)
1200 ),
1201 ..
1202 }
1203 )
1204 });
1205 }
1206 false
1207 }
1208
1209 pub fn has_complete_workflow_execution(&self) -> bool {
1211 self.complete_workflow_execution_value().is_some()
1212 }
1213
1214 pub fn complete_workflow_execution_value(&self) -> Option<&Payload> {
1216 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1217 s.commands.iter().find_map(|wfc| match wfc {
1218 WorkflowCommand {
1219 variant: Some(workflow_command::Variant::CompleteWorkflowExecution(v)),
1220 ..
1221 } => v.result.as_ref(),
1222 _ => None,
1223 })
1224 } else {
1225 None
1226 }
1227 }
1228
1229 pub fn is_empty(&self) -> bool {
1231 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1232 return s.commands.is_empty();
1233 }
1234 false
1235 }
1236
1237 pub fn add_internal_flags(&mut self, patch: u32) {
1238 if let Some(workflow_activation_completion::Status::Successful(s)) = &mut self.status {
1239 s.used_internal_flags.push(patch);
1240 }
1241 }
1242 }
1243
    /// Conversion of a value into a [`WorkflowActivationCompletion`] for a given run.
    pub trait IntoCompletion {
        /// Consumes `self` and produces the completion associated with `run_id`.
        fn into_completion(self, run_id: String) -> WorkflowActivationCompletion;
    }
1249
1250 impl IntoCompletion for workflow_command::Variant {
1251 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1252 WorkflowActivationCompletion::from_cmd(run_id, self)
1253 }
1254 }
1255
1256 impl<I, V> IntoCompletion for I
1257 where
1258 I: IntoIterator<Item = V>,
1259 V: Into<WorkflowCommand>,
1260 {
1261 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1262 let success = self.into_iter().map(Into::into).collect::<Vec<_>>().into();
1263 WorkflowActivationCompletion {
1264 run_id,
1265 status: Some(workflow_activation_completion::Status::Successful(success)),
1266 }
1267 }
1268 }
1269
1270 impl Display for WorkflowActivationCompletion {
1271 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1272 write!(
1273 f,
1274 "WorkflowActivationCompletion(run_id: {}, status: ",
1275 &self.run_id
1276 )?;
1277 match &self.status {
1278 None => write!(f, "empty")?,
1279 Some(s) => write!(f, "{s}")?,
1280 };
1281 write!(f, ")")
1282 }
1283 }
1284
1285 impl Display for workflow_activation_completion::Status {
1286 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1287 match self {
1288 workflow_activation_completion::Status::Successful(
1289 workflow_completion::Success { commands, .. },
1290 ) => {
1291 write!(f, "Success(")?;
1292 let mut written = 0;
1293 for c in commands {
1294 write!(f, "{c} ")?;
1295 written += 1;
1296 if written >= 10 && written < commands.len() {
1297 write!(f, "... {} more", commands.len() - written)?;
1298 break;
1299 }
1300 }
1301 write!(f, ")")
1302 }
1303 workflow_activation_completion::Status::Failed(_) => {
1304 write!(f, "Failed")
1305 }
1306 }
1307 }
1308 }
1309
1310 impl ActivityTask {
1311 pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse) -> Self {
1312 let (workflow_id, run_id) = r
1313 .workflow_execution
1314 .map(|we| (we.workflow_id, we.run_id))
1315 .unwrap_or_default();
1316 Self {
1317 task_token: r.task_token,
1318 variant: Some(activity_task::activity_task::Variant::Start(
1319 activity_task::Start {
1320 workflow_namespace: r.workflow_namespace,
1321 workflow_type: r.workflow_type.map_or_else(|| "".to_string(), |wt| wt.name),
1322 workflow_execution: Some(WorkflowExecution {
1323 workflow_id,
1324 run_id,
1325 }),
1326 activity_id: r.activity_id,
1327 activity_type: r.activity_type.map_or_else(|| "".to_string(), |at| at.name),
1328 header_fields: r.header.map(Into::into).unwrap_or_default(),
1329 input: Vec::from_payloads(r.input),
1330 heartbeat_details: Vec::from_payloads(r.heartbeat_details),
1331 scheduled_time: r.scheduled_time,
1332 current_attempt_scheduled_time: r.current_attempt_scheduled_time,
1333 started_time: r.started_time,
1334 attempt: r.attempt as u32,
1335 schedule_to_close_timeout: r.schedule_to_close_timeout,
1336 start_to_close_timeout: r.start_to_close_timeout,
1337 heartbeat_timeout: r.heartbeat_timeout,
1338 retry_policy: r.retry_policy.map(fix_retry_policy),
1339 priority: r.priority,
1340 is_local: false,
1341 run_id: r.activity_run_id,
1342 },
1343 )),
1344 }
1345 }
1346 }
1347
1348 impl Failure {
1349 pub fn is_timeout(&self) -> Option<crate::protos::temporal::api::enums::v1::TimeoutType> {
1350 match &self.failure_info {
1351 Some(FailureInfo::TimeoutFailureInfo(ti)) => Some(ti.timeout_type()),
1352 _ => {
1353 if let Some(c) = &self.cause {
1354 c.is_timeout()
1355 } else {
1356 None
1357 }
1358 }
1359 }
1360 }
1361
1362 pub fn application_failure(message: String, non_retryable: bool) -> Self {
1363 Self {
1364 message,
1365 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1366 ApplicationFailureInfo {
1367 non_retryable,
1368 ..Default::default()
1369 },
1370 )),
1371 ..Default::default()
1372 }
1373 }
1374
1375 pub fn application_failure_from_error(ae: anyhow::Error, non_retryable: bool) -> Self {
1376 Self {
1377 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1378 ApplicationFailureInfo {
1379 non_retryable,
1380 ..Default::default()
1381 },
1382 )),
1383 ..ae.chain()
1384 .rfold(None, |cause, e| {
1385 Some(Self {
1386 message: e.to_string(),
1387 cause: cause.map(Box::new),
1388 ..Default::default()
1389 })
1390 })
1391 .unwrap_or_default()
1392 }
1393 }
1394
1395 pub fn timeout(timeout_type: TimeoutType) -> Self {
1396 Self {
1397 message: "Activity timed out".to_string(),
1398 cause: Some(Box::new(Failure {
1399 message: "Activity timed out".to_string(),
1400 failure_info: Some(FailureInfo::TimeoutFailureInfo(TimeoutFailureInfo {
1401 timeout_type: timeout_type.into(),
1402 ..Default::default()
1403 })),
1404 ..Default::default()
1405 })),
1406 failure_info: Some(FailureInfo::ActivityFailureInfo(
1407 ActivityFailureInfo::default(),
1408 )),
1409 ..Default::default()
1410 }
1411 }
1412
1413 pub fn maybe_application_failure(&self) -> Option<&ApplicationFailureInfo> {
1415 if let Failure {
1416 failure_info: Some(FailureInfo::ApplicationFailureInfo(f)),
1417 ..
1418 } = self
1419 {
1420 Some(f)
1421 } else {
1422 None
1423 }
1424 }
1425
1426 pub fn is_benign_application_failure(&self) -> bool {
1428 self.maybe_application_failure()
1429 .is_some_and(|app_info| app_info.category() == ApplicationErrorCategory::Benign)
1430 }
1431 }
1432
1433 impl Display for Failure {
1434 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1435 write!(f, "Failure({}, ", self.message)?;
1436 match self.failure_info.as_ref() {
1437 None => write!(f, "missing info")?,
1438 Some(FailureInfo::TimeoutFailureInfo(v)) => {
1439 write!(f, "Timeout: {:?}", v.timeout_type())?;
1440 }
1441 Some(FailureInfo::ApplicationFailureInfo(v)) => {
1442 write!(f, "Application Failure: {}", v.r#type)?;
1443 }
1444 Some(FailureInfo::CanceledFailureInfo(_)) => {
1445 write!(f, "Cancelled")?;
1446 }
1447 Some(FailureInfo::TerminatedFailureInfo(_)) => {
1448 write!(f, "Terminated")?;
1449 }
1450 Some(FailureInfo::ServerFailureInfo(_)) => {
1451 write!(f, "Server Failure")?;
1452 }
1453 Some(FailureInfo::ResetWorkflowFailureInfo(_)) => {
1454 write!(f, "Reset Workflow")?;
1455 }
1456 Some(FailureInfo::ActivityFailureInfo(v)) => {
1457 write!(
1458 f,
1459 "Activity Failure: scheduled_event_id: {}",
1460 v.scheduled_event_id
1461 )?;
1462 }
1463 Some(FailureInfo::ChildWorkflowExecutionFailureInfo(v)) => {
1464 write!(
1465 f,
1466 "Child Workflow: started_event_id: {}",
1467 v.started_event_id
1468 )?;
1469 }
1470 Some(FailureInfo::NexusOperationExecutionFailureInfo(v)) => {
1471 write!(
1472 f,
1473 "Nexus Operation Failure: scheduled_event_id: {}",
1474 v.scheduled_event_id
1475 )?;
1476 }
1477 Some(FailureInfo::NexusHandlerFailureInfo(v)) => {
1478 write!(f, "Nexus Handler Failure: {}", v.r#type)?;
1479 }
1480 }
1481 write!(f, ")")
1482 }
1483 }
1484
1485 impl From<&str> for Failure {
1486 fn from(v: &str) -> Self {
1487 Failure::application_failure(v.to_string(), false)
1488 }
1489 }
1490
1491 impl From<String> for Failure {
1492 fn from(v: String) -> Self {
1493 Failure::application_failure(v, false)
1494 }
1495 }
1496
1497 impl From<anyhow::Error> for Failure {
1498 fn from(ae: anyhow::Error) -> Self {
1499 Failure::application_failure_from_error(ae, false)
1500 }
1501 }
1502
    /// Construction of a value from an optional [`Payloads`] message.
    pub trait FromPayloadsExt {
        /// Builds `Self` from `p`; `None` is an accepted input.
        fn from_payloads(p: Option<Payloads>) -> Self;
    }
1506 impl<T> FromPayloadsExt for T
1507 where
1508 T: FromIterator<Payload>,
1509 {
1510 fn from_payloads(p: Option<Payloads>) -> Self {
1511 match p {
1512 None => std::iter::empty().collect(),
1513 Some(p) => p.payloads.into_iter().collect(),
1514 }
1515 }
1516 }
1517
    /// Conversion of a value into an optional [`Payloads`] message.
    pub trait IntoPayloadsExt {
        /// Consumes `self` and produces the optional payloads message.
        fn into_payloads(self) -> Option<Payloads>;
    }
1521 impl<T> IntoPayloadsExt for T
1522 where
1523 T: IntoIterator<Item = Payload>,
1524 {
1525 fn into_payloads(self) -> Option<Payloads> {
1526 let mut iterd = self.into_iter().peekable();
1527 if iterd.peek().is_none() {
1528 None
1529 } else {
1530 Some(Payloads {
1531 payloads: iterd.collect(),
1532 })
1533 }
1534 }
1535 }
1536
1537 impl From<Payload> for Payloads {
1538 fn from(p: Payload) -> Self {
1539 Self { payloads: vec![p] }
1540 }
1541 }
1542
1543 impl<T> From<T> for Payloads
1544 where
1545 T: AsRef<[u8]>,
1546 {
1547 fn from(v: T) -> Self {
1548 Self {
1549 payloads: vec![v.into()],
1550 }
1551 }
1552 }
1553
    /// Errors that can occur when deserializing a [`Payload`].
    #[derive(thiserror::Error, Debug)]
    pub enum PayloadDeserializeErr {
        /// The deserializer does not handle this payload. `from_json_payload` returns this
        /// when the payload is not JSON-encoded.
        #[error("This deserializer does not understand this payload")]
        DeserializerDoesNotHandle,
        /// Deserialization was attempted and failed; wraps the underlying error.
        #[error("Error during deserialization: {0}")]
        DeserializeErr(#[from] anyhow::Error),
    }
1563
    /// Serialization of a value into a JSON-encoded [`Payload`].
    pub trait AsJsonPayloadExt {
        /// Produces a payload holding the JSON form of `self`, or an error if serialization
        /// fails.
        fn as_json_payload(&self) -> anyhow::Result<Payload>;
    }
1569 impl<T> AsJsonPayloadExt for T
1570 where
1571 T: Serialize,
1572 {
1573 fn as_json_payload(&self) -> anyhow::Result<Payload> {
1574 let as_json = serde_json::to_string(self)?;
1575 let mut metadata = HashMap::new();
1576 metadata.insert(
1577 ENCODING_PAYLOAD_KEY.to_string(),
1578 JSON_ENCODING_VAL.as_bytes().to_vec(),
1579 );
1580 Ok(Payload {
1581 metadata,
1582 data: as_json.into_bytes(),
1583 external_payloads: Default::default(),
1584 })
1585 }
1586 }
1587
    /// Deserialization of a value from a JSON-encoded [`Payload`].
    pub trait FromJsonPayloadExt: Sized {
        /// Attempts to build `Self` from `payload`, reporting failures as
        /// [`PayloadDeserializeErr`].
        fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr>;
    }
1591 impl<T> FromJsonPayloadExt for T
1592 where
1593 T: for<'de> Deserialize<'de>,
1594 {
1595 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr> {
1596 if !payload.is_json_payload() {
1597 return Err(PayloadDeserializeErr::DeserializerDoesNotHandle);
1598 }
1599 let payload_str = std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?;
1600 Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?)
1601 }
1602 }
1603
    /// Errors returned when converting a [`Payloads`] into a single [`Payload`].
    #[derive(derive_more::Display, Debug)]
    pub enum PayloadsToPayloadError {
        /// The payloads message held more than one payload.
        MoreThanOnePayload,
        /// The payloads message held no payload.
        NoPayload,
    }
1610 impl TryFrom<Payloads> for Payload {
1611 type Error = PayloadsToPayloadError;
1612
1613 fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
1614 match v.payloads.pop() {
1615 None => Err(PayloadsToPayloadError::NoPayload),
1616 Some(p) => {
1617 if v.payloads.is_empty() {
1618 Ok(p)
1619 } else {
1620 Err(PayloadsToPayloadError::MoreThanOnePayload)
1621 }
1622 }
1623 }
1624 }
1625 }
1626
1627 fn fix_retry_policy(mut retry_policy: RetryPolicy) -> RetryPolicy {
1630 if retry_policy.initial_interval.is_none() {
1631 retry_policy.initial_interval = Default::default();
1632 }
1633 retry_policy
1634 }
1635}
1636
1637#[allow(
1639 clippy::all,
1640 missing_docs,
1641 rustdoc::broken_intra_doc_links,
1642 rustdoc::bare_urls
1643)]
1644pub mod temporal {
1646 pub mod api {
        /// Generated code for the `temporal.api.activity.v1` protobuf package.
        pub mod activity {
            pub mod v1 {
                tonic::include_proto!("temporal.api.activity.v1");
            }
        }
        /// Generated code for the `temporal.api.batch.v1` protobuf package.
        pub mod batch {
            pub mod v1 {
                tonic::include_proto!("temporal.api.batch.v1");
            }
        }
1657 pub mod command {
1658 pub mod v1 {
1659 tonic::include_proto!("temporal.api.command.v1");
1660
1661 use crate::protos::{
1662 coresdk::{IntoPayloadsExt, workflow_commands},
1663 temporal::api::{
1664 common::v1::{ActivityType, WorkflowType},
1665 enums::v1::CommandType,
1666 },
1667 };
1668 use command::Attributes;
1669 use std::fmt::{Display, Formatter};
1670
1671 impl From<command::Attributes> for Command {
1672 fn from(c: command::Attributes) -> Self {
1673 match c {
1674 a @ Attributes::StartTimerCommandAttributes(_) => Self {
1675 command_type: CommandType::StartTimer as i32,
1676 attributes: Some(a),
1677 user_metadata: Default::default(),
1678 },
1679 a @ Attributes::CancelTimerCommandAttributes(_) => Self {
1680 command_type: CommandType::CancelTimer as i32,
1681 attributes: Some(a),
1682 user_metadata: Default::default(),
1683 },
1684 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
1685 command_type: CommandType::CompleteWorkflowExecution as i32,
1686 attributes: Some(a),
1687 user_metadata: Default::default(),
1688 },
1689 a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
1690 command_type: CommandType::FailWorkflowExecution as i32,
1691 attributes: Some(a),
1692 user_metadata: Default::default(),
1693 },
1694 a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
1695 command_type: CommandType::ScheduleActivityTask as i32,
1696 attributes: Some(a),
1697 user_metadata: Default::default(),
1698 },
1699 a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self {
1700 command_type: CommandType::RequestCancelActivityTask as i32,
1701 attributes: Some(a),
1702 user_metadata: Default::default(),
1703 },
1704 a @ Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1705 Self {
1706 command_type: CommandType::ContinueAsNewWorkflowExecution
1707 as i32,
1708 attributes: Some(a),
1709 user_metadata: Default::default(),
1710 }
1711 }
1712 a @ Attributes::CancelWorkflowExecutionCommandAttributes(_) => Self {
1713 command_type: CommandType::CancelWorkflowExecution as i32,
1714 attributes: Some(a),
1715 user_metadata: Default::default(),
1716 },
1717 a @ Attributes::RecordMarkerCommandAttributes(_) => Self {
1718 command_type: CommandType::RecordMarker as i32,
1719 attributes: Some(a),
1720 user_metadata: Default::default(),
1721 },
1722 a @ Attributes::ProtocolMessageCommandAttributes(_) => Self {
1723 command_type: CommandType::ProtocolMessage as i32,
1724 attributes: Some(a),
1725 user_metadata: Default::default(),
1726 },
1727 a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1728 Self {
1729 command_type: CommandType::RequestCancelNexusOperation as i32,
1730 attributes: Some(a),
1731 user_metadata: Default::default(),
1732 }
1733 }
1734 _ => unimplemented!(),
1735 }
1736 }
1737 }
1738
1739 impl Display for Command {
1740 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1741 let ct = CommandType::try_from(self.command_type)
1742 .unwrap_or(CommandType::Unspecified);
1743 write!(f, "{:?}", ct)
1744 }
1745 }
1746
                /// Extension for mapping command attributes to their [`CommandType`].
                pub trait CommandAttributesExt {
                    /// Returns the command type corresponding to these attributes.
                    fn as_type(&self) -> CommandType;
                }
1750
1751 impl CommandAttributesExt for command::Attributes {
1752 fn as_type(&self) -> CommandType {
1753 match self {
1754 Attributes::ScheduleActivityTaskCommandAttributes(_) => {
1755 CommandType::ScheduleActivityTask
1756 }
1757 Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer,
1758 Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
1759 CommandType::CompleteWorkflowExecution
1760 }
1761 Attributes::FailWorkflowExecutionCommandAttributes(_) => {
1762 CommandType::FailWorkflowExecution
1763 }
1764 Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
1765 CommandType::RequestCancelActivityTask
1766 }
1767 Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer,
1768 Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
1769 CommandType::CancelWorkflowExecution
1770 }
1771 Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes(
1772 _,
1773 ) => CommandType::RequestCancelExternalWorkflowExecution,
1774 Attributes::RecordMarkerCommandAttributes(_) => {
1775 CommandType::RecordMarker
1776 }
1777 Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1778 CommandType::ContinueAsNewWorkflowExecution
1779 }
1780 Attributes::StartChildWorkflowExecutionCommandAttributes(_) => {
1781 CommandType::StartChildWorkflowExecution
1782 }
1783 Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => {
1784 CommandType::SignalExternalWorkflowExecution
1785 }
1786 Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => {
1787 CommandType::UpsertWorkflowSearchAttributes
1788 }
1789 Attributes::ProtocolMessageCommandAttributes(_) => {
1790 CommandType::ProtocolMessage
1791 }
1792 Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => {
1793 CommandType::ModifyWorkflowProperties
1794 }
1795 Attributes::ScheduleNexusOperationCommandAttributes(_) => {
1796 CommandType::ScheduleNexusOperation
1797 }
1798 Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1799 CommandType::RequestCancelNexusOperation
1800 }
1801 }
1802 }
1803 }
1804
1805 impl Display for command::Attributes {
1806 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1807 write!(f, "{:?}", self.as_type())
1808 }
1809 }
1810
1811 impl From<workflow_commands::StartTimer> for command::Attributes {
1812 fn from(s: workflow_commands::StartTimer) -> Self {
1813 Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
1814 timer_id: s.seq.to_string(),
1815 start_to_fire_timeout: s.start_to_fire_timeout,
1816 })
1817 }
1818 }
1819
1820 impl From<workflow_commands::UpsertWorkflowSearchAttributes> for command::Attributes {
1821 fn from(s: workflow_commands::UpsertWorkflowSearchAttributes) -> Self {
1822 Self::UpsertWorkflowSearchAttributesCommandAttributes(
1823 UpsertWorkflowSearchAttributesCommandAttributes {
1824 search_attributes: s.search_attributes,
1825 },
1826 )
1827 }
1828 }
1829
1830 impl From<workflow_commands::ModifyWorkflowProperties> for command::Attributes {
1831 fn from(s: workflow_commands::ModifyWorkflowProperties) -> Self {
1832 Self::ModifyWorkflowPropertiesCommandAttributes(
1833 ModifyWorkflowPropertiesCommandAttributes {
1834 upserted_memo: s.upserted_memo.map(Into::into),
1835 },
1836 )
1837 }
1838 }
1839
1840 impl From<workflow_commands::CancelTimer> for command::Attributes {
1841 fn from(s: workflow_commands::CancelTimer) -> Self {
1842 Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
1843 timer_id: s.seq.to_string(),
1844 })
1845 }
1846 }
1847
1848 pub fn schedule_activity_cmd_to_api(
1849 s: workflow_commands::ScheduleActivity,
1850 use_workflow_build_id: bool,
1851 ) -> command::Attributes {
1852 command::Attributes::ScheduleActivityTaskCommandAttributes(
1853 ScheduleActivityTaskCommandAttributes {
1854 activity_id: s.activity_id,
1855 activity_type: Some(ActivityType {
1856 name: s.activity_type,
1857 }),
1858 task_queue: Some(s.task_queue.into()),
1859 header: Some(s.headers.into()),
1860 input: s.arguments.into_payloads(),
1861 schedule_to_close_timeout: s.schedule_to_close_timeout,
1862 schedule_to_start_timeout: s.schedule_to_start_timeout,
1863 start_to_close_timeout: s.start_to_close_timeout,
1864 heartbeat_timeout: s.heartbeat_timeout,
1865 retry_policy: s.retry_policy.map(Into::into),
1866 request_eager_execution: !s.do_not_eagerly_execute,
1867 use_workflow_build_id,
1868 priority: s.priority,
1869 },
1870 )
1871 }
1872
1873 #[allow(deprecated)]
1874 pub fn start_child_workflow_cmd_to_api(
1875 s: workflow_commands::StartChildWorkflowExecution,
1876 inherit_build_id: bool,
1877 ) -> command::Attributes {
1878 command::Attributes::StartChildWorkflowExecutionCommandAttributes(
1879 StartChildWorkflowExecutionCommandAttributes {
1880 workflow_id: s.workflow_id,
1881 workflow_type: Some(WorkflowType {
1882 name: s.workflow_type,
1883 }),
1884 control: "".into(),
1885 namespace: s.namespace,
1886 task_queue: Some(s.task_queue.into()),
1887 header: Some(s.headers.into()),
1888 memo: Some(s.memo.into()),
1889 search_attributes: s.search_attributes,
1890 input: s.input.into_payloads(),
1891 workflow_id_reuse_policy: s.workflow_id_reuse_policy,
1892 workflow_execution_timeout: s.workflow_execution_timeout,
1893 workflow_run_timeout: s.workflow_run_timeout,
1894 workflow_task_timeout: s.workflow_task_timeout,
1895 retry_policy: s.retry_policy.map(Into::into),
1896 cron_schedule: s.cron_schedule.clone(),
1897 parent_close_policy: s.parent_close_policy,
1898 inherit_build_id,
1899 priority: s.priority,
1900 },
1901 )
1902 }
1903
1904 impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
1905 fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
1906 Self::CompleteWorkflowExecutionCommandAttributes(
1907 CompleteWorkflowExecutionCommandAttributes {
1908 result: c.result.map(Into::into),
1909 },
1910 )
1911 }
1912 }
1913
1914 impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
1915 fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
1916 Self::FailWorkflowExecutionCommandAttributes(
1917 FailWorkflowExecutionCommandAttributes {
1918 failure: c.failure.map(Into::into),
1919 },
1920 )
1921 }
1922 }
1923
1924 #[allow(deprecated)]
1925 pub fn continue_as_new_cmd_to_api(
1926 c: workflow_commands::ContinueAsNewWorkflowExecution,
1927 inherit_build_id: bool,
1928 ) -> command::Attributes {
1929 command::Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1930 ContinueAsNewWorkflowExecutionCommandAttributes {
1931 workflow_type: Some(c.workflow_type.into()),
1932 task_queue: Some(c.task_queue.into()),
1933 input: c.arguments.into_payloads(),
1934 workflow_run_timeout: c.workflow_run_timeout,
1935 workflow_task_timeout: c.workflow_task_timeout,
1936 memo: if c.memo.is_empty() {
1937 None
1938 } else {
1939 Some(c.memo.into())
1940 },
1941 header: if c.headers.is_empty() {
1942 None
1943 } else {
1944 Some(c.headers.into())
1945 },
1946 retry_policy: c.retry_policy,
1947 search_attributes: c.search_attributes,
1948 inherit_build_id,
1949 initial_versioning_behavior: c.initial_versioning_behavior,
1950 ..Default::default()
1951 },
1952 )
1953 }
1954
1955 impl From<workflow_commands::CancelWorkflowExecution> for command::Attributes {
1956 fn from(_c: workflow_commands::CancelWorkflowExecution) -> Self {
1957 Self::CancelWorkflowExecutionCommandAttributes(
1958 CancelWorkflowExecutionCommandAttributes { details: None },
1959 )
1960 }
1961 }
1962
1963 impl From<workflow_commands::ScheduleNexusOperation> for command::Attributes {
1964 fn from(c: workflow_commands::ScheduleNexusOperation) -> Self {
1965 Self::ScheduleNexusOperationCommandAttributes(
1966 ScheduleNexusOperationCommandAttributes {
1967 endpoint: c.endpoint,
1968 service: c.service,
1969 operation: c.operation,
1970 input: c.input,
1971 schedule_to_close_timeout: c.schedule_to_close_timeout,
1972 schedule_to_start_timeout: c.schedule_to_start_timeout,
1973 start_to_close_timeout: c.start_to_close_timeout,
1974 nexus_header: c.nexus_header,
1975 },
1976 )
1977 }
1978 }
1979 }
1980 }
        /// Generated code for the `temporal.api.cloud.*` protobuf packages, one submodule
        /// per package.
        #[allow(rustdoc::invalid_html_tags)]
        pub mod cloud {
            pub mod account {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.account.v1");
                }
            }
            pub mod auditlog {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.auditlog.v1");
                }
            }
            pub mod billing {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.billing.v1");
                }
            }
            pub mod cloudservice {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.cloudservice.v1");
                }
            }
            pub mod connectivityrule {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.connectivityrule.v1");
                }
            }
            pub mod identity {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.identity.v1");
                }
            }
            pub mod namespace {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.namespace.v1");
                }
            }
            pub mod nexus {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.nexus.v1");
                }
            }
            pub mod operation {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.operation.v1");
                }
            }
            pub mod region {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.region.v1");
                }
            }
            pub mod resource {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.resource.v1");
                }
            }
            pub mod sink {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.sink.v1");
                }
            }
            pub mod usage {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.usage.v1");
                }
            }
        }
2049 pub mod common {
2050 pub mod v1 {
2051 use crate::protos::{ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL};
2052 use base64::{Engine, prelude::BASE64_STANDARD};
2053 use std::{
2054 collections::HashMap,
2055 fmt::{Display, Formatter},
2056 };
2057 include_proto_with_serde!("temporal.api.common.v1");
2058
2059 impl<T> From<T> for Payload
2060 where
2061 T: AsRef<[u8]>,
2062 {
2063 fn from(v: T) -> Self {
2064 let mut metadata = HashMap::new();
2067 metadata.insert(ENCODING_PAYLOAD_KEY.to_string(), b"binary/plain".to_vec());
2068 Self {
2069 metadata,
2070 data: v.as_ref().to_vec(),
2071 external_payloads: Default::default(),
2072 }
2073 }
2074 }
2075
2076 impl Payload {
2077 pub fn as_slice(&self) -> &[u8] {
2079 self.data.as_slice()
2080 }
2081
2082 pub fn is_json_payload(&self) -> bool {
2083 self.metadata
2084 .get(ENCODING_PAYLOAD_KEY)
2085 .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes())
2086 .unwrap_or_default()
2087 }
2088 }
2089
2090 impl std::fmt::Debug for Payload {
2091 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2092 if std::env::var("TEMPORAL_PRINT_FULL_PAYLOADS").is_err()
2093 && self.data.len() > 64
2094 {
2095 let mut windows = self.data.as_slice().windows(32);
2096 write!(
2097 f,
2098 "[{}..{}]",
2099 BASE64_STANDARD.encode(windows.next().unwrap_or_default()),
2100 BASE64_STANDARD.encode(windows.next_back().unwrap_or_default())
2101 )
2102 } else {
2103 write!(f, "[{}]", BASE64_STANDARD.encode(&self.data))
2104 }
2105 }
2106 }
2107
2108 impl Display for Payload {
2109 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2110 write!(f, "{:?}", self)
2111 }
2112 }
2113
2114 impl Display for Header {
2115 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2116 write!(f, "Header(")?;
2117 for kv in &self.fields {
2118 write!(f, "{}: ", kv.0)?;
2119 write!(f, "{}, ", kv.1)?;
2120 }
2121 write!(f, ")")
2122 }
2123 }
2124
2125 impl From<Header> for HashMap<String, Payload> {
2126 fn from(h: Header) -> Self {
2127 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2128 }
2129 }
2130
2131 impl From<Memo> for HashMap<String, Payload> {
2132 fn from(h: Memo) -> Self {
2133 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2134 }
2135 }
2136
2137 impl From<SearchAttributes> for HashMap<String, Payload> {
2138 fn from(h: SearchAttributes) -> Self {
2139 h.indexed_fields
2140 .into_iter()
2141 .map(|(k, v)| (k, v.into()))
2142 .collect()
2143 }
2144 }
2145
2146 impl From<HashMap<String, Payload>> for SearchAttributes {
2147 fn from(h: HashMap<String, Payload>) -> Self {
2148 Self {
2149 indexed_fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
2150 }
2151 }
2152 }
2153
2154 impl From<String> for ActivityType {
2155 fn from(name: String) -> Self {
2156 Self { name }
2157 }
2158 }
2159
2160 impl From<&str> for ActivityType {
2161 fn from(name: &str) -> Self {
2162 Self {
2163 name: name.to_string(),
2164 }
2165 }
2166 }
2167
2168 impl From<ActivityType> for String {
2169 fn from(at: ActivityType) -> Self {
2170 at.name
2171 }
2172 }
2173
2174 impl From<&str> for WorkflowType {
2175 fn from(v: &str) -> Self {
2176 Self {
2177 name: v.to_string(),
2178 }
2179 }
2180 }
2181 }
2182 }
        /// Generated code for the `temporal.api.deployment.v1` protobuf package.
        pub mod deployment {
            pub mod v1 {
                tonic::include_proto!("temporal.api.deployment.v1");
            }
        }
        /// Generated code for the `temporal.api.enums.v1` protobuf package, together with
        /// its generated `.serde.rs` companion file.
        pub mod enums {
            pub mod v1 {
                include_proto_with_serde!("temporal.api.enums.v1");
            }
        }
        /// Generated code for the `temporal.api.errordetails.v1` protobuf package.
        pub mod errordetails {
            pub mod v1 {
                tonic::include_proto!("temporal.api.errordetails.v1");
            }
        }
        /// Generated code for the `temporal.api.failure.v1` protobuf package, together with
        /// its generated `.serde.rs` companion file.
        pub mod failure {
            pub mod v1 {
                include_proto_with_serde!("temporal.api.failure.v1");
            }
        }
        /// Generated code for the `temporal.api.filter.v1` protobuf package.
        pub mod filter {
            pub mod v1 {
                tonic::include_proto!("temporal.api.filter.v1");
            }
        }
2208 pub mod history {
2209 pub mod v1 {
2210 use crate::protos::temporal::api::{
2211 enums::v1::EventType, history::v1::history_event::Attributes,
2212 };
2213 use anyhow::bail;
2214 use std::fmt::{Display, Formatter};
2215
2216 tonic::include_proto!("temporal.api.history.v1");
2217
2218 impl History {
2219 pub fn extract_run_id_from_start(&self) -> Result<&str, anyhow::Error> {
2220 extract_original_run_id_from_events(&self.events)
2221 }
2222
2223 pub fn last_event_id(&self) -> i64 {
2226 self.events.last().map(|e| e.event_id).unwrap_or_default()
2227 }
2228 }
2229
2230 pub fn extract_original_run_id_from_events(
2231 events: &[HistoryEvent],
2232 ) -> Result<&str, anyhow::Error> {
2233 if let Some(Attributes::WorkflowExecutionStartedEventAttributes(wes)) =
2234 events.get(0).and_then(|x| x.attributes.as_ref())
2235 {
2236 Ok(&wes.original_execution_run_id)
2237 } else {
2238 bail!("First event is not WorkflowExecutionStarted?!?")
2239 }
2240 }
2241
2242 impl HistoryEvent {
2243 pub fn is_command_event(&self) -> bool {
2245 EventType::try_from(self.event_type).map_or(false, |et| match et {
2246 EventType::ActivityTaskScheduled
2247 | EventType::ActivityTaskCancelRequested
2248 | EventType::MarkerRecorded
2249 | EventType::RequestCancelExternalWorkflowExecutionInitiated
2250 | EventType::SignalExternalWorkflowExecutionInitiated
2251 | EventType::StartChildWorkflowExecutionInitiated
2252 | EventType::TimerCanceled
2253 | EventType::TimerStarted
2254 | EventType::UpsertWorkflowSearchAttributes
2255 | EventType::WorkflowPropertiesModified
2256 | EventType::NexusOperationScheduled
2257 | EventType::NexusOperationCancelRequested
2258 | EventType::WorkflowExecutionCanceled
2259 | EventType::WorkflowExecutionCompleted
2260 | EventType::WorkflowExecutionContinuedAsNew
2261 | EventType::WorkflowExecutionFailed
2262 | EventType::WorkflowExecutionUpdateAccepted
2263 | EventType::WorkflowExecutionUpdateRejected
2264 | EventType::WorkflowExecutionUpdateCompleted => true,
2265 _ => false,
2266 })
2267 }
2268
2269 pub fn get_initial_command_event_id(&self) -> Option<i64> {
2273 self.attributes.as_ref().and_then(|a| {
2274 match a {
2277 Attributes::ActivityTaskStartedEventAttributes(a) =>
2278 Some(a.scheduled_event_id),
2279 Attributes::ActivityTaskCompletedEventAttributes(a) =>
2280 Some(a.scheduled_event_id),
2281 Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2282 Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2283 Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2284 Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2285 Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
2286 Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
2287 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2288 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
2289 Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2290 Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
2291 Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
2292 Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2293 Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
2294 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
2295 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
2296 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2297 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
2298 Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
2299 Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2300 Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2301 Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2302 Attributes::NexusOperationStartedEventAttributes(a) => Some(a.scheduled_event_id),
2303 Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2304 Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id),
2305 Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2306 Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2307 Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2308 Attributes::NexusOperationCancelRequestCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2309 Attributes::NexusOperationCancelRequestFailedEventAttributes(a) => Some(a.scheduled_event_id),
2310 _ => None
2311 }
2312 })
2313 }
2314
2315 pub fn get_protocol_instance_id(&self) -> Option<&str> {
2317 self.attributes.as_ref().and_then(|a| match a {
2318 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(a) => {
2319 Some(a.protocol_instance_id.as_str())
2320 }
2321 _ => None,
2322 })
2323 }
2324
2325 pub fn is_final_wf_execution_event(&self) -> bool {
2327 match self.event_type() {
2328 EventType::WorkflowExecutionCompleted => true,
2329 EventType::WorkflowExecutionCanceled => true,
2330 EventType::WorkflowExecutionFailed => true,
2331 EventType::WorkflowExecutionTimedOut => true,
2332 EventType::WorkflowExecutionContinuedAsNew => true,
2333 EventType::WorkflowExecutionTerminated => true,
2334 _ => false,
2335 }
2336 }
2337
2338 pub fn is_wft_closed_event(&self) -> bool {
2339 match self.event_type() {
2340 EventType::WorkflowTaskCompleted => true,
2341 EventType::WorkflowTaskFailed => true,
2342 EventType::WorkflowTaskTimedOut => true,
2343 _ => false,
2344 }
2345 }
2346
2347 pub fn is_ignorable(&self) -> bool {
2348 if !self.worker_may_ignore {
2349 return false;
2350 }
2351 if let Some(a) = self.attributes.as_ref() {
2354 match a {
2355 Attributes::WorkflowExecutionStartedEventAttributes(_) => false,
2356 Attributes::WorkflowExecutionCompletedEventAttributes(_) => false,
2357 Attributes::WorkflowExecutionFailedEventAttributes(_) => false,
2358 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => false,
2359 Attributes::WorkflowTaskScheduledEventAttributes(_) => false,
2360 Attributes::WorkflowTaskStartedEventAttributes(_) => false,
2361 Attributes::WorkflowTaskCompletedEventAttributes(_) => false,
2362 Attributes::WorkflowTaskTimedOutEventAttributes(_) => false,
2363 Attributes::WorkflowTaskFailedEventAttributes(_) => false,
2364 Attributes::ActivityTaskScheduledEventAttributes(_) => false,
2365 Attributes::ActivityTaskStartedEventAttributes(_) => false,
2366 Attributes::ActivityTaskCompletedEventAttributes(_) => false,
2367 Attributes::ActivityTaskFailedEventAttributes(_) => false,
2368 Attributes::ActivityTaskTimedOutEventAttributes(_) => false,
2369 Attributes::TimerStartedEventAttributes(_) => false,
2370 Attributes::TimerFiredEventAttributes(_) => false,
2371 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => false,
2372 Attributes::ActivityTaskCanceledEventAttributes(_) => false,
2373 Attributes::TimerCanceledEventAttributes(_) => false,
2374 Attributes::MarkerRecordedEventAttributes(_) => false,
2375 Attributes::WorkflowExecutionSignaledEventAttributes(_) => false,
2376 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => false,
2377 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => false,
2378 Attributes::WorkflowExecutionCanceledEventAttributes(_) => false,
2379 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2380 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => false,
2381 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => false,
2382 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => false,
2383 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => false,
2384 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => false,
2385 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => false,
2386 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => false,
2387 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => false,
2388 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => false,
2389 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => false,
2390 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => false,
2391 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2392 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => false,
2393 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => false,
2394 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => false,
2395 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => false,
2396 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => false,
2397 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => false,
2398 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => false,
2399 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => false,
2400 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => false,
2401 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => false,
2402 Attributes::NexusOperationScheduledEventAttributes(_) => false,
2403 Attributes::NexusOperationStartedEventAttributes(_) => false,
2404 Attributes::NexusOperationCompletedEventAttributes(_) => false,
2405 Attributes::NexusOperationFailedEventAttributes(_) => false,
2406 Attributes::NexusOperationCanceledEventAttributes(_) => false,
2407 Attributes::NexusOperationTimedOutEventAttributes(_) => false,
2408 Attributes::NexusOperationCancelRequestedEventAttributes(_) => false,
2409 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => true,
2411 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => false,
2412 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => false,
2413 Attributes::WorkflowExecutionPausedEventAttributes(_) => true,
2415 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => true,
2417 }
2418 } else {
2419 false
2420 }
2421 }
2422 }
2423
2424 impl Display for HistoryEvent {
2425 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2426 write!(
2427 f,
2428 "HistoryEvent(id: {}, {:?})",
2429 self.event_id,
2430 EventType::try_from(self.event_type).unwrap_or_default()
2431 )
2432 }
2433 }
2434
2435 impl Attributes {
2436 pub fn event_type(&self) -> EventType {
2437 match self {
2439 Attributes::WorkflowExecutionStartedEventAttributes(_) => { EventType::WorkflowExecutionStarted }
2440 Attributes::WorkflowExecutionCompletedEventAttributes(_) => { EventType::WorkflowExecutionCompleted }
2441 Attributes::WorkflowExecutionFailedEventAttributes(_) => { EventType::WorkflowExecutionFailed }
2442 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => { EventType::WorkflowExecutionTimedOut }
2443 Attributes::WorkflowTaskScheduledEventAttributes(_) => { EventType::WorkflowTaskScheduled }
2444 Attributes::WorkflowTaskStartedEventAttributes(_) => { EventType::WorkflowTaskStarted }
2445 Attributes::WorkflowTaskCompletedEventAttributes(_) => { EventType::WorkflowTaskCompleted }
2446 Attributes::WorkflowTaskTimedOutEventAttributes(_) => { EventType::WorkflowTaskTimedOut }
2447 Attributes::WorkflowTaskFailedEventAttributes(_) => { EventType::WorkflowTaskFailed }
2448 Attributes::ActivityTaskScheduledEventAttributes(_) => { EventType::ActivityTaskScheduled }
2449 Attributes::ActivityTaskStartedEventAttributes(_) => { EventType::ActivityTaskStarted }
2450 Attributes::ActivityTaskCompletedEventAttributes(_) => { EventType::ActivityTaskCompleted }
2451 Attributes::ActivityTaskFailedEventAttributes(_) => { EventType::ActivityTaskFailed }
2452 Attributes::ActivityTaskTimedOutEventAttributes(_) => { EventType::ActivityTaskTimedOut }
2453 Attributes::TimerStartedEventAttributes(_) => { EventType::TimerStarted }
2454 Attributes::TimerFiredEventAttributes(_) => { EventType::TimerFired }
2455 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => { EventType::ActivityTaskCancelRequested }
2456 Attributes::ActivityTaskCanceledEventAttributes(_) => { EventType::ActivityTaskCanceled }
2457 Attributes::TimerCanceledEventAttributes(_) => { EventType::TimerCanceled }
2458 Attributes::MarkerRecordedEventAttributes(_) => { EventType::MarkerRecorded }
2459 Attributes::WorkflowExecutionSignaledEventAttributes(_) => { EventType::WorkflowExecutionSignaled }
2460 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => { EventType::WorkflowExecutionTerminated }
2461 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::WorkflowExecutionCancelRequested }
2462 Attributes::WorkflowExecutionCanceledEventAttributes(_) => { EventType::WorkflowExecutionCanceled }
2463 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionInitiated }
2464 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionFailed }
2465 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::ExternalWorkflowExecutionCancelRequested }
2466 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => { EventType::WorkflowExecutionContinuedAsNew }
2467 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => { EventType::StartChildWorkflowExecutionInitiated }
2468 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => { EventType::StartChildWorkflowExecutionFailed }
2469 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => { EventType::ChildWorkflowExecutionStarted }
2470 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => { EventType::ChildWorkflowExecutionCompleted }
2471 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => { EventType::ChildWorkflowExecutionFailed }
2472 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => { EventType::ChildWorkflowExecutionCanceled }
2473 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => { EventType::ChildWorkflowExecutionTimedOut }
2474 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => { EventType::ChildWorkflowExecutionTerminated }
2475 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionInitiated }
2476 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionFailed }
2477 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => { EventType::ExternalWorkflowExecutionSignaled }
2478 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => { EventType::UpsertWorkflowSearchAttributes }
2479 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAdmitted }
2480 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => { EventType::WorkflowExecutionUpdateRejected }
2481 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAccepted }
2482 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => { EventType::WorkflowExecutionUpdateCompleted }
2483 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => { EventType::WorkflowPropertiesModifiedExternally }
2484 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => { EventType::ActivityPropertiesModifiedExternally }
2485 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => { EventType::WorkflowPropertiesModified }
2486 Attributes::NexusOperationScheduledEventAttributes(_) => { EventType::NexusOperationScheduled }
2487 Attributes::NexusOperationStartedEventAttributes(_) => { EventType::NexusOperationStarted }
2488 Attributes::NexusOperationCompletedEventAttributes(_) => { EventType::NexusOperationCompleted }
2489 Attributes::NexusOperationFailedEventAttributes(_) => { EventType::NexusOperationFailed }
2490 Attributes::NexusOperationCanceledEventAttributes(_) => { EventType::NexusOperationCanceled }
2491 Attributes::NexusOperationTimedOutEventAttributes(_) => { EventType::NexusOperationTimedOut }
2492 Attributes::NexusOperationCancelRequestedEventAttributes(_) => { EventType::NexusOperationCancelRequested }
2493 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => { EventType::WorkflowExecutionOptionsUpdated }
2494 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => { EventType::NexusOperationCancelRequestCompleted }
2495 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => { EventType::NexusOperationCancelRequestFailed }
2496 Attributes::WorkflowExecutionPausedEventAttributes(_) => { EventType::WorkflowExecutionPaused }
2497 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => { EventType::WorkflowExecutionUnpaused }
2498 }
2499 }
2500 }
2501 }
2502 }
2503 pub mod namespace {
2504 pub mod v1 {
2505 tonic::include_proto!("temporal.api.namespace.v1");
2506 }
2507 }
2508 pub mod operatorservice {
2509 pub mod v1 {
2510 tonic::include_proto!("temporal.api.operatorservice.v1");
2511 }
2512 }
2513 pub mod protocol {
2514 pub mod v1 {
2515 use std::fmt::{Display, Formatter};
2516 tonic::include_proto!("temporal.api.protocol.v1");
2517
2518 impl Display for Message {
2519 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2520 write!(f, "ProtocolMessage({})", self.id)
2521 }
2522 }
2523 }
2524 }
2525 pub mod query {
2526 pub mod v1 {
2527 tonic::include_proto!("temporal.api.query.v1");
2528 }
2529 }
2530 pub mod replication {
2531 pub mod v1 {
2532 tonic::include_proto!("temporal.api.replication.v1");
2533 }
2534 }
2535 pub mod rules {
2536 pub mod v1 {
2537 tonic::include_proto!("temporal.api.rules.v1");
2538 }
2539 }
2540 pub mod schedule {
2541 #[allow(rustdoc::invalid_html_tags)]
2542 pub mod v1 {
2543 tonic::include_proto!("temporal.api.schedule.v1");
2544 }
2545 }
2546 pub mod sdk {
2547 pub mod v1 {
2548 tonic::include_proto!("temporal.api.sdk.v1");
2549 }
2550 }
2551 pub mod taskqueue {
2552 pub mod v1 {
2553 use crate::protos::temporal::api::enums::v1::TaskQueueKind;
2554 tonic::include_proto!("temporal.api.taskqueue.v1");
2555
2556 impl From<String> for TaskQueue {
2557 fn from(name: String) -> Self {
2558 Self {
2559 name,
2560 kind: TaskQueueKind::Normal as i32,
2561 normal_name: "".to_string(),
2562 }
2563 }
2564 }
2565 }
2566 }
2567 pub mod testservice {
2568 pub mod v1 {
2569 tonic::include_proto!("temporal.api.testservice.v1");
2570 }
2571 }
2572 pub mod update {
2573 pub mod v1 {
2574 use crate::protos::temporal::api::update::v1::outcome::Value;
2575 tonic::include_proto!("temporal.api.update.v1");
2576
2577 impl Outcome {
2578 pub fn is_success(&self) -> bool {
2579 match self.value {
2580 Some(Value::Success(_)) => true,
2581 _ => false,
2582 }
2583 }
2584 }
2585 }
2586 }
2587 pub mod version {
2588 pub mod v1 {
2589 tonic::include_proto!("temporal.api.version.v1");
2590 }
2591 }
2592 pub mod worker {
2593 pub mod v1 {
2594 tonic::include_proto!("temporal.api.worker.v1");
2595 }
2596 }
2597 pub mod workflow {
2598 pub mod v1 {
2599 tonic::include_proto!("temporal.api.workflow.v1");
2600 }
2601 }
2602 pub mod nexus {
2603 pub mod v1 {
2604 use crate::protos::{
2605 camel_case_to_screaming_snake,
2606 temporal::api::{
2607 common::{
2608 self,
2609 v1::link::{WorkflowEvent, workflow_event},
2610 },
2611 enums::v1::EventType,
2612 failure,
2613 },
2614 };
2615 use anyhow::{anyhow, bail};
2616 use prost::Name;
2617 use std::{
2618 collections::HashMap,
2619 fmt::{Display, Formatter},
2620 };
2621 use tonic::transport::Uri;
2622
2623 tonic::include_proto!("temporal.api.nexus.v1");
2624
2625 impl Display for Response {
2626 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2627 write!(f, "NexusResponse(",)?;
2628 match &self.variant {
2629 None => {}
2630 Some(v) => {
2631 write!(f, "{v}")?;
2632 }
2633 }
2634 write!(f, ")")
2635 }
2636 }
2637
2638 impl Display for response::Variant {
2639 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2640 match self {
2641 response::Variant::StartOperation(_) => {
2642 write!(f, "StartOperation")
2643 }
2644 response::Variant::CancelOperation(_) => {
2645 write!(f, "CancelOperation")
2646 }
2647 }
2648 }
2649 }
2650
2651 impl Display for HandlerError {
2652 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2653 write!(f, "HandlerError")
2654 }
2655 }
2656
2657 pub enum NexusTaskFailure {
2658 Legacy(HandlerError),
2659 Temporal(failure::v1::Failure),
2660 }
2661
2662 static SCHEME_PREFIX: &str = "temporal://";
2663
2664 pub fn workflow_event_link_from_nexus(
2666 l: &Link,
2667 ) -> Result<common::v1::Link, anyhow::Error> {
2668 if !l.url.starts_with(SCHEME_PREFIX) {
2669 bail!("Invalid scheme for nexus link: {:?}", l.url);
2670 }
2671 let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap();
2674 let uri = Uri::try_from(no_authority_url)?;
2675 let parts = uri.into_parts();
2676 let path = parts.path_and_query.ok_or_else(|| {
2677 anyhow!("Failed to parse nexus link, invalid path: {:?}", l)
2678 })?;
2679 let path_parts = path.path().split('/').collect::<Vec<_>>();
2680 if path_parts.get(1) != Some(&"namespaces") {
2681 bail!("Invalid path for nexus link: {:?}", l);
2682 }
2683 let namespace = path_parts.get(2).ok_or_else(|| {
2684 anyhow!("Failed to parse nexus link, no namespace: {:?}", l)
2685 })?;
2686 if path_parts.get(3) != Some(&"workflows") {
2687 bail!("Invalid path for nexus link, no workflows segment: {:?}", l);
2688 }
2689 let workflow_id = path_parts.get(4).ok_or_else(|| {
2690 anyhow!("Failed to parse nexus link, no workflow id: {:?}", l)
2691 })?;
2692 let run_id = path_parts
2693 .get(5)
2694 .ok_or_else(|| anyhow!("Failed to parse nexus link, no run id: {:?}", l))?;
2695 if path_parts.get(6) != Some(&"history") {
2696 bail!("Invalid path for nexus link, no history segment: {:?}", l);
2697 }
2698 let reference = if let Some(query) = path.query() {
2699 let mut eventref = workflow_event::EventReference::default();
2700 let query_parts = query.split('&').collect::<Vec<_>>();
2701 for qp in query_parts {
2702 let mut kv = qp.split('=');
2703 let key = kv.next().ok_or_else(|| {
2704 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2705 })?;
2706 let val = kv.next().ok_or_else(|| {
2707 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2708 })?;
2709 match key {
2710 "eventID" => {
2711 eventref.event_id = val.parse().map_err(|_| {
2712 anyhow!("Failed to parse nexus link event id: {:?}", l)
2713 })?;
2714 }
2715 "eventType" => {
2716 eventref.event_type = EventType::from_str_name(val)
2717 .unwrap_or_else(|| {
2718 EventType::from_str_name(
2719 &("EVENT_TYPE_".to_string()
2720 + &camel_case_to_screaming_snake(val)),
2721 )
2722 .unwrap_or_default()
2723 })
2724 .into()
2725 }
2726 _ => continue,
2727 }
2728 }
2729 Some(workflow_event::Reference::EventRef(eventref))
2730 } else {
2731 None
2732 };
2733
2734 Ok(common::v1::Link {
2735 variant: Some(common::v1::link::Variant::WorkflowEvent(WorkflowEvent {
2736 namespace: namespace.to_string(),
2737 workflow_id: workflow_id.to_string(),
2738 run_id: run_id.to_string(),
2739 reference,
2740 })),
2741 })
2742 }
2743
2744 impl TryFrom<failure::v1::Failure> for Failure {
2745 type Error = serde_json::Error;
2746
2747 fn try_from(mut f: failure::v1::Failure) -> Result<Self, Self::Error> {
2748 let message = std::mem::take(&mut f.message);
2750
2751 let details = serde_json::to_vec(&f)?;
2753
2754 Ok(Failure {
2756 message,
2757 stack_trace: f.stack_trace,
2758 metadata: HashMap::from([(
2759 "type".to_string(),
2760 failure::v1::Failure::full_name().into(),
2761 )]),
2762 details,
2763 cause: None,
2764 })
2765 }
2766 }
2767 }
2768 }
2769 pub mod workflowservice {
2770 pub mod v1 {
2771 use std::{
2772 convert::TryInto,
2773 fmt::{Display, Formatter},
2774 time::{Duration, SystemTime},
2775 };
2776
2777 tonic::include_proto!("temporal.api.workflowservice.v1");
2778
2779 macro_rules! sched_to_start_impl {
2780 ($sched_field:ident) => {
2781 pub fn sched_to_start(&self) -> Option<Duration> {
2784 if let Some((sch, st)) =
2785 self.$sched_field.clone().zip(self.started_time.clone())
2786 {
2787 if let Some(value) = elapsed_between_prost_times(sch, st) {
2788 return value;
2789 }
2790 }
2791 None
2792 }
2793 };
2794 }
2795
2796 fn elapsed_between_prost_times(
2797 from: prost_types::Timestamp,
2798 to: prost_types::Timestamp,
2799 ) -> Option<Option<Duration>> {
2800 let from: Result<SystemTime, _> = from.try_into();
2801 let to: Result<SystemTime, _> = to.try_into();
2802 if let (Ok(from), Ok(to)) = (from, to) {
2803 return Some(to.duration_since(from).ok());
2804 }
2805 None
2806 }
2807
2808 impl PollWorkflowTaskQueueResponse {
2809 sched_to_start_impl!(scheduled_time);
2810 }
2811
2812 impl Display for PollWorkflowTaskQueueResponse {
2813 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2814 let last_event = self
2815 .history
2816 .as_ref()
2817 .and_then(|h| h.events.last().map(|he| he.event_id))
2818 .unwrap_or(0);
2819 write!(
2820 f,
2821 "PollWFTQResp(run_id: {}, attempt: {}, last_event: {})",
2822 self.workflow_execution
2823 .as_ref()
2824 .map_or("", |we| we.run_id.as_str()),
2825 self.attempt,
2826 last_event
2827 )
2828 }
2829 }
2830
2831 pub struct CompactHist<'a>(pub &'a PollWorkflowTaskQueueResponse);
2833 impl Display for CompactHist<'_> {
2834 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2835 writeln!(
2836 f,
2837 "PollWorkflowTaskQueueResponse (prev_started: {}, started: {})",
2838 self.0.previous_started_event_id, self.0.started_event_id
2839 )?;
2840 if let Some(h) = self.0.history.as_ref() {
2841 for event in &h.events {
2842 writeln!(f, "{}", event)?;
2843 }
2844 }
2845 writeln!(f, "query: {:#?}", self.0.query)?;
2846 writeln!(f, "queries: {:#?}", self.0.queries)
2847 }
2848 }
2849
2850 impl PollActivityTaskQueueResponse {
2851 sched_to_start_impl!(current_attempt_scheduled_time);
2852 }
2853
2854 impl PollNexusTaskQueueResponse {
2855 pub fn sched_to_start(&self) -> Option<Duration> {
2856 if let Some((sch, st)) = self
2857 .request
2858 .as_ref()
2859 .and_then(|r| r.scheduled_time)
2860 .clone()
2861 .zip(SystemTime::now().try_into().ok())
2862 {
2863 if let Some(value) = elapsed_between_prost_times(sch, st) {
2864 return value;
2865 }
2866 }
2867 None
2868 }
2869 }
2870
2871 impl QueryWorkflowResponse {
2872 pub fn unwrap(self) -> Vec<crate::protos::temporal::api::common::v1::Payload> {
2874 self.query_result.unwrap().payloads
2875 }
2876 }
2877 }
2878 }
2879 }
2880}
2881
2882#[allow(
2883 clippy::all,
2884 missing_docs,
2885 rustdoc::broken_intra_doc_links,
2886 rustdoc::bare_urls
2887)]
2888pub mod google {
2889 pub mod rpc {
2890 tonic::include_proto!("google.rpc");
2891 }
2892}
2893
2894#[allow(
2895 clippy::all,
2896 missing_docs,
2897 rustdoc::broken_intra_doc_links,
2898 rustdoc::bare_urls
2899)]
2900pub mod grpc {
2901 pub mod health {
2902 pub mod v1 {
2903 tonic::include_proto!("grpc.health.v1");
2904 }
2905 }
2906}
2907
/// Converts a camel-case identifier to SCREAMING_SNAKE_CASE.
///
/// An underscore is inserted before an uppercase character only when the
/// previous character was not uppercase, so a run of capitals stays together
/// (`"HTTPServer"` becomes `"HTTPSERVER"`) and no leading underscore is
/// produced. Characters are upper-cased with ASCII rules only.
pub fn camel_case_to_screaming_snake(val: &str) -> String {
    let mut screaming = String::new();
    // Starts as `true` so the very first character never gets a separator.
    let mut prev_was_upper = true;
    for ch in val.chars() {
        let is_upper = ch.is_uppercase();
        if is_upper && !prev_was_upper {
            screaming.push('_');
        }
        screaming.push(ch.to_ascii_uppercase());
        prev_was_upper = is_upper;
    }
    screaming
}
2926
2927pub fn proto_ts_to_system_time(ts: &prost_types::Timestamp) -> Option<std::time::SystemTime> {
2929 std::time::SystemTime::UNIX_EPOCH
2930 .checked_add(Duration::from_secs(ts.seconds as u64) + Duration::from_nanos(ts.nanos as u64))
2931}
2932
2933#[cfg(test)]
2934mod tests {
2935 use crate::protos::{
2936 coresdk::{activity_task, activity_task::ActivityTask},
2937 temporal::api::{failure::v1::Failure, workflowservice::v1::PollActivityTaskQueueResponse},
2938 };
2939 use anyhow::anyhow;
2940
2941 #[test]
2942 fn start_from_poll_resp_standalone_activity_populates_run_id() {
2943 let resp = PollActivityTaskQueueResponse {
2944 task_token: vec![1, 2, 3],
2945 activity_run_id: "test-run-id-123".to_string(),
2946 activity_id: "my-activity".to_string(),
2947 ..Default::default()
2948 };
2949 let task = ActivityTask::start_from_poll_resp(resp);
2950 let start = match task.variant {
2951 Some(activity_task::activity_task::Variant::Start(s)) => s,
2952 _ => panic!("expected Start variant"),
2953 };
2954 assert_eq!(start.run_id, "test-run-id-123");
2955 assert!(!start.is_local);
2956 }
2957
2958 #[test]
2959 fn start_from_poll_resp_workflow_activity_has_empty_run_id() {
2960 use crate::protos::temporal::api::common::v1::WorkflowExecution;
2961 let resp = PollActivityTaskQueueResponse {
2962 task_token: vec![4, 5, 6],
2963 activity_id: "my-workflow-activity".to_string(),
2964 workflow_execution: Some(WorkflowExecution {
2965 workflow_id: "wf-123".to_string(),
2966 run_id: "wf-run-456".to_string(),
2967 }),
2968 ..Default::default()
2970 };
2971 let task = ActivityTask::start_from_poll_resp(resp);
2972 let start = match task.variant {
2973 Some(activity_task::activity_task::Variant::Start(s)) => s,
2974 _ => panic!("expected Start variant"),
2975 };
2976 assert!(start.run_id.is_empty());
2977 assert_eq!(start.workflow_execution.unwrap().run_id, "wf-run-456");
2979 }
2980
2981 #[test]
2982 fn anyhow_to_failure_conversion() {
2983 let no_causes: Failure = anyhow!("no causes").into();
2984 assert_eq!(no_causes.cause, None);
2985 assert_eq!(no_causes.message, "no causes");
2986 let orig = anyhow!("fail 1");
2987 let mid = orig.context("fail 2");
2988 let top = mid.context("fail 3");
2989 let as_fail: Failure = top.into();
2990 assert_eq!(as_fail.message, "fail 3");
2991 assert_eq!(as_fail.cause.as_ref().unwrap().message, "fail 2");
2992 assert_eq!(as_fail.cause.unwrap().cause.unwrap().message, "fail 1");
2993 }
2994}