1pub mod constants;
6mod task_token;
7pub mod utilities;
9pub use task_token::TaskToken;
10
/// Payload metadata key under which the payload's encoding is recorded.
pub static ENCODING_PAYLOAD_KEY: &str = "encoding";
/// Encoding metadata value written for JSON-serialized payloads.
pub static JSON_ENCODING_VAL: &str = "json/plain";
/// Key of the marker-details entry that carries the JSON-encoded patch marker data.
pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data";
/// NOTE(review): presumably the search attribute name used to record change versions;
/// it is not referenced in this part of the file, so confirm against its callers.
pub static VERSION_SEARCH_ATTR_KEY: &str = "TemporalChangeVersion";
19
/// Includes the tonic-generated code for the protobuf package `$pkg` and, when the
/// `serde_serialize` feature is enabled, also includes `<OUT_DIR>/<pkg>.serde.rs`.
macro_rules! include_proto_with_serde {
    ($pkg:tt) => {
        tonic::include_proto!($pkg);

        // A single flat `concat!` builds the same path literal as the nested form.
        #[cfg(feature = "serde_serialize")]
        include!(concat!(env!("OUT_DIR"), "/", $pkg, ".serde.rs"));
    };
}
28
29#[allow(
30 clippy::large_enum_variant,
31 clippy::derive_partial_eq_without_eq,
32 clippy::reserve_after_initialization
33)]
34#[allow(missing_docs)]
36pub mod coresdk {
37 tonic::include_proto!("coresdk");
39 pub use self::sdk_helpers::*;
40 mod sdk_helpers {
41 use super::*;
42
43 use crate::protos::{
44 ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL,
45 temporal::api::{
46 common::v1::{Payload, Payloads, RetryPolicy, WorkflowExecution},
47 enums::v1::{
48 ApplicationErrorCategory, TimeoutType, VersioningBehavior,
49 WorkflowTaskFailedCause,
50 },
51 failure::v1::{
52 ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo,
53 failure::FailureInfo,
54 },
55 workflowservice::v1::PollActivityTaskQueueResponse,
56 },
57 };
58 use activity_task::ActivityTask;
59 use serde::{Deserialize, Serialize};
60 use std::{
61 collections::HashMap,
62 convert::TryFrom,
63 fmt::{Display, Formatter},
64 iter::FromIterator,
65 };
66 use workflow_activation::{WorkflowActivationJob, workflow_activation_job};
67 use workflow_commands::{WorkflowCommand, workflow_command, workflow_command::Variant};
68 use workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion};
69
/// Integer type used for history event identifiers.
pub type HistoryEventId = i64;
71
72 impl From<workflow_activation_job::Variant> for WorkflowActivationJob {
73 fn from(a: workflow_activation_job::Variant) -> Self {
74 Self { variant: Some(a) }
75 }
76 }
77
78 impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
79 fn from(v: Vec<WorkflowCommand>) -> Self {
80 Self {
81 commands: v,
82 used_internal_flags: vec![],
83 versioning_behavior: VersioningBehavior::Unspecified.into(),
84 }
85 }
86 }
87
88 impl From<workflow_command::Variant> for WorkflowCommand {
89 fn from(v: workflow_command::Variant) -> Self {
90 Self {
91 variant: Some(v),
92 user_metadata: None,
93 }
94 }
95 }
96
97 impl workflow_completion::Success {
98 pub fn from_variants(cmds: Vec<Variant>) -> Self {
99 let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect();
100 cmds.into()
101 }
102 }
103
104 impl WorkflowActivationCompletion {
105 pub fn empty(run_id: impl Into<String>) -> Self {
107 let success = workflow_completion::Success::from_variants(vec![]);
108 Self {
109 run_id: run_id.into(),
110 status: Some(workflow_activation_completion::Status::Successful(success)),
111 }
112 }
113
114 pub fn from_cmds(
116 run_id: impl Into<String>,
117 cmds: Vec<workflow_command::Variant>,
118 ) -> Self {
119 let success = workflow_completion::Success::from_variants(cmds);
120 Self {
121 run_id: run_id.into(),
122 status: Some(workflow_activation_completion::Status::Successful(success)),
123 }
124 }
125
126 pub fn from_cmd(run_id: impl Into<String>, cmd: workflow_command::Variant) -> Self {
128 let success = workflow_completion::Success::from_variants(vec![cmd]);
129 Self {
130 run_id: run_id.into(),
131 status: Some(workflow_activation_completion::Status::Successful(success)),
132 }
133 }
134
135 pub fn fail(
136 run_id: impl Into<String>,
137 failure: Failure,
138 cause: Option<WorkflowTaskFailedCause>,
139 ) -> Self {
140 Self {
141 run_id: run_id.into(),
142 status: Some(workflow_activation_completion::Status::Failed(
143 workflow_completion::Failure {
144 failure: Some(failure),
145 force_cause: cause.unwrap_or(WorkflowTaskFailedCause::Unspecified)
146 as i32,
147 },
148 )),
149 }
150 }
151
152 pub fn has_execution_ending(&self) -> bool {
155 self.has_complete_workflow_execution()
156 || self.has_fail_execution()
157 || self.has_continue_as_new()
158 || self.has_cancel_workflow_execution()
159 }
160
161 pub fn has_fail_execution(&self) -> bool {
163 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
164 return s.commands.iter().any(|wfc| {
165 matches!(
166 wfc,
167 WorkflowCommand {
168 variant: Some(workflow_command::Variant::FailWorkflowExecution(_)),
169 ..
170 }
171 )
172 });
173 }
174 false
175 }
176
177 pub fn has_cancel_workflow_execution(&self) -> bool {
179 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
180 return s.commands.iter().any(|wfc| {
181 matches!(
182 wfc,
183 WorkflowCommand {
184 variant: Some(workflow_command::Variant::CancelWorkflowExecution(
185 _
186 )),
187 ..
188 }
189 )
190 });
191 }
192 false
193 }
194
195 pub fn has_continue_as_new(&self) -> bool {
197 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
198 return s.commands.iter().any(|wfc| {
199 matches!(
200 wfc,
201 WorkflowCommand {
202 variant: Some(
203 workflow_command::Variant::ContinueAsNewWorkflowExecution(_)
204 ),
205 ..
206 }
207 )
208 });
209 }
210 false
211 }
212
213 pub fn has_complete_workflow_execution(&self) -> bool {
215 self.complete_workflow_execution_value().is_some()
216 }
217
218 pub fn complete_workflow_execution_value(&self) -> Option<&Payload> {
220 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
221 s.commands.iter().find_map(|wfc| match wfc {
222 WorkflowCommand {
223 variant: Some(workflow_command::Variant::CompleteWorkflowExecution(v)),
224 ..
225 } => v.result.as_ref(),
226 _ => None,
227 })
228 } else {
229 None
230 }
231 }
232
233 pub fn is_empty(&self) -> bool {
235 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
236 return s.commands.is_empty();
237 }
238 false
239 }
240
241 pub fn add_internal_flags(&mut self, patch: u32) {
242 if let Some(workflow_activation_completion::Status::Successful(s)) =
243 &mut self.status
244 {
245 s.used_internal_flags.push(patch);
246 }
247 }
248 }
249
250 pub trait IntoCompletion {
252 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion;
254 }
255
256 impl IntoCompletion for workflow_command::Variant {
257 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
258 WorkflowActivationCompletion::from_cmd(run_id, self)
259 }
260 }
261
262 impl<I, V> IntoCompletion for I
263 where
264 I: IntoIterator<Item = V>,
265 V: Into<WorkflowCommand>,
266 {
267 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
268 let success = self.into_iter().map(Into::into).collect::<Vec<_>>().into();
269 WorkflowActivationCompletion {
270 run_id,
271 status: Some(workflow_activation_completion::Status::Successful(success)),
272 }
273 }
274 }
275
276 impl Display for WorkflowActivationCompletion {
277 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
278 write!(
279 f,
280 "WorkflowActivationCompletion(run_id: {}, status: ",
281 &self.run_id
282 )?;
283 match &self.status {
284 None => write!(f, "empty")?,
285 Some(s) => write!(f, "{s}")?,
286 };
287 write!(f, ")")
288 }
289 }
290
291 impl Display for workflow_activation_completion::Status {
292 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
293 match self {
294 workflow_activation_completion::Status::Successful(
295 workflow_completion::Success { commands, .. },
296 ) => {
297 write!(f, "Success(")?;
298 let mut written = 0;
299 for c in commands {
300 write!(f, "{c} ")?;
301 written += 1;
302 if written >= 10 && written < commands.len() {
303 write!(f, "... {} more", commands.len() - written)?;
304 break;
305 }
306 }
307 write!(f, ")")
308 }
309 workflow_activation_completion::Status::Failed(_) => {
310 write!(f, "Failed")
311 }
312 }
313 }
314 }
315
316 impl ActivityTask {
317 pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse) -> Self {
318 let (workflow_id, run_id) = r
319 .workflow_execution
320 .map(|we| (we.workflow_id, we.run_id))
321 .unwrap_or_default();
322 Self {
323 task_token: r.task_token,
324 variant: Some(activity_task::activity_task::Variant::Start(
325 activity_task::Start {
326 workflow_namespace: r.workflow_namespace,
327 workflow_type: r
328 .workflow_type
329 .map_or_else(|| "".to_string(), |wt| wt.name),
330 workflow_execution: Some(WorkflowExecution {
331 workflow_id,
332 run_id,
333 }),
334 activity_id: r.activity_id,
335 activity_type: r
336 .activity_type
337 .map_or_else(|| "".to_string(), |at| at.name),
338 header_fields: r.header.map(Into::into).unwrap_or_default(),
339 input: Vec::from_payloads(r.input),
340 heartbeat_details: Vec::from_payloads(r.heartbeat_details),
341 scheduled_time: r.scheduled_time,
342 current_attempt_scheduled_time: r.current_attempt_scheduled_time,
343 started_time: r.started_time,
344 attempt: r.attempt as u32,
345 schedule_to_close_timeout: r.schedule_to_close_timeout,
346 start_to_close_timeout: r.start_to_close_timeout,
347 heartbeat_timeout: r.heartbeat_timeout,
348 retry_policy: r.retry_policy.map(fix_retry_policy),
349 priority: r.priority,
350 is_local: false,
351 run_id: r.activity_run_id,
352 },
353 )),
354 }
355 }
356 }
357
358 impl Failure {
359 pub fn is_timeout(
360 &self,
361 ) -> Option<crate::protos::temporal::api::enums::v1::TimeoutType> {
362 match &self.failure_info {
363 Some(FailureInfo::TimeoutFailureInfo(ti)) => Some(ti.timeout_type()),
364 _ => {
365 if let Some(c) = &self.cause {
366 c.is_timeout()
367 } else {
368 None
369 }
370 }
371 }
372 }
373
374 pub fn application_failure(message: String, non_retryable: bool) -> Self {
375 Self {
376 message,
377 failure_info: Some(FailureInfo::ApplicationFailureInfo(
378 ApplicationFailureInfo {
379 non_retryable,
380 ..Default::default()
381 },
382 )),
383 ..Default::default()
384 }
385 }
386
387 pub fn application_failure_from_error(ae: anyhow::Error, non_retryable: bool) -> Self {
388 Self {
389 failure_info: Some(FailureInfo::ApplicationFailureInfo(
390 ApplicationFailureInfo {
391 non_retryable,
392 ..Default::default()
393 },
394 )),
395 ..ae.chain()
396 .rfold(None, |cause, e| {
397 Some(Self {
398 message: e.to_string(),
399 cause: cause.map(Box::new),
400 ..Default::default()
401 })
402 })
403 .unwrap_or_default()
404 }
405 }
406
407 pub fn timeout(timeout_type: TimeoutType) -> Self {
408 Self {
409 message: "Activity timed out".to_string(),
410 cause: Some(Box::new(Failure {
411 message: "Activity timed out".to_string(),
412 failure_info: Some(FailureInfo::TimeoutFailureInfo(TimeoutFailureInfo {
413 timeout_type: timeout_type.into(),
414 ..Default::default()
415 })),
416 ..Default::default()
417 })),
418 failure_info: Some(FailureInfo::ActivityFailureInfo(
419 ActivityFailureInfo::default(),
420 )),
421 ..Default::default()
422 }
423 }
424
425 pub fn maybe_application_failure(&self) -> Option<&ApplicationFailureInfo> {
427 if let Failure {
428 failure_info: Some(FailureInfo::ApplicationFailureInfo(f)),
429 ..
430 } = self
431 {
432 Some(f)
433 } else {
434 None
435 }
436 }
437
438 pub fn is_benign_application_failure(&self) -> bool {
440 self.maybe_application_failure()
441 .is_some_and(|app_info| app_info.category() == ApplicationErrorCategory::Benign)
442 }
443 }
444
445 impl Display for Failure {
446 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
447 write!(f, "Failure({}, ", self.message)?;
448 match self.failure_info.as_ref() {
449 None => write!(f, "missing info")?,
450 Some(FailureInfo::TimeoutFailureInfo(v)) => {
451 write!(f, "Timeout: {:?}", v.timeout_type())?;
452 }
453 Some(FailureInfo::ApplicationFailureInfo(v)) => {
454 write!(f, "Application Failure: {}", v.r#type)?;
455 }
456 Some(FailureInfo::CanceledFailureInfo(_)) => {
457 write!(f, "Cancelled")?;
458 }
459 Some(FailureInfo::TerminatedFailureInfo(_)) => {
460 write!(f, "Terminated")?;
461 }
462 Some(FailureInfo::ServerFailureInfo(_)) => {
463 write!(f, "Server Failure")?;
464 }
465 Some(FailureInfo::ResetWorkflowFailureInfo(_)) => {
466 write!(f, "Reset Workflow")?;
467 }
468 Some(FailureInfo::ActivityFailureInfo(v)) => {
469 write!(
470 f,
471 "Activity Failure: scheduled_event_id: {}",
472 v.scheduled_event_id
473 )?;
474 }
475 Some(FailureInfo::ChildWorkflowExecutionFailureInfo(v)) => {
476 write!(
477 f,
478 "Child Workflow: started_event_id: {}",
479 v.started_event_id
480 )?;
481 }
482 Some(FailureInfo::NexusOperationExecutionFailureInfo(v)) => {
483 write!(
484 f,
485 "Nexus Operation Failure: scheduled_event_id: {}",
486 v.scheduled_event_id
487 )?;
488 }
489 Some(FailureInfo::NexusHandlerFailureInfo(v)) => {
490 write!(f, "Nexus Handler Failure: {}", v.r#type)?;
491 }
492 }
493 write!(f, ")")
494 }
495 }
496
497 impl From<&str> for Failure {
498 fn from(v: &str) -> Self {
499 Failure::application_failure(v.to_string(), false)
500 }
501 }
502
503 impl From<String> for Failure {
504 fn from(v: String) -> Self {
505 Failure::application_failure(v, false)
506 }
507 }
508
509 impl From<anyhow::Error> for Failure {
510 fn from(ae: anyhow::Error) -> Self {
511 Failure::application_failure_from_error(ae, false)
512 }
513 }
514
515 pub trait FromPayloadsExt {
516 fn from_payloads(p: Option<Payloads>) -> Self;
517 }
518 impl<T> FromPayloadsExt for T
519 where
520 T: FromIterator<Payload>,
521 {
522 fn from_payloads(p: Option<Payloads>) -> Self {
523 match p {
524 None => std::iter::empty().collect(),
525 Some(p) => p.payloads.into_iter().collect(),
526 }
527 }
528 }
529
530 pub trait IntoPayloadsExt {
531 fn into_payloads(self) -> Option<Payloads>;
532 }
533 impl<T> IntoPayloadsExt for T
534 where
535 T: IntoIterator<Item = Payload>,
536 {
537 fn into_payloads(self) -> Option<Payloads> {
538 let mut iterd = self.into_iter().peekable();
539 if iterd.peek().is_none() {
540 None
541 } else {
542 Some(Payloads {
543 payloads: iterd.collect(),
544 })
545 }
546 }
547 }
548
549 impl From<Payload> for Payloads {
550 fn from(p: Payload) -> Self {
551 Self { payloads: vec![p] }
552 }
553 }
554
555 impl<T> From<T> for Payloads
556 where
557 T: AsRef<[u8]>,
558 {
559 fn from(v: T) -> Self {
560 Self {
561 payloads: vec![v.into()],
562 }
563 }
564 }
565
/// Errors that can occur while deserializing a [`Payload`].
#[derive(thiserror::Error, Debug)]
pub enum PayloadDeserializeErr {
    /// The payload is not in a format this deserializer handles. The JSON deserializer
    /// returns this when the payload is not a JSON payload.
    #[error("This deserializer does not understand this payload")]
    DeserializerDoesNotHandle,
    /// Deserialization was attempted but failed; wraps the underlying error.
    #[error("Error during deserialization: {0}")]
    DeserializeErr(#[from] anyhow::Error),
}
575
576 pub trait AsJsonPayloadExt {
579 fn as_json_payload(&self) -> anyhow::Result<Payload>;
580 }
581 impl<T> AsJsonPayloadExt for T
582 where
583 T: Serialize,
584 {
585 fn as_json_payload(&self) -> anyhow::Result<Payload> {
586 let as_json = serde_json::to_string(self)?;
587 let mut metadata = HashMap::new();
588 metadata.insert(
589 ENCODING_PAYLOAD_KEY.to_string(),
590 JSON_ENCODING_VAL.as_bytes().to_vec(),
591 );
592 Ok(Payload {
593 metadata,
594 data: as_json.into_bytes(),
595 external_payloads: Default::default(),
596 })
597 }
598 }
599
600 pub trait FromJsonPayloadExt: Sized {
601 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr>;
602 }
603 impl<T> FromJsonPayloadExt for T
604 where
605 T: for<'de> Deserialize<'de>,
606 {
607 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr> {
608 if !payload.is_json_payload() {
609 return Err(PayloadDeserializeErr::DeserializerDoesNotHandle);
610 }
611 let payload_str =
612 std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?;
613 Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?)
614 }
615 }
616
/// Reasons a [`Payloads`] collection cannot be converted into a single [`Payload`].
#[derive(derive_more::Display, Debug)]
pub enum PayloadsToPayloadError {
    /// The collection held more than one payload.
    MoreThanOnePayload,
    /// The collection held no payloads.
    NoPayload,
}
623 impl TryFrom<Payloads> for Payload {
624 type Error = PayloadsToPayloadError;
625
626 fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
627 match v.payloads.pop() {
628 None => Err(PayloadsToPayloadError::NoPayload),
629 Some(p) => {
630 if v.payloads.is_empty() {
631 Ok(p)
632 } else {
633 Err(PayloadsToPayloadError::MoreThanOnePayload)
634 }
635 }
636 }
637 }
638 }
639
640 pub(super) fn fix_retry_policy(mut retry_policy: RetryPolicy) -> RetryPolicy {
643 if retry_policy.initial_interval.is_none() {
644 retry_policy.initial_interval = Default::default();
645 }
646 retry_policy
647 }
648 }
649 #[allow(clippy::module_inception)]
650 pub mod activity_task {
651 tonic::include_proto!("coresdk.activity_task");
652 mod sdk_helpers {
653 use super::*;
654 use crate::protos::{coresdk::ActivityTaskCompletion, task_token::format_task_token};
655 use std::fmt::{Display, Formatter};
656
657 impl ActivityTask {
658 pub fn cancel_from_ids(
659 task_token: Vec<u8>,
660 reason: ActivityCancelReason,
661 details: ActivityCancellationDetails,
662 ) -> Self {
663 Self {
664 task_token,
665 variant: Some(activity_task::Variant::Cancel(Cancel {
666 reason: reason as i32,
667 details: Some(details),
668 })),
669 }
670 }
671
672 pub fn is_timeout(&self) -> bool {
674 match &self.variant {
675 Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
676 *reason == ActivityCancelReason::TimedOut as i32
677 || details.as_ref().is_some_and(|d| d.is_timed_out)
678 }
679 _ => false,
680 }
681 }
682
683 pub fn primary_reason_to_cancellation_details(
684 reason: ActivityCancelReason,
685 ) -> ActivityCancellationDetails {
686 ActivityCancellationDetails {
687 is_not_found: reason == ActivityCancelReason::NotFound,
688 is_cancelled: reason == ActivityCancelReason::Cancelled,
689 is_paused: reason == ActivityCancelReason::Paused,
690 is_timed_out: reason == ActivityCancelReason::TimedOut,
691 is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
692 is_reset: reason == ActivityCancelReason::Reset,
693 }
694 }
695 }
696
697 impl Display for ActivityTaskCompletion {
698 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
699 write!(
700 f,
701 "ActivityTaskCompletion(token: {}",
702 format_task_token(&self.task_token),
703 )?;
704 if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
705 write!(f, ", {r}")?;
706 } else {
707 write!(f, ", missing result")?;
708 }
709 write!(f, ")")
710 }
711 }
712 }
713 }
714 #[allow(clippy::module_inception)]
715 pub mod activity_result {
716 tonic::include_proto!("coresdk.activity_result");
717 mod sdk_helpers {
718 use super::*;
719 use crate::protos::{
720 coresdk::activity_result::activity_resolution::Status,
721 temporal::api::{
722 common::v1::Payload,
723 enums::v1::TimeoutType,
724 failure::v1::{CanceledFailureInfo, Failure as APIFailure, failure},
725 },
726 };
727 use activity_execution_result as aer;
728 use anyhow::anyhow;
729 use std::fmt::{Display, Formatter};
730
731 impl ActivityExecutionResult {
732 pub const fn ok(result: Payload) -> Self {
733 Self {
734 status: Some(aer::Status::Completed(Success {
735 result: Some(result),
736 })),
737 }
738 }
739
740 pub fn fail(fail: APIFailure) -> Self {
741 Self {
742 status: Some(aer::Status::Failed(Failure {
743 failure: Some(fail),
744 })),
745 }
746 }
747
748 pub fn cancel(fail: APIFailure) -> Self {
749 Self {
750 status: Some(aer::Status::Cancelled(Cancellation {
751 failure: Some(fail),
752 })),
753 }
754 }
755
756 pub fn cancel_from_details(payload: Option<Payload>) -> Self {
757 Self {
758 status: Some(aer::Status::Cancelled(Cancellation::from_details(payload))),
759 }
760 }
761
762 pub const fn will_complete_async() -> Self {
763 Self {
764 status: Some(aer::Status::WillCompleteAsync(WillCompleteAsync {})),
765 }
766 }
767
768 pub fn is_cancelled(&self) -> bool {
769 matches!(self.status, Some(aer::Status::Cancelled(_)))
770 }
771 }
772
773 impl Display for aer::Status {
774 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
775 write!(f, "ActivityExecutionResult(")?;
776 match self {
777 aer::Status::Completed(v) => {
778 write!(f, "{v})")
779 }
780 aer::Status::Failed(v) => {
781 write!(f, "{v})")
782 }
783 aer::Status::Cancelled(v) => {
784 write!(f, "{v})")
785 }
786 aer::Status::WillCompleteAsync(_) => {
787 write!(f, "Will complete async)")
788 }
789 }
790 }
791 }
792
793 impl Display for Success {
794 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
795 write!(f, "Success(")?;
796 if let Some(ref v) = self.result {
797 write!(f, "{v}")?;
798 }
799 write!(f, ")")
800 }
801 }
802
803 impl Display for Failure {
804 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
805 write!(f, "Failure(")?;
806 if let Some(ref v) = self.failure {
807 write!(f, "{v}")?;
808 }
809 write!(f, ")")
810 }
811 }
812
813 impl Display for Cancellation {
814 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
815 write!(f, "Cancellation(")?;
816 if let Some(ref v) = self.failure {
817 write!(f, "{v}")?;
818 }
819 write!(f, ")")
820 }
821 }
822
823 impl From<Result<Payload, APIFailure>> for ActivityExecutionResult {
824 fn from(r: Result<Payload, APIFailure>) -> Self {
825 Self {
826 status: match r {
827 Ok(p) => Some(aer::Status::Completed(Success { result: Some(p) })),
828 Err(f) => Some(aer::Status::Failed(Failure { failure: Some(f) })),
829 },
830 }
831 }
832 }
833
834 impl ActivityResolution {
835 pub fn success_payload_or_error(self) -> Result<Option<Payload>, anyhow::Error> {
838 let Some(status) = self.status else {
839 return Err(anyhow!("Activity completed without a status"));
840 };
841
842 match status {
843 activity_resolution::Status::Completed(success) => Ok(success.result),
844 e => Err(anyhow!("Activity was not successful: {e:?}")),
845 }
846 }
847
848 pub fn unwrap_ok_payload(self) -> Payload {
849 self.success_payload_or_error().unwrap().unwrap()
850 }
851
852 pub fn completed_ok(&self) -> bool {
853 matches!(self.status, Some(activity_resolution::Status::Completed(_)))
854 }
855
856 pub fn failed(&self) -> bool {
857 matches!(self.status, Some(activity_resolution::Status::Failed(_)))
858 }
859
860 pub fn timed_out(&self) -> Option<TimeoutType> {
861 match self.status {
862 Some(activity_resolution::Status::Failed(Failure {
863 failure: Some(ref f),
864 })) => f.is_timeout(),
865 _ => None,
866 }
867 }
868
869 pub fn cancelled(&self) -> bool {
870 matches!(self.status, Some(activity_resolution::Status::Cancelled(_)))
871 }
872
873 pub fn unwrap_failure(self) -> APIFailure {
876 match self.status.unwrap() {
877 Status::Failed(f) => f.failure.unwrap(),
878 Status::Cancelled(c) => c.failure.unwrap(),
879 _ => panic!("Actvity did not fail"),
880 }
881 }
882 }
883
884 impl Cancellation {
885 pub fn from_details(details: Option<Payload>) -> Self {
888 Cancellation {
889 failure: Some(APIFailure {
890 message: "Activity cancelled".to_string(),
891 failure_info: Some(failure::FailureInfo::CanceledFailureInfo(
892 CanceledFailureInfo {
893 details: details.map(Into::into),
894 identity: Default::default(),
895 },
896 )),
897 ..Default::default()
898 }),
899 }
900 }
901 }
902 }
903 }
904 pub mod common {
905 tonic::include_proto!("coresdk.common");
906 pub use self::sdk_helpers::*;
907 mod sdk_helpers {
908 use crate::protos::{
909 PATCHED_MARKER_DETAILS_KEY,
910 coresdk::{
911 AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
912 external_data::{LocalActivityMarkerData, PatchedMarkerData},
913 },
914 temporal::api::common::v1::{Payload, Payloads},
915 };
916 use std::collections::HashMap;
917
918 pub fn build_has_change_marker_details(
919 patch_id: impl Into<String>,
920 deprecated: bool,
921 ) -> anyhow::Result<HashMap<String, Payloads>> {
922 let mut hm = HashMap::new();
923 let encoded = PatchedMarkerData {
924 id: patch_id.into(),
925 deprecated,
926 }
927 .as_json_payload()?;
928 hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into());
929 Ok(hm)
930 }
931
932 pub fn decode_change_marker_details(
933 details: &HashMap<String, Payloads>,
934 ) -> Option<(String, bool)> {
935 if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) {
938 let decoded =
939 PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?;
940 return Some((decoded.id, decoded.deprecated));
941 }
942
943 let id_entry = details.get("patch_id")?.payloads.first()?;
944 let deprecated_entry = details.get("deprecated")?.payloads.first()?;
945 let name = std::str::from_utf8(&id_entry.data).ok()?;
946 let deprecated = *deprecated_entry.data.first()? != 0;
947 Some((name.to_string(), deprecated))
948 }
949
950 pub fn build_local_activity_marker_details(
951 metadata: LocalActivityMarkerData,
952 result: Option<Payload>,
953 ) -> HashMap<String, Payloads> {
954 let mut hm = HashMap::new();
955 if let Some(jsonified) = metadata.as_json_payload().into_payloads() {
958 hm.insert("data".to_string(), jsonified);
959 }
960 if let Some(res) = result {
961 hm.insert("result".to_string(), res.into());
962 }
963 hm
964 }
965
966 pub fn extract_local_activity_marker_data(
969 details: &HashMap<String, Payloads>,
970 ) -> Option<LocalActivityMarkerData> {
971 details
972 .get("data")
973 .and_then(|p| p.payloads.first())
974 .and_then(|p| std::str::from_utf8(&p.data).ok())
975 .and_then(|s| serde_json::from_str(s).ok())
976 }
977
978 pub fn extract_local_activity_marker_details(
982 details: &mut HashMap<String, Payloads>,
983 ) -> (Option<LocalActivityMarkerData>, Option<Payload>) {
984 let data = extract_local_activity_marker_data(details);
985 let result = details.remove("result").and_then(|mut p| p.payloads.pop());
986 (data, result)
987 }
988 }
989 }
990 pub mod external_data {
991 tonic::include_proto!("coresdk.external_data");
992 mod sdk_helpers {
993 use prost_types::{Duration, Timestamp};
994 use serde::{Deserialize, Deserializer, Serialize, Serializer};
995
996 #[derive(Serialize, Deserialize)]
1000 #[serde(remote = "Timestamp")]
1001 struct TimestampDef {
1002 seconds: i64,
1003 nanos: i32,
1004 }
1005 pub(crate) mod opt_timestamp {
1006 use super::*;
1007
1008 pub(crate) fn serialize<S>(
1009 value: &Option<Timestamp>,
1010 serializer: S,
1011 ) -> Result<S::Ok, S::Error>
1012 where
1013 S: Serializer,
1014 {
1015 #[derive(Serialize)]
1016 struct Helper<'a>(#[serde(with = "TimestampDef")] &'a Timestamp);
1017
1018 value.as_ref().map(Helper).serialize(serializer)
1019 }
1020
1021 pub(crate) fn deserialize<'de, D>(
1022 deserializer: D,
1023 ) -> Result<Option<Timestamp>, D::Error>
1024 where
1025 D: Deserializer<'de>,
1026 {
1027 #[derive(Deserialize)]
1028 struct Helper(#[serde(with = "TimestampDef")] Timestamp);
1029
1030 let helper = Option::deserialize(deserializer)?;
1031 Ok(helper.map(|Helper(external)| external))
1032 }
1033 }
1034
1035 #[derive(Serialize, Deserialize)]
1037 #[serde(remote = "Duration")]
1038 struct DurationDef {
1039 seconds: i64,
1040 nanos: i32,
1041 }
1042 pub(crate) mod opt_duration {
1043 use super::*;
1044
1045 pub(crate) fn serialize<S>(
1046 value: &Option<Duration>,
1047 serializer: S,
1048 ) -> Result<S::Ok, S::Error>
1049 where
1050 S: Serializer,
1051 {
1052 #[derive(Serialize)]
1053 struct Helper<'a>(#[serde(with = "DurationDef")] &'a Duration);
1054
1055 value.as_ref().map(Helper).serialize(serializer)
1056 }
1057
1058 pub(crate) fn deserialize<'de, D>(
1059 deserializer: D,
1060 ) -> Result<Option<Duration>, D::Error>
1061 where
1062 D: Deserializer<'de>,
1063 {
1064 #[derive(Deserialize)]
1065 struct Helper(#[serde(with = "DurationDef")] Duration);
1066
1067 let helper = Option::deserialize(deserializer)?;
1068 Ok(helper.map(|Helper(external)| external))
1069 }
1070 }
1071 }
1072 }
1073 pub mod workflow_activation {
1074 tonic::include_proto!("coresdk.workflow_activation");
1075 pub use self::sdk_helpers::*;
1076 mod sdk_helpers {
1077 use super::*;
1078 use crate::protos::{
1079 coresdk::{
1080 FromPayloadsExt,
1081 activity_result::{ActivityResolution, activity_resolution},
1082 common::NamespacedWorkflowExecution,
1083 fix_retry_policy,
1084 workflow_activation::remove_from_cache::EvictionReason,
1085 },
1086 temporal::api::{
1087 enums::v1::WorkflowTaskFailedCause,
1088 history::v1::{
1089 WorkflowExecutionCancelRequestedEventAttributes,
1090 WorkflowExecutionSignaledEventAttributes,
1091 WorkflowExecutionStartedEventAttributes,
1092 },
1093 query::v1::WorkflowQuery,
1094 },
1095 };
1096 use prost_types::Timestamp;
1097 use std::fmt::{Display, Formatter};
1098
1099 pub fn create_evict_activation(
1100 run_id: String,
1101 message: String,
1102 reason: EvictionReason,
1103 ) -> WorkflowActivation {
1104 WorkflowActivation {
1105 timestamp: None,
1106 run_id,
1107 is_replaying: false,
1108 history_length: 0,
1109 jobs: vec![WorkflowActivationJob::from(
1110 workflow_activation_job::Variant::RemoveFromCache(RemoveFromCache {
1111 message,
1112 reason: reason as i32,
1113 }),
1114 )],
1115 available_internal_flags: vec![],
1116 history_size_bytes: 0,
1117 continue_as_new_suggested: false,
1118 deployment_version_for_current_task: None,
1119 last_sdk_version: String::new(),
1120 suggest_continue_as_new_reasons: vec![],
1121 target_worker_deployment_version_changed: false,
1122 }
1123 }
1124
1125 pub fn query_to_job(id: String, q: WorkflowQuery) -> QueryWorkflow {
1126 QueryWorkflow {
1127 query_id: id,
1128 query_type: q.query_type,
1129 arguments: Vec::from_payloads(q.query_args),
1130 headers: q.header.map(|h| h.into()).unwrap_or_default(),
1131 }
1132 }
1133
1134 impl WorkflowActivation {
1135 pub fn is_only_eviction(&self) -> bool {
1137 matches!(
1138 self.jobs.as_slice(),
1139 [WorkflowActivationJob {
1140 variant: Some(workflow_activation_job::Variant::RemoveFromCache(_))
1141 }]
1142 )
1143 }
1144
1145 pub fn eviction_reason(&self) -> Option<EvictionReason> {
1147 self.jobs.iter().find_map(|j| {
1148 if let Some(workflow_activation_job::Variant::RemoveFromCache(ref rj)) =
1149 j.variant
1150 {
1151 EvictionReason::try_from(rj.reason).ok()
1152 } else {
1153 None
1154 }
1155 })
1156 }
1157 }
1158
1159 impl workflow_activation_job::Variant {
1160 pub fn is_local_activity_resolution(&self) -> bool {
1161 matches!(self, workflow_activation_job::Variant::ResolveActivity(ra) if ra.is_local)
1162 }
1163 }
1164
1165 impl Display for EvictionReason {
1166 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1167 write!(f, "{self:?}")
1168 }
1169 }
1170
1171 impl From<EvictionReason> for WorkflowTaskFailedCause {
1172 fn from(value: EvictionReason) -> Self {
1173 match value {
1174 EvictionReason::Nondeterminism => {
1175 WorkflowTaskFailedCause::NonDeterministicError
1176 }
1177 _ => WorkflowTaskFailedCause::Unspecified,
1178 }
1179 }
1180 }
1181
1182 impl Display for WorkflowActivation {
1183 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1184 write!(f, "WorkflowActivation(")?;
1185 write!(f, "run_id: {}, ", self.run_id)?;
1186 write!(f, "is_replaying: {}, ", self.is_replaying)?;
1187 write!(
1188 f,
1189 "jobs: {})",
1190 self.jobs
1191 .iter()
1192 .map(ToString::to_string)
1193 .collect::<Vec<_>>()
1194 .as_slice()
1195 .join(", ")
1196 )
1197 }
1198 }
1199
1200 impl Display for WorkflowActivationJob {
1201 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1202 match &self.variant {
1203 None => write!(f, "empty"),
1204 Some(v) => write!(f, "{v}"),
1205 }
1206 }
1207 }
1208
1209 impl Display for workflow_activation_job::Variant {
1210 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1211 match self {
1212 workflow_activation_job::Variant::InitializeWorkflow(_) => {
1213 write!(f, "InitializeWorkflow")
1214 }
1215 workflow_activation_job::Variant::FireTimer(t) => {
1216 write!(f, "FireTimer({})", t.seq)
1217 }
1218 workflow_activation_job::Variant::UpdateRandomSeed(_) => {
1219 write!(f, "UpdateRandomSeed")
1220 }
1221 workflow_activation_job::Variant::QueryWorkflow(_) => {
1222 write!(f, "QueryWorkflow")
1223 }
1224 workflow_activation_job::Variant::CancelWorkflow(_) => {
1225 write!(f, "CancelWorkflow")
1226 }
1227 workflow_activation_job::Variant::SignalWorkflow(_) => {
1228 write!(f, "SignalWorkflow")
1229 }
1230 workflow_activation_job::Variant::ResolveActivity(r) => {
1231 write!(
1232 f,
1233 "ResolveActivity({}, {})",
1234 r.seq,
1235 r.result
1236 .as_ref()
1237 .unwrap_or(&ActivityResolution { status: None })
1238 )
1239 }
1240 workflow_activation_job::Variant::NotifyHasPatch(_) => {
1241 write!(f, "NotifyHasPatch")
1242 }
1243 workflow_activation_job::Variant::ResolveChildWorkflowExecutionStart(_) => {
1244 write!(f, "ResolveChildWorkflowExecutionStart")
1245 }
1246 workflow_activation_job::Variant::ResolveChildWorkflowExecution(_) => {
1247 write!(f, "ResolveChildWorkflowExecution")
1248 }
1249 workflow_activation_job::Variant::ResolveSignalExternalWorkflow(_) => {
1250 write!(f, "ResolveSignalExternalWorkflow")
1251 }
1252 workflow_activation_job::Variant::RemoveFromCache(_) => {
1253 write!(f, "RemoveFromCache")
1254 }
1255 workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(
1256 _,
1257 ) => {
1258 write!(f, "ResolveRequestCancelExternalWorkflow")
1259 }
1260 workflow_activation_job::Variant::DoUpdate(u) => {
1261 write!(f, "DoUpdate({})", u.id)
1262 }
1263 workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
1264 write!(f, "ResolveNexusOperationStart")
1265 }
1266 workflow_activation_job::Variant::ResolveNexusOperation(_) => {
1267 write!(f, "ResolveNexusOperation")
1268 }
1269 }
1270 }
1271 }
1272
1273 impl Display for ActivityResolution {
1274 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1275 match self.status {
1276 None => {
1277 write!(f, "None")
1278 }
1279 Some(activity_resolution::Status::Failed(_)) => {
1280 write!(f, "Failed")
1281 }
1282 Some(activity_resolution::Status::Completed(_)) => {
1283 write!(f, "Completed")
1284 }
1285 Some(activity_resolution::Status::Cancelled(_)) => {
1286 write!(f, "Cancelled")
1287 }
1288 Some(activity_resolution::Status::Backoff(_)) => {
1289 write!(f, "Backoff")
1290 }
1291 }
1292 }
1293 }
1294
1295 impl Display for QueryWorkflow {
1296 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1297 write!(
1298 f,
1299 "QueryWorkflow(id: {}, type: {})",
1300 self.query_id, self.query_type
1301 )
1302 }
1303 }
1304
1305 impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
1306 fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
1307 Self {
1308 signal_name: a.signal_name,
1309 input: Vec::from_payloads(a.input),
1310 identity: a.identity,
1311 headers: a.header.map(Into::into).unwrap_or_default(),
1312 }
1313 }
1314 }
1315
1316 impl From<WorkflowExecutionCancelRequestedEventAttributes> for CancelWorkflow {
1317 fn from(a: WorkflowExecutionCancelRequestedEventAttributes) -> Self {
1318 Self { reason: a.cause }
1319 }
1320 }
1321
1322 pub fn start_workflow_from_attribs(
1324 attrs: WorkflowExecutionStartedEventAttributes,
1325 workflow_id: String,
1326 randomness_seed: u64,
1327 start_time: Timestamp,
1328 ) -> InitializeWorkflow {
1329 InitializeWorkflow {
1330 workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(),
1331 workflow_id,
1332 arguments: Vec::from_payloads(attrs.input),
1333 randomness_seed,
1334 headers: attrs.header.unwrap_or_default().fields,
1335 identity: attrs.identity,
1336 parent_workflow_info: attrs.parent_workflow_execution.map(|pe| {
1337 NamespacedWorkflowExecution {
1338 namespace: attrs.parent_workflow_namespace,
1339 run_id: pe.run_id,
1340 workflow_id: pe.workflow_id,
1341 }
1342 }),
1343 workflow_execution_timeout: attrs.workflow_execution_timeout,
1344 workflow_run_timeout: attrs.workflow_run_timeout,
1345 workflow_task_timeout: attrs.workflow_task_timeout,
1346 continued_from_execution_run_id: attrs.continued_execution_run_id,
1347 continued_initiator: attrs.initiator,
1348 continued_failure: attrs.continued_failure,
1349 last_completion_result: attrs.last_completion_result,
1350 first_execution_run_id: attrs.first_execution_run_id,
1351 retry_policy: attrs.retry_policy.map(fix_retry_policy),
1352 attempt: attrs.attempt,
1353 cron_schedule: attrs.cron_schedule,
1354 workflow_execution_expiration_time: attrs.workflow_execution_expiration_time,
1355 cron_schedule_to_schedule_interval: attrs.first_workflow_task_backoff,
1356 memo: attrs.memo,
1357 search_attributes: attrs.search_attributes,
1358 start_time: Some(start_time),
1359 root_workflow: attrs.root_workflow_execution,
1360 priority: attrs.priority,
1361 }
1362 }
1363 }
1364 }
1365 pub mod workflow_completion {
1366 tonic::include_proto!("coresdk.workflow_completion");
1367 mod sdk_helpers {
1368 use super::*;
1369 use crate::protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure};
1370
1371 impl workflow_activation_completion::Status {
1372 pub const fn is_success(&self) -> bool {
1373 match &self {
1374 Self::Successful(_) => true,
1375 Self::Failed(_) => false,
1376 }
1377 }
1378 }
1379
1380 impl From<failure::v1::Failure> for Failure {
1381 fn from(f: failure::v1::Failure) -> Self {
1382 Failure {
1383 failure: Some(f),
1384 force_cause: WorkflowTaskFailedCause::Unspecified as i32,
1385 }
1386 }
1387 }
1388 }
1389 }
1390 pub mod child_workflow {
1391 tonic::include_proto!("coresdk.child_workflow");
1392 }
1393 pub mod nexus {
1394 tonic::include_proto!("coresdk.nexus");
1395 pub use self::sdk_helpers::*;
1396 mod sdk_helpers {
1397 use super::*;
1398 use crate::protos::temporal::api::workflowservice::v1::PollNexusTaskQueueResponse;
1399 use std::fmt::{Display, Formatter};
1400
1401 impl NexusTask {
1402 pub fn unwrap_task(self) -> PollNexusTaskQueueResponse {
1404 if let Some(nexus_task::Variant::Task(t)) = self.variant {
1405 return t;
1406 }
1407 panic!("Nexus task did not contain a server task");
1408 }
1409
1410 pub fn task_token(&self) -> &[u8] {
1412 match &self.variant {
1413 Some(nexus_task::Variant::Task(t)) => t.task_token.as_slice(),
1414 Some(nexus_task::Variant::CancelTask(c)) => c.task_token.as_slice(),
1415 None => panic!("Nexus task did not contain a task token"),
1416 }
1417 }
1418 }
1419
1420 impl Display for nexus_task_completion::Status {
1421 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1422 write!(f, "NexusTaskCompletion(")?;
1423 match self {
1424 nexus_task_completion::Status::Completed(c) => {
1425 write!(f, "{c}")
1426 }
1427 nexus_task_completion::Status::AckCancel(_) => {
1428 write!(f, "AckCancel")
1429 }
1430 #[allow(deprecated)]
1431 nexus_task_completion::Status::Error(error) => {
1432 write!(f, "Error({error:?})")
1433 }
1434 nexus_task_completion::Status::Failure(failure) => {
1435 write!(f, "{failure}")
1436 }
1437 }?;
1438 write!(f, ")")
1439 }
1440 }
1441
/// The two error states distinguished here: failed or canceled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NexusOperationErrorState {
    Failed,
    Canceled,
}

/// Displays the state as its lowercase name.
impl Display for NexusOperationErrorState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Canceled => "canceled",
            Self::Failed => "failed",
        };
        f.write_str(text)
    }
}
1456 }
1457 }
1458 pub mod workflow_commands {
1459 tonic::include_proto!("coresdk.workflow_commands");
1460 mod sdk_helpers {
1461 use super::*;
1462 use crate::protos::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType};
1463 use std::fmt::{Display, Formatter};
1464
1465 impl Display for WorkflowCommand {
1466 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1467 match &self.variant {
1468 None => write!(f, "Empty"),
1469 Some(v) => write!(f, "{v}"),
1470 }
1471 }
1472 }
1473
1474 impl Display for StartTimer {
1475 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1476 write!(f, "StartTimer({})", self.seq)
1477 }
1478 }
1479
1480 impl Display for ScheduleActivity {
1481 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1482 write!(f, "ScheduleActivity({}, {})", self.seq, self.activity_type)
1483 }
1484 }
1485
1486 impl Display for ScheduleLocalActivity {
1487 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1488 write!(
1489 f,
1490 "ScheduleLocalActivity({}, {})",
1491 self.seq, self.activity_type
1492 )
1493 }
1494 }
1495
1496 impl Display for QueryResult {
1497 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1498 write!(f, "RespondToQuery({})", self.query_id)
1499 }
1500 }
1501
1502 impl Display for RequestCancelActivity {
1503 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1504 write!(f, "RequestCancelActivity({})", self.seq)
1505 }
1506 }
1507
1508 impl Display for RequestCancelLocalActivity {
1509 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1510 write!(f, "RequestCancelLocalActivity({})", self.seq)
1511 }
1512 }
1513
1514 impl Display for CancelTimer {
1515 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1516 write!(f, "CancelTimer({})", self.seq)
1517 }
1518 }
1519
1520 impl Display for CompleteWorkflowExecution {
1521 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1522 write!(f, "CompleteWorkflowExecution")
1523 }
1524 }
1525
1526 impl Display for FailWorkflowExecution {
1527 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1528 write!(f, "FailWorkflowExecution")
1529 }
1530 }
1531
1532 impl Display for ContinueAsNewWorkflowExecution {
1533 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1534 write!(f, "ContinueAsNewWorkflowExecution")
1535 }
1536 }
1537
1538 impl Display for CancelWorkflowExecution {
1539 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1540 write!(f, "CancelWorkflowExecution")
1541 }
1542 }
1543
1544 impl Display for SetPatchMarker {
1545 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1546 write!(f, "SetPatchMarker({})", self.patch_id)
1547 }
1548 }
1549
1550 impl Display for StartChildWorkflowExecution {
1551 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1552 write!(
1553 f,
1554 "StartChildWorkflowExecution({}, {})",
1555 self.seq, self.workflow_type
1556 )
1557 }
1558 }
1559
1560 impl Display for RequestCancelExternalWorkflowExecution {
1561 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1562 write!(f, "RequestCancelExternalWorkflowExecution({})", self.seq)
1563 }
1564 }
1565
1566 impl Display for UpsertWorkflowSearchAttributes {
1567 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1568 let keys: Vec<_> = self
1569 .search_attributes
1570 .as_ref()
1571 .map(|sa| sa.indexed_fields.keys().collect())
1572 .unwrap_or_default();
1573 write!(f, "UpsertWorkflowSearchAttributes({:?})", keys)
1574 }
1575 }
1576
1577 impl Display for SignalExternalWorkflowExecution {
1578 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1579 write!(f, "SignalExternalWorkflowExecution({})", self.seq)
1580 }
1581 }
1582
1583 impl Display for CancelSignalWorkflow {
1584 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1585 write!(f, "CancelSignalWorkflow({})", self.seq)
1586 }
1587 }
1588
1589 impl Display for CancelChildWorkflowExecution {
1590 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1591 write!(
1592 f,
1593 "CancelChildWorkflowExecution({})",
1594 self.child_workflow_seq
1595 )
1596 }
1597 }
1598
1599 impl Display for ModifyWorkflowProperties {
1600 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1601 write!(
1602 f,
1603 "ModifyWorkflowProperties(upserted memo keys: {:?})",
1604 self.upserted_memo.as_ref().map(|m| m.fields.keys())
1605 )
1606 }
1607 }
1608
1609 impl Display for UpdateResponse {
1610 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1611 write!(
1612 f,
1613 "UpdateResponse(protocol_instance_id: {}, response: {:?})",
1614 self.protocol_instance_id, self.response
1615 )
1616 }
1617 }
1618
1619 impl Display for ScheduleNexusOperation {
1620 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1621 write!(f, "ScheduleNexusOperation({})", self.seq)
1622 }
1623 }
1624
1625 impl Display for RequestCancelNexusOperation {
1626 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1627 write!(f, "RequestCancelNexusOperation({})", self.seq)
1628 }
1629 }
1630
1631 impl QueryResult {
1632 pub fn into_components(
1634 self,
1635 ) -> (String, QueryResultType, Option<Payloads>, String) {
1636 match self {
1637 QueryResult {
1638 variant: Some(query_result::Variant::Succeeded(qs)),
1639 query_id,
1640 } => (
1641 query_id,
1642 QueryResultType::Answered,
1643 qs.response.map(Into::into),
1644 "".to_string(),
1645 ),
1646 QueryResult {
1647 variant: Some(query_result::Variant::Failed(err)),
1648 query_id,
1649 } => (query_id, QueryResultType::Failed, None, err.message),
1650 QueryResult {
1651 variant: None,
1652 query_id,
1653 } => (
1654 query_id,
1655 QueryResultType::Failed,
1656 None,
1657 "Query response was empty".to_string(),
1658 ),
1659 }
1660 }
1661 }
1662 }
1663 }
1664}
1665
1666#[allow(
1668 clippy::all,
1669 missing_docs,
1670 rustdoc::broken_intra_doc_links,
1671 rustdoc::bare_urls
1672)]
1673pub mod temporal {
1675 pub mod api {
1676 pub mod activity {
1677 pub mod v1 {
1678 tonic::include_proto!("temporal.api.activity.v1");
1679 }
1680 }
1681 pub mod batch {
1682 pub mod v1 {
1683 tonic::include_proto!("temporal.api.batch.v1");
1684 }
1685 }
1686 pub mod callback {
1687 pub mod v1 {
1688 tonic::include_proto!("temporal.api.callback.v1");
1689 }
1690 }
1691 pub mod command {
1692 pub mod v1 {
1693 tonic::include_proto!("temporal.api.command.v1");
1694 pub use self::sdk_helpers::*;
1695 mod sdk_helpers {
1696 use super::*;
1697 use crate::protos::{
1698 coresdk::{IntoPayloadsExt, workflow_commands},
1699 temporal::api::{
1700 common::v1::{ActivityType, WorkflowType},
1701 enums::v1::CommandType,
1702 },
1703 };
1704 use command::Attributes;
1705 use std::fmt::{Display, Formatter};
1706
1707 impl From<command::Attributes> for Command {
1708 fn from(c: command::Attributes) -> Self {
1709 match c {
1710 a @ Attributes::StartTimerCommandAttributes(_) => Self {
1711 command_type: CommandType::StartTimer as i32,
1712 attributes: Some(a),
1713 user_metadata: Default::default(),
1714 },
1715 a @ Attributes::CancelTimerCommandAttributes(_) => Self {
1716 command_type: CommandType::CancelTimer as i32,
1717 attributes: Some(a),
1718 user_metadata: Default::default(),
1719 },
1720 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
1721 Self {
1722 command_type: CommandType::CompleteWorkflowExecution as i32,
1723 attributes: Some(a),
1724 user_metadata: Default::default(),
1725 }
1726 }
1727 a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
1728 command_type: CommandType::FailWorkflowExecution as i32,
1729 attributes: Some(a),
1730 user_metadata: Default::default(),
1731 },
1732 a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
1733 command_type: CommandType::ScheduleActivityTask as i32,
1734 attributes: Some(a),
1735 user_metadata: Default::default(),
1736 },
1737 a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
1738 Self {
1739 command_type: CommandType::RequestCancelActivityTask as i32,
1740 attributes: Some(a),
1741 user_metadata: Default::default(),
1742 }
1743 }
1744 a
1745 @ Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1746 _,
1747 ) => Self {
1748 command_type: CommandType::ContinueAsNewWorkflowExecution
1749 as i32,
1750 attributes: Some(a),
1751 user_metadata: Default::default(),
1752 },
1753 a @ Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
1754 Self {
1755 command_type: CommandType::CancelWorkflowExecution as i32,
1756 attributes: Some(a),
1757 user_metadata: Default::default(),
1758 }
1759 }
1760 a @ Attributes::RecordMarkerCommandAttributes(_) => Self {
1761 command_type: CommandType::RecordMarker as i32,
1762 attributes: Some(a),
1763 user_metadata: Default::default(),
1764 },
1765 a @ Attributes::ProtocolMessageCommandAttributes(_) => Self {
1766 command_type: CommandType::ProtocolMessage as i32,
1767 attributes: Some(a),
1768 user_metadata: Default::default(),
1769 },
1770 a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1771 Self {
1772 command_type: CommandType::RequestCancelNexusOperation
1773 as i32,
1774 attributes: Some(a),
1775 user_metadata: Default::default(),
1776 }
1777 }
1778 _ => unimplemented!(),
1779 }
1780 }
1781 }
1782
1783 impl Display for Command {
1784 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1785 let ct = CommandType::try_from(self.command_type)
1786 .unwrap_or(CommandType::Unspecified);
1787 write!(f, "{:?}", ct)
1788 }
1789 }
1790
/// Extension trait for obtaining the [`CommandType`] that corresponds to a value.
pub trait CommandAttributesExt {
    /// Returns the command type associated with `self`.
    fn as_type(&self) -> CommandType;
}
1794
1795 impl CommandAttributesExt for command::Attributes {
1796 fn as_type(&self) -> CommandType {
1797 match self {
1798 Attributes::ScheduleActivityTaskCommandAttributes(_) => {
1799 CommandType::ScheduleActivityTask
1800 }
1801 Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer,
1802 Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
1803 CommandType::CompleteWorkflowExecution
1804 }
1805 Attributes::FailWorkflowExecutionCommandAttributes(_) => {
1806 CommandType::FailWorkflowExecution
1807 }
1808 Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
1809 CommandType::RequestCancelActivityTask
1810 }
1811 Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer,
1812 Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
1813 CommandType::CancelWorkflowExecution
1814 }
1815 Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes(
1816 _,
1817 ) => CommandType::RequestCancelExternalWorkflowExecution,
1818 Attributes::RecordMarkerCommandAttributes(_) => {
1819 CommandType::RecordMarker
1820 }
1821 Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1822 CommandType::ContinueAsNewWorkflowExecution
1823 }
1824 Attributes::StartChildWorkflowExecutionCommandAttributes(_) => {
1825 CommandType::StartChildWorkflowExecution
1826 }
1827 Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => {
1828 CommandType::SignalExternalWorkflowExecution
1829 }
1830 Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => {
1831 CommandType::UpsertWorkflowSearchAttributes
1832 }
1833 Attributes::ProtocolMessageCommandAttributes(_) => {
1834 CommandType::ProtocolMessage
1835 }
1836 Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => {
1837 CommandType::ModifyWorkflowProperties
1838 }
1839 Attributes::ScheduleNexusOperationCommandAttributes(_) => {
1840 CommandType::ScheduleNexusOperation
1841 }
1842 Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1843 CommandType::RequestCancelNexusOperation
1844 }
1845 }
1846 }
1847 }
1848
1849 impl Display for command::Attributes {
1850 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1851 write!(f, "{:?}", self.as_type())
1852 }
1853 }
1854
1855 impl From<workflow_commands::StartTimer> for command::Attributes {
1856 fn from(s: workflow_commands::StartTimer) -> Self {
1857 Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
1858 timer_id: s.seq.to_string(),
1859 start_to_fire_timeout: s.start_to_fire_timeout,
1860 })
1861 }
1862 }
1863
1864 impl From<workflow_commands::UpsertWorkflowSearchAttributes> for command::Attributes {
1865 fn from(s: workflow_commands::UpsertWorkflowSearchAttributes) -> Self {
1866 Self::UpsertWorkflowSearchAttributesCommandAttributes(
1867 UpsertWorkflowSearchAttributesCommandAttributes {
1868 search_attributes: s.search_attributes,
1869 },
1870 )
1871 }
1872 }
1873
1874 impl From<workflow_commands::ModifyWorkflowProperties> for command::Attributes {
1875 fn from(s: workflow_commands::ModifyWorkflowProperties) -> Self {
1876 Self::ModifyWorkflowPropertiesCommandAttributes(
1877 ModifyWorkflowPropertiesCommandAttributes {
1878 upserted_memo: s.upserted_memo.map(Into::into),
1879 },
1880 )
1881 }
1882 }
1883
1884 impl From<workflow_commands::CancelTimer> for command::Attributes {
1885 fn from(s: workflow_commands::CancelTimer) -> Self {
1886 Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
1887 timer_id: s.seq.to_string(),
1888 })
1889 }
1890 }
1891
1892 pub fn schedule_activity_cmd_to_api(
1893 s: workflow_commands::ScheduleActivity,
1894 use_workflow_build_id: bool,
1895 ) -> command::Attributes {
1896 command::Attributes::ScheduleActivityTaskCommandAttributes(
1897 ScheduleActivityTaskCommandAttributes {
1898 activity_id: s.activity_id,
1899 activity_type: Some(ActivityType {
1900 name: s.activity_type,
1901 }),
1902 task_queue: Some(s.task_queue.into()),
1903 header: Some(s.headers.into()),
1904 input: s.arguments.into_payloads(),
1905 schedule_to_close_timeout: s.schedule_to_close_timeout,
1906 schedule_to_start_timeout: s.schedule_to_start_timeout,
1907 start_to_close_timeout: s.start_to_close_timeout,
1908 heartbeat_timeout: s.heartbeat_timeout,
1909 retry_policy: s.retry_policy.map(Into::into),
1910 request_eager_execution: !s.do_not_eagerly_execute,
1911 use_workflow_build_id,
1912 priority: s.priority,
1913 },
1914 )
1915 }
1916
1917 #[allow(deprecated)]
1918 pub fn start_child_workflow_cmd_to_api(
1919 s: workflow_commands::StartChildWorkflowExecution,
1920 inherit_build_id: bool,
1921 ) -> command::Attributes {
1922 command::Attributes::StartChildWorkflowExecutionCommandAttributes(
1923 StartChildWorkflowExecutionCommandAttributes {
1924 workflow_id: s.workflow_id,
1925 workflow_type: Some(WorkflowType {
1926 name: s.workflow_type,
1927 }),
1928 control: "".into(),
1929 namespace: s.namespace,
1930 task_queue: Some(s.task_queue.into()),
1931 header: Some(s.headers.into()),
1932 memo: Some(s.memo.into()),
1933 search_attributes: s.search_attributes,
1934 input: s.input.into_payloads(),
1935 workflow_id_reuse_policy: s.workflow_id_reuse_policy,
1936 workflow_execution_timeout: s.workflow_execution_timeout,
1937 workflow_run_timeout: s.workflow_run_timeout,
1938 workflow_task_timeout: s.workflow_task_timeout,
1939 retry_policy: s.retry_policy.map(Into::into),
1940 cron_schedule: s.cron_schedule.clone(),
1941 parent_close_policy: s.parent_close_policy,
1942 inherit_build_id,
1943 priority: s.priority,
1944 },
1945 )
1946 }
1947
1948 impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
1949 fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
1950 Self::CompleteWorkflowExecutionCommandAttributes(
1951 CompleteWorkflowExecutionCommandAttributes {
1952 result: c.result.map(Into::into),
1953 },
1954 )
1955 }
1956 }
1957
1958 impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
1959 fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
1960 Self::FailWorkflowExecutionCommandAttributes(
1961 FailWorkflowExecutionCommandAttributes {
1962 failure: c.failure.map(Into::into),
1963 },
1964 )
1965 }
1966 }
1967
1968 #[allow(deprecated)]
1969 pub fn continue_as_new_cmd_to_api(
1970 c: workflow_commands::ContinueAsNewWorkflowExecution,
1971 inherit_build_id: bool,
1972 ) -> command::Attributes {
1973 command::Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1974 ContinueAsNewWorkflowExecutionCommandAttributes {
1975 workflow_type: Some(c.workflow_type.into()),
1976 task_queue: Some(c.task_queue.into()),
1977 input: c.arguments.into_payloads(),
1978 workflow_run_timeout: c.workflow_run_timeout,
1979 workflow_task_timeout: c.workflow_task_timeout,
1980 memo: if c.memo.is_empty() {
1981 None
1982 } else {
1983 Some(c.memo.into())
1984 },
1985 header: if c.headers.is_empty() {
1986 None
1987 } else {
1988 Some(c.headers.into())
1989 },
1990 retry_policy: c.retry_policy,
1991 search_attributes: c.search_attributes,
1992 backoff_start_interval: c.backoff_start_interval,
1993 inherit_build_id,
1994 initial_versioning_behavior: c.initial_versioning_behavior,
1995 ..Default::default()
1996 },
1997 )
1998 }
1999
2000 impl From<workflow_commands::CancelWorkflowExecution> for command::Attributes {
2001 fn from(_c: workflow_commands::CancelWorkflowExecution) -> Self {
2002 Self::CancelWorkflowExecutionCommandAttributes(
2003 CancelWorkflowExecutionCommandAttributes { details: None },
2004 )
2005 }
2006 }
2007
2008 impl From<workflow_commands::ScheduleNexusOperation> for command::Attributes {
2009 fn from(c: workflow_commands::ScheduleNexusOperation) -> Self {
2010 Self::ScheduleNexusOperationCommandAttributes(
2011 ScheduleNexusOperationCommandAttributes {
2012 endpoint: c.endpoint,
2013 service: c.service,
2014 operation: c.operation,
2015 input: c.input,
2016 schedule_to_close_timeout: c.schedule_to_close_timeout,
2017 schedule_to_start_timeout: c.schedule_to_start_timeout,
2018 start_to_close_timeout: c.start_to_close_timeout,
2019 nexus_header: c.nexus_header,
2020 },
2021 )
2022 }
2023 }
2024 }
2025 }
2026 }
2027 #[allow(rustdoc::invalid_html_tags)]
2028 pub mod cloud {
2029 pub mod account {
2030 pub mod v1 {
2031 tonic::include_proto!("temporal.api.cloud.account.v1");
2032 }
2033 }
2034 pub mod auditlog {
2035 pub mod v1 {
2036 tonic::include_proto!("temporal.api.cloud.auditlog.v1");
2037 }
2038 }
2039 pub mod billing {
2040 pub mod v1 {
2041 tonic::include_proto!("temporal.api.cloud.billing.v1");
2042 }
2043 }
2044 pub mod cloudservice {
2045 pub mod v1 {
2046 tonic::include_proto!("temporal.api.cloud.cloudservice.v1");
2047 }
2048 }
2049 pub mod connectivityrule {
2050 pub mod v1 {
2051 tonic::include_proto!("temporal.api.cloud.connectivityrule.v1");
2052 }
2053 }
2054 pub mod identity {
2055 pub mod v1 {
2056 tonic::include_proto!("temporal.api.cloud.identity.v1");
2057 }
2058 }
2059 pub mod namespace {
2060 pub mod v1 {
2061 tonic::include_proto!("temporal.api.cloud.namespace.v1");
2062 }
2063 }
2064 pub mod nexus {
2065 pub mod v1 {
2066 tonic::include_proto!("temporal.api.cloud.nexus.v1");
2067 }
2068 }
2069 pub mod operation {
2070 pub mod v1 {
2071 tonic::include_proto!("temporal.api.cloud.operation.v1");
2072 }
2073 }
2074 pub mod region {
2075 pub mod v1 {
2076 tonic::include_proto!("temporal.api.cloud.region.v1");
2077 }
2078 }
2079 pub mod resource {
2080 pub mod v1 {
2081 tonic::include_proto!("temporal.api.cloud.resource.v1");
2082 }
2083 }
2084 pub mod sink {
2085 pub mod v1 {
2086 tonic::include_proto!("temporal.api.cloud.sink.v1");
2087 }
2088 }
2089 pub mod usage {
2090 pub mod v1 {
2091 tonic::include_proto!("temporal.api.cloud.usage.v1");
2092 }
2093 }
2094 }
2095 pub mod common {
2096 pub mod v1 {
2097 include_proto_with_serde!("temporal.api.common.v1");
2098 mod sdk_helpers {
2099 use super::*;
2100 use crate::protos::{ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL};
2101 use base64::{Engine, prelude::BASE64_STANDARD};
2102 use std::{
2103 collections::HashMap,
2104 fmt::{Display, Formatter},
2105 };
2106
2107 impl<T> From<T> for Payload
2108 where
2109 T: AsRef<[u8]>,
2110 {
2111 fn from(v: T) -> Self {
2112 let mut metadata = HashMap::new();
2115 metadata
2116 .insert(ENCODING_PAYLOAD_KEY.to_string(), b"binary/plain".to_vec());
2117 Self {
2118 metadata,
2119 data: v.as_ref().to_vec(),
2120 external_payloads: Default::default(),
2121 }
2122 }
2123 }
2124
2125 impl Payload {
2126 pub fn as_slice(&self) -> &[u8] {
2128 self.data.as_slice()
2129 }
2130
2131 pub fn is_json_payload(&self) -> bool {
2132 self.metadata
2133 .get(ENCODING_PAYLOAD_KEY)
2134 .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes())
2135 .unwrap_or_default()
2136 }
2137 }
2138
2139 impl std::fmt::Debug for Payload {
2140 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2141 if std::env::var("TEMPORAL_PRINT_FULL_PAYLOADS").is_err()
2142 && self.data.len() > 64
2143 {
2144 let mut windows = self.data.as_slice().windows(32);
2145 write!(
2146 f,
2147 "[{}..{}]",
2148 BASE64_STANDARD.encode(windows.next().unwrap_or_default()),
2149 BASE64_STANDARD.encode(windows.next_back().unwrap_or_default())
2150 )
2151 } else {
2152 write!(f, "[{}]", BASE64_STANDARD.encode(&self.data))
2153 }
2154 }
2155 }
2156
2157 impl Display for Payload {
2158 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2159 write!(f, "{:?}", self)
2160 }
2161 }
2162
2163 impl Display for Header {
2164 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2165 write!(f, "Header(")?;
2166 for kv in &self.fields {
2167 write!(f, "{}: ", kv.0)?;
2168 write!(f, "{}, ", kv.1)?;
2169 }
2170 write!(f, ")")
2171 }
2172 }
2173
2174 impl From<Header> for HashMap<String, Payload> {
2175 fn from(h: Header) -> Self {
2176 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2177 }
2178 }
2179
2180 impl From<Memo> for HashMap<String, Payload> {
2181 fn from(h: Memo) -> Self {
2182 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2183 }
2184 }
2185
2186 impl From<SearchAttributes> for HashMap<String, Payload> {
2187 fn from(h: SearchAttributes) -> Self {
2188 h.indexed_fields
2189 .into_iter()
2190 .map(|(k, v)| (k, v.into()))
2191 .collect()
2192 }
2193 }
2194
2195 impl From<HashMap<String, Payload>> for SearchAttributes {
2196 fn from(h: HashMap<String, Payload>) -> Self {
2197 Self {
2198 indexed_fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
2199 }
2200 }
2201 }
2202
2203 impl From<String> for ActivityType {
2204 fn from(name: String) -> Self {
2205 Self { name }
2206 }
2207 }
2208
2209 impl From<&str> for ActivityType {
2210 fn from(name: &str) -> Self {
2211 Self {
2212 name: name.to_string(),
2213 }
2214 }
2215 }
2216
2217 impl From<ActivityType> for String {
2218 fn from(at: ActivityType) -> Self {
2219 at.name
2220 }
2221 }
2222
2223 impl From<&str> for WorkflowType {
2224 fn from(v: &str) -> Self {
2225 Self {
2226 name: v.to_string(),
2227 }
2228 }
2229 }
2230 }
2231 }
2232 }
2233 pub mod compute {
2234 pub mod v1 {
2235 tonic::include_proto!("temporal.api.compute.v1");
2236 }
2237 }
2238 pub mod deployment {
2239 pub mod v1 {
2240 tonic::include_proto!("temporal.api.deployment.v1");
2241 }
2242 }
2243 pub mod enums {
2244 pub mod v1 {
2245 include_proto_with_serde!("temporal.api.enums.v1");
2246 }
2247 }
2248 pub mod errordetails {
2249 pub mod v1 {
2250 tonic::include_proto!("temporal.api.errordetails.v1");
2251 }
2252 }
2253 pub mod failure {
2254 pub mod v1 {
2255 include_proto_with_serde!("temporal.api.failure.v1");
2256 }
2257 }
2258 pub mod filter {
2259 pub mod v1 {
2260 tonic::include_proto!("temporal.api.filter.v1");
2261 }
2262 }
2263 pub mod history {
2264 pub mod v1 {
2265 tonic::include_proto!("temporal.api.history.v1");
2266 pub use self::sdk_helpers::*;
2267 mod sdk_helpers {
2268 use super::*;
2269 use crate::protos::temporal::api::{
2270 enums::v1::EventType, history::v1::history_event::Attributes,
2271 };
2272 use anyhow::bail;
2273 use std::fmt::{Display, Formatter};
2274
2275 impl History {
2276 pub fn extract_run_id_from_start(&self) -> Result<&str, anyhow::Error> {
2277 extract_original_run_id_from_events(&self.events)
2278 }
2279
2280 pub fn last_event_id(&self) -> i64 {
2283 self.events.last().map(|e| e.event_id).unwrap_or_default()
2284 }
2285 }
2286
2287 pub fn extract_original_run_id_from_events(
2288 events: &[HistoryEvent],
2289 ) -> Result<&str, anyhow::Error> {
2290 if let Some(Attributes::WorkflowExecutionStartedEventAttributes(wes)) =
2291 events.get(0).and_then(|x| x.attributes.as_ref())
2292 {
2293 Ok(&wes.original_execution_run_id)
2294 } else {
2295 bail!("First event is not WorkflowExecutionStarted?!?")
2296 }
2297 }
2298
2299 impl HistoryEvent {
2300 pub fn is_command_event(&self) -> bool {
2302 EventType::try_from(self.event_type).map_or(false, |et| match et {
2303 EventType::ActivityTaskScheduled
2304 | EventType::ActivityTaskCancelRequested
2305 | EventType::MarkerRecorded
2306 | EventType::RequestCancelExternalWorkflowExecutionInitiated
2307 | EventType::SignalExternalWorkflowExecutionInitiated
2308 | EventType::StartChildWorkflowExecutionInitiated
2309 | EventType::TimerCanceled
2310 | EventType::TimerStarted
2311 | EventType::UpsertWorkflowSearchAttributes
2312 | EventType::WorkflowPropertiesModified
2313 | EventType::NexusOperationScheduled
2314 | EventType::NexusOperationCancelRequested
2315 | EventType::WorkflowExecutionCanceled
2316 | EventType::WorkflowExecutionCompleted
2317 | EventType::WorkflowExecutionContinuedAsNew
2318 | EventType::WorkflowExecutionFailed
2319 | EventType::WorkflowExecutionUpdateAccepted
2320 | EventType::WorkflowExecutionUpdateRejected
2321 | EventType::WorkflowExecutionUpdateCompleted => true,
2322 _ => false,
2323 })
2324 }
2325
2326 pub fn get_initial_command_event_id(&self) -> Option<i64> {
2330 self.attributes.as_ref().and_then(|a| {
2331 match a {
2334 Attributes::ActivityTaskStartedEventAttributes(a) =>
2335 Some(a.scheduled_event_id),
2336 Attributes::ActivityTaskCompletedEventAttributes(a) =>
2337 Some(a.scheduled_event_id),
2338 Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2339 Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2340 Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2341 Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2342 Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
2343 Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
2344 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2345 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
2346 Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2347 Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
2348 Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
2349 Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2350 Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
2351 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
2352 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
2353 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2354 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
2355 Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
2356 Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2357 Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2358 Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2359 Attributes::NexusOperationStartedEventAttributes(a) => Some(a.scheduled_event_id),
2360 Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2361 Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id),
2362 Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2363 Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2364 Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2365 Attributes::NexusOperationCancelRequestCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2366 Attributes::NexusOperationCancelRequestFailedEventAttributes(a) => Some(a.scheduled_event_id),
2367 _ => None
2368 }
2369 })
2370 }
2371
2372 pub fn get_protocol_instance_id(&self) -> Option<&str> {
2374 self.attributes.as_ref().and_then(|a| match a {
2375 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(a) => {
2376 Some(a.protocol_instance_id.as_str())
2377 }
2378 _ => None,
2379 })
2380 }
2381
2382 pub fn is_final_wf_execution_event(&self) -> bool {
2384 match self.event_type() {
2385 EventType::WorkflowExecutionCompleted => true,
2386 EventType::WorkflowExecutionCanceled => true,
2387 EventType::WorkflowExecutionFailed => true,
2388 EventType::WorkflowExecutionTimedOut => true,
2389 EventType::WorkflowExecutionContinuedAsNew => true,
2390 EventType::WorkflowExecutionTerminated => true,
2391 _ => false,
2392 }
2393 }
2394
2395 pub fn is_wft_closed_event(&self) -> bool {
2396 match self.event_type() {
2397 EventType::WorkflowTaskCompleted => true,
2398 EventType::WorkflowTaskFailed => true,
2399 EventType::WorkflowTaskTimedOut => true,
2400 _ => false,
2401 }
2402 }
2403
2404 pub fn is_ignorable(&self) -> bool {
2405 if !self.worker_may_ignore {
2406 return false;
2407 }
2408 if let Some(a) = self.attributes.as_ref() {
2411 match a {
2412 Attributes::WorkflowExecutionStartedEventAttributes(_) => false,
2413 Attributes::WorkflowExecutionCompletedEventAttributes(_) => false,
2414 Attributes::WorkflowExecutionFailedEventAttributes(_) => false,
2415 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => false,
2416 Attributes::WorkflowTaskScheduledEventAttributes(_) => false,
2417 Attributes::WorkflowTaskStartedEventAttributes(_) => false,
2418 Attributes::WorkflowTaskCompletedEventAttributes(_) => false,
2419 Attributes::WorkflowTaskTimedOutEventAttributes(_) => false,
2420 Attributes::WorkflowTaskFailedEventAttributes(_) => false,
2421 Attributes::ActivityTaskScheduledEventAttributes(_) => false,
2422 Attributes::ActivityTaskStartedEventAttributes(_) => false,
2423 Attributes::ActivityTaskCompletedEventAttributes(_) => false,
2424 Attributes::ActivityTaskFailedEventAttributes(_) => false,
2425 Attributes::ActivityTaskTimedOutEventAttributes(_) => false,
2426 Attributes::TimerStartedEventAttributes(_) => false,
2427 Attributes::TimerFiredEventAttributes(_) => false,
2428 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => false,
2429 Attributes::ActivityTaskCanceledEventAttributes(_) => false,
2430 Attributes::TimerCanceledEventAttributes(_) => false,
2431 Attributes::MarkerRecordedEventAttributes(_) => false,
2432 Attributes::WorkflowExecutionSignaledEventAttributes(_) => false,
2433 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => false,
2434 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => false,
2435 Attributes::WorkflowExecutionCanceledEventAttributes(_) => false,
2436 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2437 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => false,
2438 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => false,
2439 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => false,
2440 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => false,
2441 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => false,
2442 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => false,
2443 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => false,
2444 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => false,
2445 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => false,
2446 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => false,
2447 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => false,
2448 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2449 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => false,
2450 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => false,
2451 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => false,
2452 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => false,
2453 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => false,
2454 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => false,
2455 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => false,
2456 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => false,
2457 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => false,
2458 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => false,
2459 Attributes::NexusOperationScheduledEventAttributes(_) => false,
2460 Attributes::NexusOperationStartedEventAttributes(_) => false,
2461 Attributes::NexusOperationCompletedEventAttributes(_) => false,
2462 Attributes::NexusOperationFailedEventAttributes(_) => false,
2463 Attributes::NexusOperationCanceledEventAttributes(_) => false,
2464 Attributes::NexusOperationTimedOutEventAttributes(_) => false,
2465 Attributes::NexusOperationCancelRequestedEventAttributes(_) => false,
2466 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => true,
2468 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => false,
2469 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => false,
2470 Attributes::WorkflowExecutionPausedEventAttributes(_) => true,
2472 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => true,
2474 Attributes::WorkflowExecutionTimeSkippingTransitionedEventAttributes(_) => true,
2476 }
2477 } else {
2478 self.worker_may_ignore
2480 }
2481 }
2482 }
2483
2484 impl Display for HistoryEvent {
2485 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2486 write!(
2487 f,
2488 "HistoryEvent(id: {}, {:?})",
2489 self.event_id,
2490 EventType::try_from(self.event_type).unwrap_or_default()
2491 )
2492 }
2493 }
2494
2495 impl Attributes {
2496 pub fn event_type(&self) -> EventType {
2497 match self {
2499 Attributes::WorkflowExecutionStartedEventAttributes(_) => { EventType::WorkflowExecutionStarted }
2500 Attributes::WorkflowExecutionCompletedEventAttributes(_) => { EventType::WorkflowExecutionCompleted }
2501 Attributes::WorkflowExecutionFailedEventAttributes(_) => { EventType::WorkflowExecutionFailed }
2502 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => { EventType::WorkflowExecutionTimedOut }
2503 Attributes::WorkflowTaskScheduledEventAttributes(_) => { EventType::WorkflowTaskScheduled }
2504 Attributes::WorkflowTaskStartedEventAttributes(_) => { EventType::WorkflowTaskStarted }
2505 Attributes::WorkflowTaskCompletedEventAttributes(_) => { EventType::WorkflowTaskCompleted }
2506 Attributes::WorkflowTaskTimedOutEventAttributes(_) => { EventType::WorkflowTaskTimedOut }
2507 Attributes::WorkflowTaskFailedEventAttributes(_) => { EventType::WorkflowTaskFailed }
2508 Attributes::ActivityTaskScheduledEventAttributes(_) => { EventType::ActivityTaskScheduled }
2509 Attributes::ActivityTaskStartedEventAttributes(_) => { EventType::ActivityTaskStarted }
2510 Attributes::ActivityTaskCompletedEventAttributes(_) => { EventType::ActivityTaskCompleted }
2511 Attributes::ActivityTaskFailedEventAttributes(_) => { EventType::ActivityTaskFailed }
2512 Attributes::ActivityTaskTimedOutEventAttributes(_) => { EventType::ActivityTaskTimedOut }
2513 Attributes::TimerStartedEventAttributes(_) => { EventType::TimerStarted }
2514 Attributes::TimerFiredEventAttributes(_) => { EventType::TimerFired }
2515 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => { EventType::ActivityTaskCancelRequested }
2516 Attributes::ActivityTaskCanceledEventAttributes(_) => { EventType::ActivityTaskCanceled }
2517 Attributes::TimerCanceledEventAttributes(_) => { EventType::TimerCanceled }
2518 Attributes::MarkerRecordedEventAttributes(_) => { EventType::MarkerRecorded }
2519 Attributes::WorkflowExecutionSignaledEventAttributes(_) => { EventType::WorkflowExecutionSignaled }
2520 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => { EventType::WorkflowExecutionTerminated }
2521 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::WorkflowExecutionCancelRequested }
2522 Attributes::WorkflowExecutionCanceledEventAttributes(_) => { EventType::WorkflowExecutionCanceled }
2523 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionInitiated }
2524 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionFailed }
2525 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::ExternalWorkflowExecutionCancelRequested }
2526 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => { EventType::WorkflowExecutionContinuedAsNew }
2527 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => { EventType::StartChildWorkflowExecutionInitiated }
2528 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => { EventType::StartChildWorkflowExecutionFailed }
2529 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => { EventType::ChildWorkflowExecutionStarted }
2530 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => { EventType::ChildWorkflowExecutionCompleted }
2531 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => { EventType::ChildWorkflowExecutionFailed }
2532 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => { EventType::ChildWorkflowExecutionCanceled }
2533 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => { EventType::ChildWorkflowExecutionTimedOut }
2534 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => { EventType::ChildWorkflowExecutionTerminated }
2535 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionInitiated }
2536 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionFailed }
2537 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => { EventType::ExternalWorkflowExecutionSignaled }
2538 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => { EventType::UpsertWorkflowSearchAttributes }
2539 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAdmitted }
2540 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => { EventType::WorkflowExecutionUpdateRejected }
2541 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAccepted }
2542 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => { EventType::WorkflowExecutionUpdateCompleted }
2543 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => { EventType::WorkflowPropertiesModifiedExternally }
2544 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => { EventType::ActivityPropertiesModifiedExternally }
2545 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => { EventType::WorkflowPropertiesModified }
2546 Attributes::NexusOperationScheduledEventAttributes(_) => { EventType::NexusOperationScheduled }
2547 Attributes::NexusOperationStartedEventAttributes(_) => { EventType::NexusOperationStarted }
2548 Attributes::NexusOperationCompletedEventAttributes(_) => { EventType::NexusOperationCompleted }
2549 Attributes::NexusOperationFailedEventAttributes(_) => { EventType::NexusOperationFailed }
2550 Attributes::NexusOperationCanceledEventAttributes(_) => { EventType::NexusOperationCanceled }
2551 Attributes::NexusOperationTimedOutEventAttributes(_) => { EventType::NexusOperationTimedOut }
2552 Attributes::NexusOperationCancelRequestedEventAttributes(_) => { EventType::NexusOperationCancelRequested }
2553 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => { EventType::WorkflowExecutionOptionsUpdated }
2554 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => { EventType::NexusOperationCancelRequestCompleted }
2555 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => { EventType::NexusOperationCancelRequestFailed }
2556 Attributes::WorkflowExecutionPausedEventAttributes(_) => { EventType::WorkflowExecutionPaused }
2557 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => { EventType::WorkflowExecutionUnpaused }
2558 Attributes::WorkflowExecutionTimeSkippingTransitionedEventAttributes(_) => { EventType::WorkflowExecutionTimeSkippingTransitioned }
2559 }
2560 }
2561 }
2562 }
2563 }
2564 }
2565 pub mod namespace {
2566 pub mod v1 {
2567 tonic::include_proto!("temporal.api.namespace.v1");
2568 }
2569 }
2570 pub mod operatorservice {
2571 pub mod v1 {
2572 tonic::include_proto!("temporal.api.operatorservice.v1");
2573 }
2574 }
2575 pub mod protocol {
2576 pub mod v1 {
2577 tonic::include_proto!("temporal.api.protocol.v1");
2578 mod sdk_helpers {
2579 use super::*;
2580 use std::fmt::{Display, Formatter};
2581
2582 impl Display for Message {
2583 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2584 write!(f, "ProtocolMessage({})", self.id)
2585 }
2586 }
2587 }
2588 }
2589 }
2590 pub mod query {
2591 pub mod v1 {
2592 tonic::include_proto!("temporal.api.query.v1");
2593 }
2594 }
2595 pub mod replication {
2596 pub mod v1 {
2597 tonic::include_proto!("temporal.api.replication.v1");
2598 }
2599 }
2600 pub mod rules {
2601 pub mod v1 {
2602 tonic::include_proto!("temporal.api.rules.v1");
2603 }
2604 }
2605 pub mod schedule {
2606 #[allow(rustdoc::invalid_html_tags)]
2607 pub mod v1 {
2608 tonic::include_proto!("temporal.api.schedule.v1");
2609 }
2610 }
2611 pub mod sdk {
2612 pub mod v1 {
2613 tonic::include_proto!("temporal.api.sdk.v1");
2614 }
2615 }
2616 pub mod taskqueue {
2617 pub mod v1 {
2618 tonic::include_proto!("temporal.api.taskqueue.v1");
2619 mod sdk_helpers {
2620 use super::*;
2621 use crate::protos::temporal::api::enums::v1::TaskQueueKind;
2622
2623 impl From<String> for TaskQueue {
2624 fn from(name: String) -> Self {
2625 Self {
2626 name,
2627 kind: TaskQueueKind::Normal as i32,
2628 normal_name: "".to_string(),
2629 }
2630 }
2631 }
2632 }
2633 }
2634 }
2635 pub mod testservice {
2636 pub mod v1 {
2637 tonic::include_proto!("temporal.api.testservice.v1");
2638 }
2639 }
2640 pub mod update {
2641 pub mod v1 {
2642 tonic::include_proto!("temporal.api.update.v1");
2643 mod sdk_helpers {
2644 use super::*;
2645 use crate::protos::temporal::api::update::v1::outcome::Value;
2646
2647 impl Outcome {
2648 pub fn is_success(&self) -> bool {
2649 match self.value {
2650 Some(Value::Success(_)) => true,
2651 _ => false,
2652 }
2653 }
2654 }
2655 }
2656 }
2657 }
2658 pub mod version {
2659 pub mod v1 {
2660 tonic::include_proto!("temporal.api.version.v1");
2661 }
2662 }
2663 pub mod worker {
2664 pub mod v1 {
2665 tonic::include_proto!("temporal.api.worker.v1");
2666 }
2667 }
2668 pub mod workflow {
2669 pub mod v1 {
2670 tonic::include_proto!("temporal.api.workflow.v1");
2671 }
2672 }
2673 pub mod nexus {
2674 pub mod v1 {
2675 tonic::include_proto!("temporal.api.nexus.v1");
2676 pub use self::sdk_helpers::*;
2677 mod sdk_helpers {
2678 use super::*;
2679 use crate::protos::{
2680 camel_case_to_screaming_snake,
2681 temporal::api::{
2682 common::{
2683 self,
2684 v1::link::{WorkflowEvent, workflow_event},
2685 },
2686 enums::v1::EventType,
2687 failure,
2688 },
2689 };
2690 use anyhow::{anyhow, bail};
2691 use http::Uri;
2692 #[cfg(feature = "serde_serialize")]
2693 use prost::Name;
2694 #[cfg(feature = "serde_serialize")]
2695 use std::collections::HashMap;
2696 use std::fmt::{Display, Formatter};
2697
2698 impl Display for Response {
2699 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2700 write!(f, "NexusResponse(",)?;
2701 match &self.variant {
2702 None => {}
2703 Some(v) => {
2704 write!(f, "{v}")?;
2705 }
2706 }
2707 write!(f, ")")
2708 }
2709 }
2710
2711 impl Display for response::Variant {
2712 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2713 match self {
2714 response::Variant::StartOperation(_) => {
2715 write!(f, "StartOperation")
2716 }
2717 response::Variant::CancelOperation(_) => {
2718 write!(f, "CancelOperation")
2719 }
2720 }
2721 }
2722 }
2723
2724 impl Display for HandlerError {
2725 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2726 write!(f, "HandlerError")
2727 }
2728 }
2729
/// A failure payload that is either a nexus [`HandlerError`] or a Temporal API
/// [`failure::v1::Failure`].
pub enum NexusTaskFailure {
    /// Failure expressed as a nexus [`HandlerError`].
    // NOTE(review): "Legacy" presumably means this form predates `Temporal` — confirm.
    Legacy(HandlerError),
    /// Failure expressed as a Temporal API [`failure::v1::Failure`].
    Temporal(failure::v1::Failure),
}
2734
2735 static SCHEME_PREFIX: &str = "temporal://";
2736
2737 pub fn workflow_event_link_from_nexus(
2739 l: &Link,
2740 ) -> Result<common::v1::Link, anyhow::Error> {
2741 if !l.url.starts_with(SCHEME_PREFIX) {
2742 bail!("Invalid scheme for nexus link: {:?}", l.url);
2743 }
2744 let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap();
2747 let uri = Uri::try_from(no_authority_url)?;
2748 let parts = uri.into_parts();
2749 let path = parts.path_and_query.ok_or_else(|| {
2750 anyhow!("Failed to parse nexus link, invalid path: {:?}", l)
2751 })?;
2752 let path_parts = path.path().split('/').collect::<Vec<_>>();
2753 if path_parts.get(1) != Some(&"namespaces") {
2754 bail!("Invalid path for nexus link: {:?}", l);
2755 }
2756 let namespace = path_parts.get(2).ok_or_else(|| {
2757 anyhow!("Failed to parse nexus link, no namespace: {:?}", l)
2758 })?;
2759 if path_parts.get(3) != Some(&"workflows") {
2760 bail!("Invalid path for nexus link, no workflows segment: {:?}", l);
2761 }
2762 let workflow_id = path_parts.get(4).ok_or_else(|| {
2763 anyhow!("Failed to parse nexus link, no workflow id: {:?}", l)
2764 })?;
2765 let run_id = path_parts.get(5).ok_or_else(|| {
2766 anyhow!("Failed to parse nexus link, no run id: {:?}", l)
2767 })?;
2768 if path_parts.get(6) != Some(&"history") {
2769 bail!("Invalid path for nexus link, no history segment: {:?}", l);
2770 }
2771 let reference = if let Some(query) = path.query() {
2772 let mut eventref = workflow_event::EventReference::default();
2773 let query_parts = query.split('&').collect::<Vec<_>>();
2774 for qp in query_parts {
2775 let mut kv = qp.split('=');
2776 let key = kv.next().ok_or_else(|| {
2777 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2778 })?;
2779 let val = kv.next().ok_or_else(|| {
2780 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2781 })?;
2782 match key {
2783 "eventID" => {
2784 eventref.event_id = val.parse().map_err(|_| {
2785 anyhow!("Failed to parse nexus link event id: {:?}", l)
2786 })?;
2787 }
2788 "eventType" => {
2789 eventref.event_type = EventType::from_str_name(val)
2790 .unwrap_or_else(|| {
2791 EventType::from_str_name(
2792 &("EVENT_TYPE_".to_string()
2793 + &camel_case_to_screaming_snake(val)),
2794 )
2795 .unwrap_or_default()
2796 })
2797 .into()
2798 }
2799 _ => continue,
2800 }
2801 }
2802 Some(workflow_event::Reference::EventRef(eventref))
2803 } else {
2804 None
2805 };
2806
2807 Ok(common::v1::Link {
2808 variant: Some(common::v1::link::Variant::WorkflowEvent(
2809 WorkflowEvent {
2810 namespace: namespace.to_string(),
2811 workflow_id: workflow_id.to_string(),
2812 run_id: run_id.to_string(),
2813 reference,
2814 },
2815 )),
2816 })
2817 }
2818
/// Packages a Temporal API failure as a nexus [`Failure`].
///
/// The message is moved out of the Temporal failure, the remainder is serialized to
/// JSON as `details`, and the metadata records the Temporal failure's full proto name
/// under the `"type"` key. `cause` is always `None`.
#[cfg(feature = "serde_serialize")]
impl TryFrom<failure::v1::Failure> for Failure {
    type Error = serde_json::Error;

    fn try_from(mut f: failure::v1::Failure) -> Result<Self, Self::Error> {
        // Take the message before serializing so the JSON details carry an empty one.
        let message = std::mem::take(&mut f.message);
        let details = serde_json::to_vec(&f)?;
        let type_name = failure::v1::Failure::full_name();

        Ok(Self {
            message,
            stack_trace: f.stack_trace,
            metadata: HashMap::from([("type".to_string(), type_name.into())]),
            details,
            cause: None,
        })
    }
}
2843 }
2844 }
2845 }
2846 pub mod nexusservices {
2847 pub mod workerservice {
2848 pub mod v1 {
2849 tonic::include_proto!("temporal.api.nexusservices.workerservice.v1");
2850 }
2851 }
2852 }
2853 pub mod workflowservice {
2854 pub mod v1 {
2855 tonic::include_proto!("temporal.api.workflowservice.v1");
2856 pub use self::sdk_helpers::*;
2857 mod sdk_helpers {
2858 use super::*;
2859 use std::{
2860 convert::TryInto,
2861 fmt::{Display, Formatter},
2862 time::{Duration, SystemTime},
2863 };
2864
// Generates a `sched_to_start` method that measures the time between the timestamp in
// `$sched_field` and the one in `started_time`.
macro_rules! sched_to_start_impl {
    ($sched_field:ident) => {
        /// Returns the elapsed time from the scheduled timestamp to `started_time`.
        ///
        /// Returns `None` when either timestamp is unset, cannot be converted to a
        /// `SystemTime`, or when the start precedes the scheduled time.
        pub fn sched_to_start(&self) -> Option<Duration> {
            let scheduled = self.$sched_field?;
            let started = self.started_time?;
            elapsed_between_prost_times(scheduled, started).flatten()
        }
    };
}
2881
2882 fn elapsed_between_prost_times(
2883 from: prost_types::Timestamp,
2884 to: prost_types::Timestamp,
2885 ) -> Option<Option<Duration>> {
2886 let from: Result<SystemTime, _> = from.try_into();
2887 let to: Result<SystemTime, _> = to.try_into();
2888 if let (Ok(from), Ok(to)) = (from, to) {
2889 return Some(to.duration_since(from).ok());
2890 }
2891 None
2892 }
2893
impl PollWorkflowTaskQueueResponse {
    // Expands to `pub fn sched_to_start(&self) -> Option<Duration>`, measuring the time
    // from `scheduled_time` to `started_time`.
    sched_to_start_impl!(scheduled_time);
}
2897
2898 impl Display for PollWorkflowTaskQueueResponse {
2899 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2900 let last_event = self
2901 .history
2902 .as_ref()
2903 .and_then(|h| h.events.last().map(|he| he.event_id))
2904 .unwrap_or(0);
2905 write!(
2906 f,
2907 "PollWFTQResp(run_id: {}, attempt: {}, last_event: {})",
2908 self.workflow_execution
2909 .as_ref()
2910 .map_or("", |we| we.run_id.as_str()),
2911 self.attempt,
2912 last_event
2913 )
2914 }
2915 }
2916
2917 pub struct CompactHist<'a>(pub &'a PollWorkflowTaskQueueResponse);
2919 impl Display for CompactHist<'_> {
2920 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2921 writeln!(
2922 f,
2923 "PollWorkflowTaskQueueResponse (prev_started: {}, started: {})",
2924 self.0.previous_started_event_id, self.0.started_event_id
2925 )?;
2926 if let Some(h) = self.0.history.as_ref() {
2927 for event in &h.events {
2928 writeln!(f, "{}", event)?;
2929 }
2930 }
2931 writeln!(f, "query: {:#?}", self.0.query)?;
2932 writeln!(f, "queries: {:#?}", self.0.queries)
2933 }
2934 }
2935
impl PollActivityTaskQueueResponse {
    // Expands to `pub fn sched_to_start(&self) -> Option<Duration>`, measuring the time
    // from `current_attempt_scheduled_time` to `started_time`.
    sched_to_start_impl!(current_attempt_scheduled_time);
}
2939
2940 impl PollNexusTaskQueueResponse {
2941 pub fn sched_to_start(&self) -> Option<Duration> {
2942 if let Some((sch, st)) = self
2943 .request
2944 .as_ref()
2945 .and_then(|r| r.scheduled_time)
2946 .clone()
2947 .zip(SystemTime::now().try_into().ok())
2948 {
2949 if let Some(value) = elapsed_between_prost_times(sch, st) {
2950 return value;
2951 }
2952 }
2953 None
2954 }
2955 }
2956
2957 impl QueryWorkflowResponse {
2958 pub fn unwrap(
2960 self,
2961 ) -> Vec<crate::protos::temporal::api::common::v1::Payload>
2962 {
2963 self.query_result.unwrap().payloads
2964 }
2965 }
2966 }
2967 }
2968 }
2969 }
2970}
2971
2972#[allow(
2973 clippy::all,
2974 missing_docs,
2975 rustdoc::broken_intra_doc_links,
2976 rustdoc::bare_urls
2977)]
2978pub mod google {
2979 pub mod rpc {
2980 tonic::include_proto!("google.rpc");
2981 }
2982}
2983
2984#[allow(
2985 clippy::all,
2986 missing_docs,
2987 rustdoc::broken_intra_doc_links,
2988 rustdoc::bare_urls
2989)]
2990pub mod grpc {
2991 pub mod health {
2992 pub mod v1 {
2993 tonic::include_proto!("grpc.health.v1");
2994 }
2995 }
2996}
2997mod sdk_helpers {
2998 use std::time::Duration;
2999
/// Converts a camel-case identifier to `SCREAMING_SNAKE_CASE`.
///
/// An underscore is inserted before every uppercase character that follows a
/// non-uppercase one, and every character is ASCII-uppercased. Runs of consecutive
/// uppercase characters are therefore not split (`"HTTPServer"` becomes
/// `"HTTPSERVER"`), and no underscore is ever emitted at the start.
pub fn camel_case_to_screaming_snake(val: &str) -> String {
    let mut screaming = String::with_capacity(val.len());
    // Starts as true so a leading uppercase character gets no underscore.
    let mut prev_upper = true;
    for ch in val.chars() {
        let is_upper = ch.is_uppercase();
        if is_upper && !prev_upper {
            screaming.push('_');
        }
        screaming.push(ch.to_ascii_uppercase());
        prev_upper = is_upper;
    }
    screaming
}
3018
3019 pub fn proto_ts_to_system_time(ts: &prost_types::Timestamp) -> Option<std::time::SystemTime> {
3021 std::time::SystemTime::UNIX_EPOCH.checked_add(
3022 Duration::from_secs(ts.seconds as u64) + Duration::from_nanos(ts.nanos as u64),
3023 )
3024 }
3025
#[cfg(test)]
mod tests {
    use crate::protos::{
        coresdk::{activity_task, activity_task::ActivityTask},
        temporal::api::{
            failure::v1::Failure, workflowservice::v1::PollActivityTaskQueueResponse,
        },
    };
    use anyhow::anyhow;

    /// A poll response carrying `activity_run_id` must surface it as the start's
    /// `run_id`, and the start must not be marked local.
    #[test]
    fn start_from_poll_resp_standalone_activity_populates_run_id() {
        let task = ActivityTask::start_from_poll_resp(PollActivityTaskQueueResponse {
            task_token: vec![1, 2, 3],
            activity_run_id: "test-run-id-123".to_string(),
            activity_id: "my-activity".to_string(),
            ..Default::default()
        });
        if let Some(activity_task::activity_task::Variant::Start(start)) = task.variant {
            assert_eq!(start.run_id, "test-run-id-123");
            assert!(!start.is_local);
        } else {
            panic!("expected Start variant");
        }
    }

    /// A poll response with a workflow execution but no `activity_run_id` must leave
    /// the start's `run_id` empty while keeping the workflow's own run id.
    #[test]
    fn start_from_poll_resp_workflow_activity_has_empty_run_id() {
        use crate::protos::temporal::api::common::v1::WorkflowExecution;
        let task = ActivityTask::start_from_poll_resp(PollActivityTaskQueueResponse {
            task_token: vec![4, 5, 6],
            activity_id: "my-workflow-activity".to_string(),
            workflow_execution: Some(WorkflowExecution {
                workflow_id: "wf-123".to_string(),
                run_id: "wf-run-456".to_string(),
            }),
            ..Default::default()
        });
        if let Some(activity_task::activity_task::Variant::Start(start)) = task.variant {
            assert!(start.run_id.is_empty());
            assert_eq!(start.workflow_execution.unwrap().run_id, "wf-run-456");
        } else {
            panic!("expected Start variant");
        }
    }

    /// Converting an `anyhow` error keeps the top message and turns each context
    /// layer into a nested `cause`.
    #[test]
    fn anyhow_to_failure_conversion() {
        let no_causes = Failure::from(anyhow!("no causes"));
        assert!(no_causes.cause.is_none());
        assert_eq!(no_causes.message, "no causes");

        let chained = anyhow!("fail 1").context("fail 2").context("fail 3");
        let as_fail = Failure::from(chained);
        assert_eq!(as_fail.message, "fail 3");
        let middle = as_fail.cause.as_deref().unwrap();
        assert_eq!(middle.message, "fail 2");
        let root = middle.cause.as_deref().unwrap();
        assert_eq!(root.message, "fail 1");
    }
}
3090}
3091pub use self::sdk_helpers::*;