1pub mod constants;
6pub mod utilities;
8
9mod task_token;
10
11use std::time::Duration;
12
13pub use task_token::TaskToken;
14
/// The string `"encoding"`. NOTE(review): presumably the payload-metadata key under which a
/// payload's encoding name is stored — confirm against the users of this constant.
15pub static ENCODING_PAYLOAD_KEY: &str = "encoding";
/// The string `"json/plain"`. NOTE(review): presumably the encoding value that marks a payload
/// as plain JSON — confirm against the users of this constant.
17pub static JSON_ENCODING_VAL: &str = "json/plain";
/// Key in a marker's details map under which the JSON-encoded patch marker data is stored
/// (see `build_has_change_marker_details` / `decode_change_marker_details` in this file).
19pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data";
/// The string `"TemporalChangeVersion"`. NOTE(review): presumably the name of the search
/// attribute that records patch/version information — confirm against the users.
21pub static VERSION_SEARCH_ATTR_KEY: &str = "TemporalChangeVersion";
23
/// Includes the tonic-generated code for the proto package `$package`, immediately followed by
/// the `<package>.serde.rs` file found in `OUT_DIR`.
macro_rules! include_proto_with_serde {
    ($package:tt) => {
        tonic::include_proto!($package);

        include!(concat!(env!("OUT_DIR"), "/", $package, ".serde.rs"));
    };
}
31
32#[allow(
33 clippy::large_enum_variant,
34 clippy::derive_partial_eq_without_eq,
35 clippy::reserve_after_initialization
36)]
37#[allow(missing_docs)]
39pub mod coresdk {
40 tonic::include_proto!("coresdk");
43
44 use crate::protos::{
45 ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL,
46 temporal::api::{
47 common::v1::{Payload, Payloads, RetryPolicy, WorkflowExecution},
48 enums::v1::{
49 ApplicationErrorCategory, TimeoutType, VersioningBehavior, WorkflowTaskFailedCause,
50 },
51 failure::v1::{
52 ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo,
53 failure::FailureInfo,
54 },
55 workflowservice::v1::PollActivityTaskQueueResponse,
56 },
57 };
58 use activity_task::ActivityTask;
59 use serde::{Deserialize, Serialize};
60 use std::{
61 collections::HashMap,
62 convert::TryFrom,
63 fmt::{Display, Formatter},
64 iter::FromIterator,
65 };
66 use workflow_activation::{WorkflowActivationJob, workflow_activation_job};
67 use workflow_commands::{WorkflowCommand, workflow_command, workflow_command::Variant};
68 use workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion};
69
70 #[allow(clippy::module_inception)]
71 pub mod activity_task {
72 use crate::protos::{coresdk::ActivityTaskCompletion, task_token::format_task_token};
73 use std::fmt::{Display, Formatter};
74 tonic::include_proto!("coresdk.activity_task");
75
76 impl ActivityTask {
77 pub fn cancel_from_ids(
78 task_token: Vec<u8>,
79 reason: ActivityCancelReason,
80 details: ActivityCancellationDetails,
81 ) -> Self {
82 Self {
83 task_token,
84 variant: Some(activity_task::Variant::Cancel(Cancel {
85 reason: reason as i32,
86 details: Some(details),
87 })),
88 }
89 }
90
91 pub fn is_timeout(&self) -> bool {
93 match &self.variant {
94 Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
95 *reason == ActivityCancelReason::TimedOut as i32
96 || details.as_ref().is_some_and(|d| d.is_timed_out)
97 }
98 _ => false,
99 }
100 }
101
102 pub fn primary_reason_to_cancellation_details(
103 reason: ActivityCancelReason,
104 ) -> ActivityCancellationDetails {
105 ActivityCancellationDetails {
106 is_not_found: reason == ActivityCancelReason::NotFound,
107 is_cancelled: reason == ActivityCancelReason::Cancelled,
108 is_paused: reason == ActivityCancelReason::Paused,
109 is_timed_out: reason == ActivityCancelReason::TimedOut,
110 is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
111 is_reset: reason == ActivityCancelReason::Reset,
112 }
113 }
114 }
115
116 impl Display for ActivityTaskCompletion {
117 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
118 write!(
119 f,
120 "ActivityTaskCompletion(token: {}",
121 format_task_token(&self.task_token),
122 )?;
123 if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
124 write!(f, ", {r}")?;
125 } else {
126 write!(f, ", missing result")?;
127 }
128 write!(f, ")")
129 }
130 }
131 }
132 #[allow(clippy::module_inception)]
133 pub mod activity_result {
134 tonic::include_proto!("coresdk.activity_result");
135 use super::super::temporal::api::{
136 common::v1::Payload,
137 failure::v1::{CanceledFailureInfo, Failure as APIFailure, failure},
138 };
139 use crate::protos::{
140 coresdk::activity_result::activity_resolution::Status,
141 temporal::api::enums::v1::TimeoutType,
142 };
143 use activity_execution_result as aer;
144 use anyhow::anyhow;
145 use std::fmt::{Display, Formatter};
146
147 impl ActivityExecutionResult {
148 pub const fn ok(result: Payload) -> Self {
149 Self {
150 status: Some(aer::Status::Completed(Success {
151 result: Some(result),
152 })),
153 }
154 }
155
156 pub fn fail(fail: APIFailure) -> Self {
157 Self {
158 status: Some(aer::Status::Failed(Failure {
159 failure: Some(fail),
160 })),
161 }
162 }
163
164 pub fn cancel_from_details(payload: Option<Payload>) -> Self {
165 Self {
166 status: Some(aer::Status::Cancelled(Cancellation::from_details(payload))),
167 }
168 }
169
170 pub fn cancel(fail: APIFailure) -> Self {
171 Self {
172 status: Some(aer::Status::Cancelled(Cancellation {
173 failure: Some(fail),
174 })),
175 }
176 }
177
178 pub const fn will_complete_async() -> Self {
179 Self {
180 status: Some(aer::Status::WillCompleteAsync(WillCompleteAsync {})),
181 }
182 }
183
184 pub fn is_cancelled(&self) -> bool {
185 matches!(self.status, Some(aer::Status::Cancelled(_)))
186 }
187 }
188
189 impl Display for aer::Status {
190 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191 write!(f, "ActivityExecutionResult(")?;
192 match self {
193 aer::Status::Completed(v) => {
194 write!(f, "{v})")
195 }
196 aer::Status::Failed(v) => {
197 write!(f, "{v})")
198 }
199 aer::Status::Cancelled(v) => {
200 write!(f, "{v})")
201 }
202 aer::Status::WillCompleteAsync(_) => {
203 write!(f, "Will complete async)")
204 }
205 }
206 }
207 }
208
209 impl Display for Success {
210 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
211 write!(f, "Success(")?;
212 if let Some(ref v) = self.result {
213 write!(f, "{v}")?;
214 }
215 write!(f, ")")
216 }
217 }
218
219 impl Display for Failure {
220 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
221 write!(f, "Failure(")?;
222 if let Some(ref v) = self.failure {
223 write!(f, "{v}")?;
224 }
225 write!(f, ")")
226 }
227 }
228
229 impl Display for Cancellation {
230 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231 write!(f, "Cancellation(")?;
232 if let Some(ref v) = self.failure {
233 write!(f, "{v}")?;
234 }
235 write!(f, ")")
236 }
237 }
238
239 impl From<Result<Payload, APIFailure>> for ActivityExecutionResult {
240 fn from(r: Result<Payload, APIFailure>) -> Self {
241 Self {
242 status: match r {
243 Ok(p) => Some(aer::Status::Completed(Success { result: Some(p) })),
244 Err(f) => Some(aer::Status::Failed(Failure { failure: Some(f) })),
245 },
246 }
247 }
248 }
249
250 impl ActivityResolution {
251 pub fn success_payload_or_error(self) -> Result<Option<Payload>, anyhow::Error> {
254 let Some(status) = self.status else {
255 return Err(anyhow!("Activity completed without a status"));
256 };
257
258 match status {
259 activity_resolution::Status::Completed(success) => Ok(success.result),
260 e => Err(anyhow!("Activity was not successful: {e:?}")),
261 }
262 }
263
264 pub fn unwrap_ok_payload(self) -> Payload {
265 self.success_payload_or_error().unwrap().unwrap()
266 }
267
268 pub fn completed_ok(&self) -> bool {
269 matches!(self.status, Some(activity_resolution::Status::Completed(_)))
270 }
271
272 pub fn failed(&self) -> bool {
273 matches!(self.status, Some(activity_resolution::Status::Failed(_)))
274 }
275
276 pub fn timed_out(&self) -> Option<TimeoutType> {
277 match self.status {
278 Some(activity_resolution::Status::Failed(Failure {
279 failure: Some(ref f),
280 })) => f.is_timeout(),
281 _ => None,
282 }
283 }
284
285 pub fn cancelled(&self) -> bool {
286 matches!(self.status, Some(activity_resolution::Status::Cancelled(_)))
287 }
288
289 pub fn unwrap_failure(self) -> APIFailure {
292 match self.status.unwrap() {
293 Status::Failed(f) => f.failure.unwrap(),
294 Status::Cancelled(c) => c.failure.unwrap(),
295 _ => panic!("Actvity did not fail"),
296 }
297 }
298 }
299
300 impl Cancellation {
301 pub fn from_details(details: Option<Payload>) -> Self {
304 Cancellation {
305 failure: Some(APIFailure {
306 message: "Activity cancelled".to_string(),
307 failure_info: Some(failure::FailureInfo::CanceledFailureInfo(
308 CanceledFailureInfo {
309 details: details.map(Into::into),
310 identity: Default::default(),
311 },
312 )),
313 ..Default::default()
314 }),
315 }
316 }
317 }
318 }
319
320 pub mod common {
321 tonic::include_proto!("coresdk.common");
322 use super::external_data::LocalActivityMarkerData;
323 use crate::protos::{
324 PATCHED_MARKER_DETAILS_KEY,
325 coresdk::{
326 AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
327 external_data::PatchedMarkerData,
328 },
329 temporal::api::common::v1::{Payload, Payloads},
330 };
331 use std::collections::HashMap;
332
333 pub fn build_has_change_marker_details(
334 patch_id: impl Into<String>,
335 deprecated: bool,
336 ) -> anyhow::Result<HashMap<String, Payloads>> {
337 let mut hm = HashMap::new();
338 let encoded = PatchedMarkerData {
339 id: patch_id.into(),
340 deprecated,
341 }
342 .as_json_payload()?;
343 hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into());
344 Ok(hm)
345 }
346
347 pub fn decode_change_marker_details(
348 details: &HashMap<String, Payloads>,
349 ) -> Option<(String, bool)> {
350 if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) {
353 let decoded = PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?;
354 return Some((decoded.id, decoded.deprecated));
355 }
356
357 let id_entry = details.get("patch_id")?.payloads.first()?;
358 let deprecated_entry = details.get("deprecated")?.payloads.first()?;
359 let name = std::str::from_utf8(&id_entry.data).ok()?;
360 let deprecated = *deprecated_entry.data.first()? != 0;
361 Some((name.to_string(), deprecated))
362 }
363
364 pub fn build_local_activity_marker_details(
365 metadata: LocalActivityMarkerData,
366 result: Option<Payload>,
367 ) -> HashMap<String, Payloads> {
368 let mut hm = HashMap::new();
369 if let Some(jsonified) = metadata.as_json_payload().into_payloads() {
372 hm.insert("data".to_string(), jsonified);
373 }
374 if let Some(res) = result {
375 hm.insert("result".to_string(), res.into());
376 }
377 hm
378 }
379
380 pub fn extract_local_activity_marker_data(
383 details: &HashMap<String, Payloads>,
384 ) -> Option<LocalActivityMarkerData> {
385 details
386 .get("data")
387 .and_then(|p| p.payloads.first())
388 .and_then(|p| std::str::from_utf8(&p.data).ok())
389 .and_then(|s| serde_json::from_str(s).ok())
390 }
391
392 pub fn extract_local_activity_marker_details(
396 details: &mut HashMap<String, Payloads>,
397 ) -> (Option<LocalActivityMarkerData>, Option<Payload>) {
398 let data = extract_local_activity_marker_data(details);
399 let result = details.remove("result").and_then(|mut p| p.payloads.pop());
400 (data, result)
401 }
402 }
403
404 pub mod external_data {
405 use prost_types::{Duration, Timestamp};
406 use serde::{Deserialize, Deserializer, Serialize, Serializer};
407 tonic::include_proto!("coresdk.external_data");
408
// Serde "remote" definition for `Timestamp` (imported above from `prost_types`): it lists the
// `seconds` / `nanos` fields so serde can derive (de)serialization for that foreign type.
// Used through `#[serde(with = "TimestampDef")]` by the `opt_timestamp` helpers below.
409 #[derive(Serialize, Deserialize)]
413 #[serde(remote = "Timestamp")]
414 struct TimestampDef {
415 seconds: i64,
416 nanos: i32,
417 }
418 mod opt_timestamp {
419 use super::*;
420
421 pub(super) fn serialize<S>(
422 value: &Option<Timestamp>,
423 serializer: S,
424 ) -> Result<S::Ok, S::Error>
425 where
426 S: Serializer,
427 {
428 #[derive(Serialize)]
429 struct Helper<'a>(#[serde(with = "TimestampDef")] &'a Timestamp);
430
431 value.as_ref().map(Helper).serialize(serializer)
432 }
433
434 pub(super) fn deserialize<'de, D>(
435 deserializer: D,
436 ) -> Result<Option<Timestamp>, D::Error>
437 where
438 D: Deserializer<'de>,
439 {
440 #[derive(Deserialize)]
441 struct Helper(#[serde(with = "TimestampDef")] Timestamp);
442
443 let helper = Option::deserialize(deserializer)?;
444 Ok(helper.map(|Helper(external)| external))
445 }
446 }
447
// Serde "remote" definition for `Duration` (imported above from `prost_types`): it lists the
// `seconds` / `nanos` fields so serde can derive (de)serialization for that foreign type.
// Used through `#[serde(with = "DurationDef")]` by the `opt_duration` helpers below.
448 #[derive(Serialize, Deserialize)]
450 #[serde(remote = "Duration")]
451 struct DurationDef {
452 seconds: i64,
453 nanos: i32,
454 }
455 mod opt_duration {
456 use super::*;
457
458 pub(super) fn serialize<S>(
459 value: &Option<Duration>,
460 serializer: S,
461 ) -> Result<S::Ok, S::Error>
462 where
463 S: Serializer,
464 {
465 #[derive(Serialize)]
466 struct Helper<'a>(#[serde(with = "DurationDef")] &'a Duration);
467
468 value.as_ref().map(Helper).serialize(serializer)
469 }
470
471 pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
472 where
473 D: Deserializer<'de>,
474 {
475 #[derive(Deserialize)]
476 struct Helper(#[serde(with = "DurationDef")] Duration);
477
478 let helper = Option::deserialize(deserializer)?;
479 Ok(helper.map(|Helper(external)| external))
480 }
481 }
482 }
483
484 pub mod workflow_activation {
485 use crate::protos::{
486 coresdk::{
487 FromPayloadsExt,
488 activity_result::{ActivityResolution, activity_resolution},
489 common::NamespacedWorkflowExecution,
490 fix_retry_policy,
491 workflow_activation::remove_from_cache::EvictionReason,
492 },
493 temporal::api::{
494 enums::v1::WorkflowTaskFailedCause,
495 history::v1::{
496 WorkflowExecutionCancelRequestedEventAttributes,
497 WorkflowExecutionSignaledEventAttributes,
498 WorkflowExecutionStartedEventAttributes,
499 },
500 query::v1::WorkflowQuery,
501 },
502 };
503 use prost_types::Timestamp;
504 use std::fmt::{Display, Formatter};
505
506 tonic::include_proto!("coresdk.workflow_activation");
507
508 pub fn create_evict_activation(
509 run_id: String,
510 message: String,
511 reason: EvictionReason,
512 ) -> WorkflowActivation {
513 WorkflowActivation {
514 timestamp: None,
515 run_id,
516 is_replaying: false,
517 history_length: 0,
518 jobs: vec![WorkflowActivationJob::from(
519 workflow_activation_job::Variant::RemoveFromCache(RemoveFromCache {
520 message,
521 reason: reason as i32,
522 }),
523 )],
524 available_internal_flags: vec![],
525 history_size_bytes: 0,
526 continue_as_new_suggested: false,
527 deployment_version_for_current_task: None,
528 last_sdk_version: String::new(),
529 suggest_continue_as_new_reasons: vec![],
530 target_worker_deployment_version_changed: false,
531 }
532 }
533
534 pub fn query_to_job(id: String, q: WorkflowQuery) -> QueryWorkflow {
535 QueryWorkflow {
536 query_id: id,
537 query_type: q.query_type,
538 arguments: Vec::from_payloads(q.query_args),
539 headers: q.header.map(|h| h.into()).unwrap_or_default(),
540 }
541 }
542
543 impl WorkflowActivation {
544 pub fn is_only_eviction(&self) -> bool {
546 matches!(
547 self.jobs.as_slice(),
548 [WorkflowActivationJob {
549 variant: Some(workflow_activation_job::Variant::RemoveFromCache(_))
550 }]
551 )
552 }
553
554 pub fn eviction_reason(&self) -> Option<EvictionReason> {
556 self.jobs.iter().find_map(|j| {
557 if let Some(workflow_activation_job::Variant::RemoveFromCache(ref rj)) =
558 j.variant
559 {
560 EvictionReason::try_from(rj.reason).ok()
561 } else {
562 None
563 }
564 })
565 }
566 }
567
568 impl workflow_activation_job::Variant {
569 pub fn is_local_activity_resolution(&self) -> bool {
570 matches!(self, workflow_activation_job::Variant::ResolveActivity(ra) if ra.is_local)
571 }
572 }
573
574 impl Display for EvictionReason {
575 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
576 write!(f, "{self:?}")
577 }
578 }
579
580 impl From<EvictionReason> for WorkflowTaskFailedCause {
581 fn from(value: EvictionReason) -> Self {
582 match value {
583 EvictionReason::Nondeterminism => {
584 WorkflowTaskFailedCause::NonDeterministicError
585 }
586 _ => WorkflowTaskFailedCause::Unspecified,
587 }
588 }
589 }
590
591 impl Display for WorkflowActivation {
592 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
593 write!(f, "WorkflowActivation(")?;
594 write!(f, "run_id: {}, ", self.run_id)?;
595 write!(f, "is_replaying: {}, ", self.is_replaying)?;
596 write!(
597 f,
598 "jobs: {})",
599 self.jobs
600 .iter()
601 .map(ToString::to_string)
602 .collect::<Vec<_>>()
603 .as_slice()
604 .join(", ")
605 )
606 }
607 }
608
609 impl Display for WorkflowActivationJob {
610 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
611 match &self.variant {
612 None => write!(f, "empty"),
613 Some(v) => write!(f, "{v}"),
614 }
615 }
616 }
617
618 impl Display for workflow_activation_job::Variant {
619 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
620 match self {
621 workflow_activation_job::Variant::InitializeWorkflow(_) => {
622 write!(f, "InitializeWorkflow")
623 }
624 workflow_activation_job::Variant::FireTimer(t) => {
625 write!(f, "FireTimer({})", t.seq)
626 }
627 workflow_activation_job::Variant::UpdateRandomSeed(_) => {
628 write!(f, "UpdateRandomSeed")
629 }
630 workflow_activation_job::Variant::QueryWorkflow(_) => {
631 write!(f, "QueryWorkflow")
632 }
633 workflow_activation_job::Variant::CancelWorkflow(_) => {
634 write!(f, "CancelWorkflow")
635 }
636 workflow_activation_job::Variant::SignalWorkflow(_) => {
637 write!(f, "SignalWorkflow")
638 }
639 workflow_activation_job::Variant::ResolveActivity(r) => {
640 write!(
641 f,
642 "ResolveActivity({}, {})",
643 r.seq,
644 r.result
645 .as_ref()
646 .unwrap_or(&ActivityResolution { status: None })
647 )
648 }
649 workflow_activation_job::Variant::NotifyHasPatch(_) => {
650 write!(f, "NotifyHasPatch")
651 }
652 workflow_activation_job::Variant::ResolveChildWorkflowExecutionStart(_) => {
653 write!(f, "ResolveChildWorkflowExecutionStart")
654 }
655 workflow_activation_job::Variant::ResolveChildWorkflowExecution(_) => {
656 write!(f, "ResolveChildWorkflowExecution")
657 }
658 workflow_activation_job::Variant::ResolveSignalExternalWorkflow(_) => {
659 write!(f, "ResolveSignalExternalWorkflow")
660 }
661 workflow_activation_job::Variant::RemoveFromCache(_) => {
662 write!(f, "RemoveFromCache")
663 }
664 workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(_) => {
665 write!(f, "ResolveRequestCancelExternalWorkflow")
666 }
667 workflow_activation_job::Variant::DoUpdate(u) => {
668 write!(f, "DoUpdate({})", u.id)
669 }
670 workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
671 write!(f, "ResolveNexusOperationStart")
672 }
673 workflow_activation_job::Variant::ResolveNexusOperation(_) => {
674 write!(f, "ResolveNexusOperation")
675 }
676 }
677 }
678 }
679
680 impl Display for ActivityResolution {
681 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
682 match self.status {
683 None => {
684 write!(f, "None")
685 }
686 Some(activity_resolution::Status::Failed(_)) => {
687 write!(f, "Failed")
688 }
689 Some(activity_resolution::Status::Completed(_)) => {
690 write!(f, "Completed")
691 }
692 Some(activity_resolution::Status::Cancelled(_)) => {
693 write!(f, "Cancelled")
694 }
695 Some(activity_resolution::Status::Backoff(_)) => {
696 write!(f, "Backoff")
697 }
698 }
699 }
700 }
701
702 impl Display for QueryWorkflow {
703 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
704 write!(
705 f,
706 "QueryWorkflow(id: {}, type: {})",
707 self.query_id, self.query_type
708 )
709 }
710 }
711
712 impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
713 fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
714 Self {
715 signal_name: a.signal_name,
716 input: Vec::from_payloads(a.input),
717 identity: a.identity,
718 headers: a.header.map(Into::into).unwrap_or_default(),
719 }
720 }
721 }
722
723 impl From<WorkflowExecutionCancelRequestedEventAttributes> for CancelWorkflow {
724 fn from(a: WorkflowExecutionCancelRequestedEventAttributes) -> Self {
725 Self { reason: a.cause }
726 }
727 }
728
729 pub fn start_workflow_from_attribs(
731 attrs: WorkflowExecutionStartedEventAttributes,
732 workflow_id: String,
733 randomness_seed: u64,
734 start_time: Timestamp,
735 ) -> InitializeWorkflow {
736 InitializeWorkflow {
737 workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(),
738 workflow_id,
739 arguments: Vec::from_payloads(attrs.input),
740 randomness_seed,
741 headers: attrs.header.unwrap_or_default().fields,
742 identity: attrs.identity,
743 parent_workflow_info: attrs.parent_workflow_execution.map(|pe| {
744 NamespacedWorkflowExecution {
745 namespace: attrs.parent_workflow_namespace,
746 run_id: pe.run_id,
747 workflow_id: pe.workflow_id,
748 }
749 }),
750 workflow_execution_timeout: attrs.workflow_execution_timeout,
751 workflow_run_timeout: attrs.workflow_run_timeout,
752 workflow_task_timeout: attrs.workflow_task_timeout,
753 continued_from_execution_run_id: attrs.continued_execution_run_id,
754 continued_initiator: attrs.initiator,
755 continued_failure: attrs.continued_failure,
756 last_completion_result: attrs.last_completion_result,
757 first_execution_run_id: attrs.first_execution_run_id,
758 retry_policy: attrs.retry_policy.map(fix_retry_policy),
759 attempt: attrs.attempt,
760 cron_schedule: attrs.cron_schedule,
761 workflow_execution_expiration_time: attrs.workflow_execution_expiration_time,
762 cron_schedule_to_schedule_interval: attrs.first_workflow_task_backoff,
763 memo: attrs.memo,
764 search_attributes: attrs.search_attributes,
765 start_time: Some(start_time),
766 root_workflow: attrs.root_workflow_execution,
767 priority: attrs.priority,
768 }
769 }
770 }
771
772 pub mod workflow_completion {
773 use crate::protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure};
774 tonic::include_proto!("coresdk.workflow_completion");
775
776 impl workflow_activation_completion::Status {
777 pub const fn is_success(&self) -> bool {
778 match &self {
779 Self::Successful(_) => true,
780 Self::Failed(_) => false,
781 }
782 }
783 }
784
785 impl From<failure::v1::Failure> for Failure {
786 fn from(f: failure::v1::Failure) -> Self {
787 Failure {
788 failure: Some(f),
789 force_cause: WorkflowTaskFailedCause::Unspecified as i32,
790 }
791 }
792 }
793 }
794
/// Generated code for the `coresdk.child_workflow` proto package, pulled in with
/// `tonic::include_proto!`. This module adds nothing beyond the generated items.
795 pub mod child_workflow {
796 tonic::include_proto!("coresdk.child_workflow");
797 }
798
799 pub mod nexus {
800 use crate::protos::temporal::api::workflowservice::v1::PollNexusTaskQueueResponse;
801 use std::fmt::{Display, Formatter};
802
803 tonic::include_proto!("coresdk.nexus");
804
805 impl NexusTask {
806 pub fn unwrap_task(self) -> PollNexusTaskQueueResponse {
808 if let Some(nexus_task::Variant::Task(t)) = self.variant {
809 return t;
810 }
811 panic!("Nexus task did not contain a server task");
812 }
813
814 pub fn task_token(&self) -> &[u8] {
816 match &self.variant {
817 Some(nexus_task::Variant::Task(t)) => t.task_token.as_slice(),
818 Some(nexus_task::Variant::CancelTask(c)) => c.task_token.as_slice(),
819 None => panic!("Nexus task did not contain a task token"),
820 }
821 }
822 }
823
824 impl Display for nexus_task_completion::Status {
825 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
826 write!(f, "NexusTaskCompletion(")?;
827 match self {
828 nexus_task_completion::Status::Completed(c) => {
829 write!(f, "{c}")
830 }
831 nexus_task_completion::Status::AckCancel(_) => {
832 write!(f, "AckCancel")
833 }
834 #[allow(deprecated)]
835 nexus_task_completion::Status::Error(error) => {
836 write!(f, "Error({error:?})")
837 }
838 nexus_task_completion::Status::Failure(failure) => {
839 write!(f, "{failure}")
840 }
841 }?;
842 write!(f, ")")
843 }
844 }
845
/// The two error states defined here for a Nexus operation: failed or canceled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NexusOperationErrorState {
    /// Displayed as `failed`.
    Failed,
    /// Displayed as `canceled`.
    Canceled,
}

/// Displays the state as its lowercase name.
impl Display for NexusOperationErrorState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Failed => "failed",
            Self::Canceled => "canceled",
        };
        f.write_str(name)
    }
}
860 }
861
862 pub mod workflow_commands {
863 tonic::include_proto!("coresdk.workflow_commands");
864
865 use crate::protos::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType};
866 use std::fmt::{Display, Formatter};
867
868 impl Display for WorkflowCommand {
869 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
870 match &self.variant {
871 None => write!(f, "Empty"),
872 Some(v) => write!(f, "{v}"),
873 }
874 }
875 }
876
877 impl Display for StartTimer {
878 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
879 write!(f, "StartTimer({})", self.seq)
880 }
881 }
882
883 impl Display for ScheduleActivity {
884 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
885 write!(f, "ScheduleActivity({}, {})", self.seq, self.activity_type)
886 }
887 }
888
889 impl Display for ScheduleLocalActivity {
890 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
891 write!(
892 f,
893 "ScheduleLocalActivity({}, {})",
894 self.seq, self.activity_type
895 )
896 }
897 }
898
899 impl Display for QueryResult {
900 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
901 write!(f, "RespondToQuery({})", self.query_id)
902 }
903 }
904
905 impl Display for RequestCancelActivity {
906 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
907 write!(f, "RequestCancelActivity({})", self.seq)
908 }
909 }
910
911 impl Display for RequestCancelLocalActivity {
912 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
913 write!(f, "RequestCancelLocalActivity({})", self.seq)
914 }
915 }
916
917 impl Display for CancelTimer {
918 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
919 write!(f, "CancelTimer({})", self.seq)
920 }
921 }
922
923 impl Display for CompleteWorkflowExecution {
924 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
925 write!(f, "CompleteWorkflowExecution")
926 }
927 }
928
929 impl Display for FailWorkflowExecution {
930 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
931 write!(f, "FailWorkflowExecution")
932 }
933 }
934
935 impl Display for ContinueAsNewWorkflowExecution {
936 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
937 write!(f, "ContinueAsNewWorkflowExecution")
938 }
939 }
940
941 impl Display for CancelWorkflowExecution {
942 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
943 write!(f, "CancelWorkflowExecution")
944 }
945 }
946
947 impl Display for SetPatchMarker {
948 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
949 write!(f, "SetPatchMarker({})", self.patch_id)
950 }
951 }
952
953 impl Display for StartChildWorkflowExecution {
954 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
955 write!(
956 f,
957 "StartChildWorkflowExecution({}, {})",
958 self.seq, self.workflow_type
959 )
960 }
961 }
962
963 impl Display for RequestCancelExternalWorkflowExecution {
964 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
965 write!(f, "RequestCancelExternalWorkflowExecution({})", self.seq)
966 }
967 }
968
969 impl Display for UpsertWorkflowSearchAttributes {
970 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
971 let keys: Vec<_> = self
972 .search_attributes
973 .as_ref()
974 .map(|sa| sa.indexed_fields.keys().collect())
975 .unwrap_or_default();
976 write!(f, "UpsertWorkflowSearchAttributes({:?})", keys)
977 }
978 }
979
980 impl Display for SignalExternalWorkflowExecution {
981 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
982 write!(f, "SignalExternalWorkflowExecution({})", self.seq)
983 }
984 }
985
986 impl Display for CancelSignalWorkflow {
987 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
988 write!(f, "CancelSignalWorkflow({})", self.seq)
989 }
990 }
991
992 impl Display for CancelChildWorkflowExecution {
993 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
994 write!(
995 f,
996 "CancelChildWorkflowExecution({})",
997 self.child_workflow_seq
998 )
999 }
1000 }
1001
1002 impl Display for ModifyWorkflowProperties {
1003 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1004 write!(
1005 f,
1006 "ModifyWorkflowProperties(upserted memo keys: {:?})",
1007 self.upserted_memo.as_ref().map(|m| m.fields.keys())
1008 )
1009 }
1010 }
1011
1012 impl Display for UpdateResponse {
1013 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1014 write!(
1015 f,
1016 "UpdateResponse(protocol_instance_id: {}, response: {:?})",
1017 self.protocol_instance_id, self.response
1018 )
1019 }
1020 }
1021
1022 impl Display for ScheduleNexusOperation {
1023 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1024 write!(f, "ScheduleNexusOperation({})", self.seq)
1025 }
1026 }
1027
1028 impl Display for RequestCancelNexusOperation {
1029 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1030 write!(f, "RequestCancelNexusOperation({})", self.seq)
1031 }
1032 }
1033
1034 impl QueryResult {
1035 pub fn into_components(self) -> (String, QueryResultType, Option<Payloads>, String) {
1037 match self {
1038 QueryResult {
1039 variant: Some(query_result::Variant::Succeeded(qs)),
1040 query_id,
1041 } => (
1042 query_id,
1043 QueryResultType::Answered,
1044 qs.response.map(Into::into),
1045 "".to_string(),
1046 ),
1047 QueryResult {
1048 variant: Some(query_result::Variant::Failed(err)),
1049 query_id,
1050 } => (query_id, QueryResultType::Failed, None, err.message),
1051 QueryResult {
1052 variant: None,
1053 query_id,
1054 } => (
1055 query_id,
1056 QueryResultType::Failed,
1057 None,
1058 "Query response was empty".to_string(),
1059 ),
1060 }
1061 }
1062 }
1063 }
1064
/// Alias for `i64`. NOTE(review): presumably the id of an event in workflow history, going by
/// the name — confirm against the users of this alias.
1065 pub type HistoryEventId = i64;
1066
1067 impl From<workflow_activation_job::Variant> for WorkflowActivationJob {
1068 fn from(a: workflow_activation_job::Variant) -> Self {
1069 Self { variant: Some(a) }
1070 }
1071 }
1072
1073 impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
1074 fn from(v: Vec<WorkflowCommand>) -> Self {
1075 Self {
1076 commands: v,
1077 used_internal_flags: vec![],
1078 versioning_behavior: VersioningBehavior::Unspecified.into(),
1079 }
1080 }
1081 }
1082
1083 impl From<workflow_command::Variant> for WorkflowCommand {
1084 fn from(v: workflow_command::Variant) -> Self {
1085 Self {
1086 variant: Some(v),
1087 user_metadata: None,
1088 }
1089 }
1090 }
1091
1092 impl workflow_completion::Success {
1093 pub fn from_variants(cmds: Vec<Variant>) -> Self {
1094 let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect();
1095 cmds.into()
1096 }
1097 }
1098
1099 impl WorkflowActivationCompletion {
1100 pub fn empty(run_id: impl Into<String>) -> Self {
1102 let success = workflow_completion::Success::from_variants(vec![]);
1103 Self {
1104 run_id: run_id.into(),
1105 status: Some(workflow_activation_completion::Status::Successful(success)),
1106 }
1107 }
1108
1109 pub fn from_cmds(run_id: impl Into<String>, cmds: Vec<workflow_command::Variant>) -> Self {
1111 let success = workflow_completion::Success::from_variants(cmds);
1112 Self {
1113 run_id: run_id.into(),
1114 status: Some(workflow_activation_completion::Status::Successful(success)),
1115 }
1116 }
1117
1118 pub fn from_cmd(run_id: impl Into<String>, cmd: workflow_command::Variant) -> Self {
1120 let success = workflow_completion::Success::from_variants(vec![cmd]);
1121 Self {
1122 run_id: run_id.into(),
1123 status: Some(workflow_activation_completion::Status::Successful(success)),
1124 }
1125 }
1126
1127 pub fn fail(
1128 run_id: impl Into<String>,
1129 failure: Failure,
1130 cause: Option<WorkflowTaskFailedCause>,
1131 ) -> Self {
1132 Self {
1133 run_id: run_id.into(),
1134 status: Some(workflow_activation_completion::Status::Failed(
1135 workflow_completion::Failure {
1136 failure: Some(failure),
1137 force_cause: cause.unwrap_or(WorkflowTaskFailedCause::Unspecified) as i32,
1138 },
1139 )),
1140 }
1141 }
1142
1143 pub fn has_execution_ending(&self) -> bool {
1146 self.has_complete_workflow_execution()
1147 || self.has_fail_execution()
1148 || self.has_continue_as_new()
1149 || self.has_cancel_workflow_execution()
1150 }
1151
1152 pub fn has_fail_execution(&self) -> bool {
1154 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1155 return s.commands.iter().any(|wfc| {
1156 matches!(
1157 wfc,
1158 WorkflowCommand {
1159 variant: Some(workflow_command::Variant::FailWorkflowExecution(_)),
1160 ..
1161 }
1162 )
1163 });
1164 }
1165 false
1166 }
1167
1168 pub fn has_cancel_workflow_execution(&self) -> bool {
1170 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1171 return s.commands.iter().any(|wfc| {
1172 matches!(
1173 wfc,
1174 WorkflowCommand {
1175 variant: Some(workflow_command::Variant::CancelWorkflowExecution(_)),
1176 ..
1177 }
1178 )
1179 });
1180 }
1181 false
1182 }
1183
1184 pub fn has_continue_as_new(&self) -> bool {
1186 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1187 return s.commands.iter().any(|wfc| {
1188 matches!(
1189 wfc,
1190 WorkflowCommand {
1191 variant: Some(
1192 workflow_command::Variant::ContinueAsNewWorkflowExecution(_)
1193 ),
1194 ..
1195 }
1196 )
1197 });
1198 }
1199 false
1200 }
1201
1202 pub fn has_complete_workflow_execution(&self) -> bool {
1204 self.complete_workflow_execution_value().is_some()
1205 }
1206
1207 pub fn complete_workflow_execution_value(&self) -> Option<&Payload> {
1209 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1210 s.commands.iter().find_map(|wfc| match wfc {
1211 WorkflowCommand {
1212 variant: Some(workflow_command::Variant::CompleteWorkflowExecution(v)),
1213 ..
1214 } => v.result.as_ref(),
1215 _ => None,
1216 })
1217 } else {
1218 None
1219 }
1220 }
1221
1222 pub fn is_empty(&self) -> bool {
1224 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1225 return s.commands.is_empty();
1226 }
1227 false
1228 }
1229
1230 pub fn add_internal_flags(&mut self, patch: u32) {
1231 if let Some(workflow_activation_completion::Status::Successful(s)) = &mut self.status {
1232 s.used_internal_flags.push(patch);
1233 }
1234 }
1235 }
1236
1237 pub trait IntoCompletion {
1239 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion;
1241 }
1242
1243 impl IntoCompletion for workflow_command::Variant {
1244 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1245 WorkflowActivationCompletion::from_cmd(run_id, self)
1246 }
1247 }
1248
1249 impl<I, V> IntoCompletion for I
1250 where
1251 I: IntoIterator<Item = V>,
1252 V: Into<WorkflowCommand>,
1253 {
1254 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1255 let success = self.into_iter().map(Into::into).collect::<Vec<_>>().into();
1256 WorkflowActivationCompletion {
1257 run_id,
1258 status: Some(workflow_activation_completion::Status::Successful(success)),
1259 }
1260 }
1261 }
1262
1263 impl Display for WorkflowActivationCompletion {
1264 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1265 write!(
1266 f,
1267 "WorkflowActivationCompletion(run_id: {}, status: ",
1268 &self.run_id
1269 )?;
1270 match &self.status {
1271 None => write!(f, "empty")?,
1272 Some(s) => write!(f, "{s}")?,
1273 };
1274 write!(f, ")")
1275 }
1276 }
1277
1278 impl Display for workflow_activation_completion::Status {
1279 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1280 match self {
1281 workflow_activation_completion::Status::Successful(
1282 workflow_completion::Success { commands, .. },
1283 ) => {
1284 write!(f, "Success(")?;
1285 let mut written = 0;
1286 for c in commands {
1287 write!(f, "{c} ")?;
1288 written += 1;
1289 if written >= 10 && written < commands.len() {
1290 write!(f, "... {} more", commands.len() - written)?;
1291 break;
1292 }
1293 }
1294 write!(f, ")")
1295 }
1296 workflow_activation_completion::Status::Failed(_) => {
1297 write!(f, "Failed")
1298 }
1299 }
1300 }
1301 }
1302
1303 impl ActivityTask {
1304 pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse) -> Self {
1305 let (workflow_id, run_id) = r
1306 .workflow_execution
1307 .map(|we| (we.workflow_id, we.run_id))
1308 .unwrap_or_default();
1309 Self {
1310 task_token: r.task_token,
1311 variant: Some(activity_task::activity_task::Variant::Start(
1312 activity_task::Start {
1313 workflow_namespace: r.workflow_namespace,
1314 workflow_type: r.workflow_type.map_or_else(|| "".to_string(), |wt| wt.name),
1315 workflow_execution: Some(WorkflowExecution {
1316 workflow_id,
1317 run_id,
1318 }),
1319 activity_id: r.activity_id,
1320 activity_type: r.activity_type.map_or_else(|| "".to_string(), |at| at.name),
1321 header_fields: r.header.map(Into::into).unwrap_or_default(),
1322 input: Vec::from_payloads(r.input),
1323 heartbeat_details: Vec::from_payloads(r.heartbeat_details),
1324 scheduled_time: r.scheduled_time,
1325 current_attempt_scheduled_time: r.current_attempt_scheduled_time,
1326 started_time: r.started_time,
1327 attempt: r.attempt as u32,
1328 schedule_to_close_timeout: r.schedule_to_close_timeout,
1329 start_to_close_timeout: r.start_to_close_timeout,
1330 heartbeat_timeout: r.heartbeat_timeout,
1331 retry_policy: r.retry_policy.map(fix_retry_policy),
1332 priority: r.priority,
1333 is_local: false,
1334 run_id: r.activity_run_id,
1335 },
1336 )),
1337 }
1338 }
1339 }
1340
1341 impl Failure {
1342 pub fn is_timeout(&self) -> Option<crate::protos::temporal::api::enums::v1::TimeoutType> {
1343 match &self.failure_info {
1344 Some(FailureInfo::TimeoutFailureInfo(ti)) => Some(ti.timeout_type()),
1345 _ => {
1346 if let Some(c) = &self.cause {
1347 c.is_timeout()
1348 } else {
1349 None
1350 }
1351 }
1352 }
1353 }
1354
1355 pub fn application_failure(message: String, non_retryable: bool) -> Self {
1356 Self {
1357 message,
1358 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1359 ApplicationFailureInfo {
1360 non_retryable,
1361 ..Default::default()
1362 },
1363 )),
1364 ..Default::default()
1365 }
1366 }
1367
1368 pub fn application_failure_from_error(ae: anyhow::Error, non_retryable: bool) -> Self {
1369 Self {
1370 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1371 ApplicationFailureInfo {
1372 non_retryable,
1373 ..Default::default()
1374 },
1375 )),
1376 ..ae.chain()
1377 .rfold(None, |cause, e| {
1378 Some(Self {
1379 message: e.to_string(),
1380 cause: cause.map(Box::new),
1381 ..Default::default()
1382 })
1383 })
1384 .unwrap_or_default()
1385 }
1386 }
1387
1388 pub fn timeout(timeout_type: TimeoutType) -> Self {
1389 Self {
1390 message: "Activity timed out".to_string(),
1391 cause: Some(Box::new(Failure {
1392 message: "Activity timed out".to_string(),
1393 failure_info: Some(FailureInfo::TimeoutFailureInfo(TimeoutFailureInfo {
1394 timeout_type: timeout_type.into(),
1395 ..Default::default()
1396 })),
1397 ..Default::default()
1398 })),
1399 failure_info: Some(FailureInfo::ActivityFailureInfo(
1400 ActivityFailureInfo::default(),
1401 )),
1402 ..Default::default()
1403 }
1404 }
1405
1406 pub fn maybe_application_failure(&self) -> Option<&ApplicationFailureInfo> {
1408 if let Failure {
1409 failure_info: Some(FailureInfo::ApplicationFailureInfo(f)),
1410 ..
1411 } = self
1412 {
1413 Some(f)
1414 } else {
1415 None
1416 }
1417 }
1418
1419 pub fn is_benign_application_failure(&self) -> bool {
1421 self.maybe_application_failure()
1422 .is_some_and(|app_info| app_info.category() == ApplicationErrorCategory::Benign)
1423 }
1424 }
1425
1426 impl Display for Failure {
1427 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1428 write!(f, "Failure({}, ", self.message)?;
1429 match self.failure_info.as_ref() {
1430 None => write!(f, "missing info")?,
1431 Some(FailureInfo::TimeoutFailureInfo(v)) => {
1432 write!(f, "Timeout: {:?}", v.timeout_type())?;
1433 }
1434 Some(FailureInfo::ApplicationFailureInfo(v)) => {
1435 write!(f, "Application Failure: {}", v.r#type)?;
1436 }
1437 Some(FailureInfo::CanceledFailureInfo(_)) => {
1438 write!(f, "Cancelled")?;
1439 }
1440 Some(FailureInfo::TerminatedFailureInfo(_)) => {
1441 write!(f, "Terminated")?;
1442 }
1443 Some(FailureInfo::ServerFailureInfo(_)) => {
1444 write!(f, "Server Failure")?;
1445 }
1446 Some(FailureInfo::ResetWorkflowFailureInfo(_)) => {
1447 write!(f, "Reset Workflow")?;
1448 }
1449 Some(FailureInfo::ActivityFailureInfo(v)) => {
1450 write!(
1451 f,
1452 "Activity Failure: scheduled_event_id: {}",
1453 v.scheduled_event_id
1454 )?;
1455 }
1456 Some(FailureInfo::ChildWorkflowExecutionFailureInfo(v)) => {
1457 write!(
1458 f,
1459 "Child Workflow: started_event_id: {}",
1460 v.started_event_id
1461 )?;
1462 }
1463 Some(FailureInfo::NexusOperationExecutionFailureInfo(v)) => {
1464 write!(
1465 f,
1466 "Nexus Operation Failure: scheduled_event_id: {}",
1467 v.scheduled_event_id
1468 )?;
1469 }
1470 Some(FailureInfo::NexusHandlerFailureInfo(v)) => {
1471 write!(f, "Nexus Handler Failure: {}", v.r#type)?;
1472 }
1473 }
1474 write!(f, ")")
1475 }
1476 }
1477
1478 impl From<&str> for Failure {
1479 fn from(v: &str) -> Self {
1480 Failure::application_failure(v.to_string(), false)
1481 }
1482 }
1483
1484 impl From<String> for Failure {
1485 fn from(v: String) -> Self {
1486 Failure::application_failure(v, false)
1487 }
1488 }
1489
1490 impl From<anyhow::Error> for Failure {
1491 fn from(ae: anyhow::Error) -> Self {
1492 Failure::application_failure_from_error(ae, false)
1493 }
1494 }
1495
1496 pub trait FromPayloadsExt {
1497 fn from_payloads(p: Option<Payloads>) -> Self;
1498 }
1499 impl<T> FromPayloadsExt for T
1500 where
1501 T: FromIterator<Payload>,
1502 {
1503 fn from_payloads(p: Option<Payloads>) -> Self {
1504 match p {
1505 None => std::iter::empty().collect(),
1506 Some(p) => p.payloads.into_iter().collect(),
1507 }
1508 }
1509 }
1510
1511 pub trait IntoPayloadsExt {
1512 fn into_payloads(self) -> Option<Payloads>;
1513 }
1514 impl<T> IntoPayloadsExt for T
1515 where
1516 T: IntoIterator<Item = Payload>,
1517 {
1518 fn into_payloads(self) -> Option<Payloads> {
1519 let mut iterd = self.into_iter().peekable();
1520 if iterd.peek().is_none() {
1521 None
1522 } else {
1523 Some(Payloads {
1524 payloads: iterd.collect(),
1525 })
1526 }
1527 }
1528 }
1529
1530 impl From<Payload> for Payloads {
1531 fn from(p: Payload) -> Self {
1532 Self { payloads: vec![p] }
1533 }
1534 }
1535
1536 impl<T> From<T> for Payloads
1537 where
1538 T: AsRef<[u8]>,
1539 {
1540 fn from(v: T) -> Self {
1541 Self {
1542 payloads: vec![v.into()],
1543 }
1544 }
1545 }
1546
1547 #[derive(thiserror::Error, Debug)]
1548 pub enum PayloadDeserializeErr {
1549 #[error("This deserializer does not understand this payload")]
1552 DeserializerDoesNotHandle,
1553 #[error("Error during deserialization: {0}")]
1554 DeserializeErr(#[from] anyhow::Error),
1555 }
1556
1557 pub trait AsJsonPayloadExt {
1560 fn as_json_payload(&self) -> anyhow::Result<Payload>;
1561 }
1562 impl<T> AsJsonPayloadExt for T
1563 where
1564 T: Serialize,
1565 {
1566 fn as_json_payload(&self) -> anyhow::Result<Payload> {
1567 let as_json = serde_json::to_string(self)?;
1568 let mut metadata = HashMap::new();
1569 metadata.insert(
1570 ENCODING_PAYLOAD_KEY.to_string(),
1571 JSON_ENCODING_VAL.as_bytes().to_vec(),
1572 );
1573 Ok(Payload {
1574 metadata,
1575 data: as_json.into_bytes(),
1576 external_payloads: Default::default(),
1577 })
1578 }
1579 }
1580
1581 pub trait FromJsonPayloadExt: Sized {
1582 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr>;
1583 }
1584 impl<T> FromJsonPayloadExt for T
1585 where
1586 T: for<'de> Deserialize<'de>,
1587 {
1588 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr> {
1589 if !payload.is_json_payload() {
1590 return Err(PayloadDeserializeErr::DeserializerDoesNotHandle);
1591 }
1592 let payload_str = std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?;
1593 Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?)
1594 }
1595 }
1596
1597 #[derive(derive_more::Display, Debug)]
1599 pub enum PayloadsToPayloadError {
1600 MoreThanOnePayload,
1601 NoPayload,
1602 }
1603 impl TryFrom<Payloads> for Payload {
1604 type Error = PayloadsToPayloadError;
1605
1606 fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
1607 match v.payloads.pop() {
1608 None => Err(PayloadsToPayloadError::NoPayload),
1609 Some(p) => {
1610 if v.payloads.is_empty() {
1611 Ok(p)
1612 } else {
1613 Err(PayloadsToPayloadError::MoreThanOnePayload)
1614 }
1615 }
1616 }
1617 }
1618 }
1619
1620 fn fix_retry_policy(mut retry_policy: RetryPolicy) -> RetryPolicy {
1623 if retry_policy.initial_interval.is_none() {
1624 retry_policy.initial_interval = Default::default();
1625 }
1626 retry_policy
1627 }
1628}
1629
1630#[allow(
1632 clippy::all,
1633 missing_docs,
1634 rustdoc::broken_intra_doc_links,
1635 rustdoc::bare_urls
1636)]
1637pub mod temporal {
1639 pub mod api {
// The modules below only pull in generated code: each `v1` module expands
// `tonic::include_proto!` for the protobuf package named in its argument.

/// Generated types for the `temporal.api.activity.v1` package.
1640 pub mod activity {
1641 pub mod v1 {
1642 tonic::include_proto!("temporal.api.activity.v1");
1643 }
1644 }
/// Generated types for the `temporal.api.batch.v1` package.
1645 pub mod batch {
1646 pub mod v1 {
1647 tonic::include_proto!("temporal.api.batch.v1");
1648 }
1649 }
/// Generated types for the `temporal.api.callback.v1` package.
1650 pub mod callback {
1651 pub mod v1 {
1652 tonic::include_proto!("temporal.api.callback.v1");
1653 }
1654 }
1655 pub mod command {
1656 pub mod v1 {
1657 tonic::include_proto!("temporal.api.command.v1");
1658
1659 use crate::protos::{
1660 coresdk::{IntoPayloadsExt, workflow_commands},
1661 temporal::api::{
1662 common::v1::{ActivityType, WorkflowType},
1663 enums::v1::CommandType,
1664 },
1665 };
1666 use command::Attributes;
1667 use std::fmt::{Display, Formatter};
1668
1669 impl From<command::Attributes> for Command {
1670 fn from(c: command::Attributes) -> Self {
1671 match c {
1672 a @ Attributes::StartTimerCommandAttributes(_) => Self {
1673 command_type: CommandType::StartTimer as i32,
1674 attributes: Some(a),
1675 user_metadata: Default::default(),
1676 },
1677 a @ Attributes::CancelTimerCommandAttributes(_) => Self {
1678 command_type: CommandType::CancelTimer as i32,
1679 attributes: Some(a),
1680 user_metadata: Default::default(),
1681 },
1682 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
1683 command_type: CommandType::CompleteWorkflowExecution as i32,
1684 attributes: Some(a),
1685 user_metadata: Default::default(),
1686 },
1687 a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
1688 command_type: CommandType::FailWorkflowExecution as i32,
1689 attributes: Some(a),
1690 user_metadata: Default::default(),
1691 },
1692 a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
1693 command_type: CommandType::ScheduleActivityTask as i32,
1694 attributes: Some(a),
1695 user_metadata: Default::default(),
1696 },
1697 a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self {
1698 command_type: CommandType::RequestCancelActivityTask as i32,
1699 attributes: Some(a),
1700 user_metadata: Default::default(),
1701 },
1702 a @ Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1703 Self {
1704 command_type: CommandType::ContinueAsNewWorkflowExecution
1705 as i32,
1706 attributes: Some(a),
1707 user_metadata: Default::default(),
1708 }
1709 }
1710 a @ Attributes::CancelWorkflowExecutionCommandAttributes(_) => Self {
1711 command_type: CommandType::CancelWorkflowExecution as i32,
1712 attributes: Some(a),
1713 user_metadata: Default::default(),
1714 },
1715 a @ Attributes::RecordMarkerCommandAttributes(_) => Self {
1716 command_type: CommandType::RecordMarker as i32,
1717 attributes: Some(a),
1718 user_metadata: Default::default(),
1719 },
1720 a @ Attributes::ProtocolMessageCommandAttributes(_) => Self {
1721 command_type: CommandType::ProtocolMessage as i32,
1722 attributes: Some(a),
1723 user_metadata: Default::default(),
1724 },
1725 a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1726 Self {
1727 command_type: CommandType::RequestCancelNexusOperation as i32,
1728 attributes: Some(a),
1729 user_metadata: Default::default(),
1730 }
1731 }
1732 _ => unimplemented!(),
1733 }
1734 }
1735 }
1736
1737 impl Display for Command {
1738 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1739 let ct = CommandType::try_from(self.command_type)
1740 .unwrap_or(CommandType::Unspecified);
1741 write!(f, "{:?}", ct)
1742 }
1743 }
1744
1745 pub trait CommandAttributesExt {
1746 fn as_type(&self) -> CommandType;
1747 }
1748
1749 impl CommandAttributesExt for command::Attributes {
1750 fn as_type(&self) -> CommandType {
1751 match self {
1752 Attributes::ScheduleActivityTaskCommandAttributes(_) => {
1753 CommandType::ScheduleActivityTask
1754 }
1755 Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer,
1756 Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
1757 CommandType::CompleteWorkflowExecution
1758 }
1759 Attributes::FailWorkflowExecutionCommandAttributes(_) => {
1760 CommandType::FailWorkflowExecution
1761 }
1762 Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
1763 CommandType::RequestCancelActivityTask
1764 }
1765 Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer,
1766 Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
1767 CommandType::CancelWorkflowExecution
1768 }
1769 Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes(
1770 _,
1771 ) => CommandType::RequestCancelExternalWorkflowExecution,
1772 Attributes::RecordMarkerCommandAttributes(_) => {
1773 CommandType::RecordMarker
1774 }
1775 Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1776 CommandType::ContinueAsNewWorkflowExecution
1777 }
1778 Attributes::StartChildWorkflowExecutionCommandAttributes(_) => {
1779 CommandType::StartChildWorkflowExecution
1780 }
1781 Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => {
1782 CommandType::SignalExternalWorkflowExecution
1783 }
1784 Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => {
1785 CommandType::UpsertWorkflowSearchAttributes
1786 }
1787 Attributes::ProtocolMessageCommandAttributes(_) => {
1788 CommandType::ProtocolMessage
1789 }
1790 Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => {
1791 CommandType::ModifyWorkflowProperties
1792 }
1793 Attributes::ScheduleNexusOperationCommandAttributes(_) => {
1794 CommandType::ScheduleNexusOperation
1795 }
1796 Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1797 CommandType::RequestCancelNexusOperation
1798 }
1799 }
1800 }
1801 }
1802
1803 impl Display for command::Attributes {
1804 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1805 write!(f, "{:?}", self.as_type())
1806 }
1807 }
1808
1809 impl From<workflow_commands::StartTimer> for command::Attributes {
1810 fn from(s: workflow_commands::StartTimer) -> Self {
1811 Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
1812 timer_id: s.seq.to_string(),
1813 start_to_fire_timeout: s.start_to_fire_timeout,
1814 })
1815 }
1816 }
1817
1818 impl From<workflow_commands::UpsertWorkflowSearchAttributes> for command::Attributes {
1819 fn from(s: workflow_commands::UpsertWorkflowSearchAttributes) -> Self {
1820 Self::UpsertWorkflowSearchAttributesCommandAttributes(
1821 UpsertWorkflowSearchAttributesCommandAttributes {
1822 search_attributes: s.search_attributes,
1823 },
1824 )
1825 }
1826 }
1827
1828 impl From<workflow_commands::ModifyWorkflowProperties> for command::Attributes {
1829 fn from(s: workflow_commands::ModifyWorkflowProperties) -> Self {
1830 Self::ModifyWorkflowPropertiesCommandAttributes(
1831 ModifyWorkflowPropertiesCommandAttributes {
1832 upserted_memo: s.upserted_memo.map(Into::into),
1833 },
1834 )
1835 }
1836 }
1837
1838 impl From<workflow_commands::CancelTimer> for command::Attributes {
1839 fn from(s: workflow_commands::CancelTimer) -> Self {
1840 Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
1841 timer_id: s.seq.to_string(),
1842 })
1843 }
1844 }
1845
1846 pub fn schedule_activity_cmd_to_api(
1847 s: workflow_commands::ScheduleActivity,
1848 use_workflow_build_id: bool,
1849 ) -> command::Attributes {
1850 command::Attributes::ScheduleActivityTaskCommandAttributes(
1851 ScheduleActivityTaskCommandAttributes {
1852 activity_id: s.activity_id,
1853 activity_type: Some(ActivityType {
1854 name: s.activity_type,
1855 }),
1856 task_queue: Some(s.task_queue.into()),
1857 header: Some(s.headers.into()),
1858 input: s.arguments.into_payloads(),
1859 schedule_to_close_timeout: s.schedule_to_close_timeout,
1860 schedule_to_start_timeout: s.schedule_to_start_timeout,
1861 start_to_close_timeout: s.start_to_close_timeout,
1862 heartbeat_timeout: s.heartbeat_timeout,
1863 retry_policy: s.retry_policy.map(Into::into),
1864 request_eager_execution: !s.do_not_eagerly_execute,
1865 use_workflow_build_id,
1866 priority: s.priority,
1867 },
1868 )
1869 }
1870
1871 #[allow(deprecated)]
1872 pub fn start_child_workflow_cmd_to_api(
1873 s: workflow_commands::StartChildWorkflowExecution,
1874 inherit_build_id: bool,
1875 ) -> command::Attributes {
1876 command::Attributes::StartChildWorkflowExecutionCommandAttributes(
1877 StartChildWorkflowExecutionCommandAttributes {
1878 workflow_id: s.workflow_id,
1879 workflow_type: Some(WorkflowType {
1880 name: s.workflow_type,
1881 }),
1882 control: "".into(),
1883 namespace: s.namespace,
1884 task_queue: Some(s.task_queue.into()),
1885 header: Some(s.headers.into()),
1886 memo: Some(s.memo.into()),
1887 search_attributes: s.search_attributes,
1888 input: s.input.into_payloads(),
1889 workflow_id_reuse_policy: s.workflow_id_reuse_policy,
1890 workflow_execution_timeout: s.workflow_execution_timeout,
1891 workflow_run_timeout: s.workflow_run_timeout,
1892 workflow_task_timeout: s.workflow_task_timeout,
1893 retry_policy: s.retry_policy.map(Into::into),
1894 cron_schedule: s.cron_schedule.clone(),
1895 parent_close_policy: s.parent_close_policy,
1896 inherit_build_id,
1897 priority: s.priority,
1898 },
1899 )
1900 }
1901
1902 impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
1903 fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
1904 Self::CompleteWorkflowExecutionCommandAttributes(
1905 CompleteWorkflowExecutionCommandAttributes {
1906 result: c.result.map(Into::into),
1907 },
1908 )
1909 }
1910 }
1911
1912 impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
1913 fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
1914 Self::FailWorkflowExecutionCommandAttributes(
1915 FailWorkflowExecutionCommandAttributes {
1916 failure: c.failure.map(Into::into),
1917 },
1918 )
1919 }
1920 }
1921
1922 #[allow(deprecated)]
1923 pub fn continue_as_new_cmd_to_api(
1924 c: workflow_commands::ContinueAsNewWorkflowExecution,
1925 inherit_build_id: bool,
1926 ) -> command::Attributes {
1927 command::Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1928 ContinueAsNewWorkflowExecutionCommandAttributes {
1929 workflow_type: Some(c.workflow_type.into()),
1930 task_queue: Some(c.task_queue.into()),
1931 input: c.arguments.into_payloads(),
1932 workflow_run_timeout: c.workflow_run_timeout,
1933 workflow_task_timeout: c.workflow_task_timeout,
1934 memo: if c.memo.is_empty() {
1935 None
1936 } else {
1937 Some(c.memo.into())
1938 },
1939 header: if c.headers.is_empty() {
1940 None
1941 } else {
1942 Some(c.headers.into())
1943 },
1944 retry_policy: c.retry_policy,
1945 search_attributes: c.search_attributes,
1946 inherit_build_id,
1947 initial_versioning_behavior: c.initial_versioning_behavior,
1948 ..Default::default()
1949 },
1950 )
1951 }
1952
1953 impl From<workflow_commands::CancelWorkflowExecution> for command::Attributes {
1954 fn from(_c: workflow_commands::CancelWorkflowExecution) -> Self {
1955 Self::CancelWorkflowExecutionCommandAttributes(
1956 CancelWorkflowExecutionCommandAttributes { details: None },
1957 )
1958 }
1959 }
1960
1961 impl From<workflow_commands::ScheduleNexusOperation> for command::Attributes {
1962 fn from(c: workflow_commands::ScheduleNexusOperation) -> Self {
1963 Self::ScheduleNexusOperationCommandAttributes(
1964 ScheduleNexusOperationCommandAttributes {
1965 endpoint: c.endpoint,
1966 service: c.service,
1967 operation: c.operation,
1968 input: c.input,
1969 schedule_to_close_timeout: c.schedule_to_close_timeout,
1970 schedule_to_start_timeout: c.schedule_to_start_timeout,
1971 start_to_close_timeout: c.start_to_close_timeout,
1972 nexus_header: c.nexus_header,
1973 },
1974 )
1975 }
1976 }
1977 }
1978 }
/// Generated types for the `temporal.api.cloud.*` packages. Each submodule's `v1`
/// module expands `tonic::include_proto!` for the package named in its argument;
/// no hand-written code lives here.
// NOTE(review): the `invalid_html_tags` allowance is presumably needed because the
// generated doc comments contain text rustdoc reads as HTML tags - confirm.
1979 #[allow(rustdoc::invalid_html_tags)]
1980 pub mod cloud {
1981 pub mod account {
1982 pub mod v1 {
1983 tonic::include_proto!("temporal.api.cloud.account.v1");
1984 }
1985 }
1986 pub mod auditlog {
1987 pub mod v1 {
1988 tonic::include_proto!("temporal.api.cloud.auditlog.v1");
1989 }
1990 }
1991 pub mod billing {
1992 pub mod v1 {
1993 tonic::include_proto!("temporal.api.cloud.billing.v1");
1994 }
1995 }
1996 pub mod cloudservice {
1997 pub mod v1 {
1998 tonic::include_proto!("temporal.api.cloud.cloudservice.v1");
1999 }
2000 }
2001 pub mod connectivityrule {
2002 pub mod v1 {
2003 tonic::include_proto!("temporal.api.cloud.connectivityrule.v1");
2004 }
2005 }
2006 pub mod identity {
2007 pub mod v1 {
2008 tonic::include_proto!("temporal.api.cloud.identity.v1");
2009 }
2010 }
2011 pub mod namespace {
2012 pub mod v1 {
2013 tonic::include_proto!("temporal.api.cloud.namespace.v1");
2014 }
2015 }
2016 pub mod nexus {
2017 pub mod v1 {
2018 tonic::include_proto!("temporal.api.cloud.nexus.v1");
2019 }
2020 }
2021 pub mod operation {
2022 pub mod v1 {
2023 tonic::include_proto!("temporal.api.cloud.operation.v1");
2024 }
2025 }
2026 pub mod region {
2027 pub mod v1 {
2028 tonic::include_proto!("temporal.api.cloud.region.v1");
2029 }
2030 }
2031 pub mod resource {
2032 pub mod v1 {
2033 tonic::include_proto!("temporal.api.cloud.resource.v1");
2034 }
2035 }
2036 pub mod sink {
2037 pub mod v1 {
2038 tonic::include_proto!("temporal.api.cloud.sink.v1");
2039 }
2040 }
2041 pub mod usage {
2042 pub mod v1 {
2043 tonic::include_proto!("temporal.api.cloud.usage.v1");
2044 }
2045 }
2046 }
2047 pub mod common {
2048 pub mod v1 {
2049 use crate::protos::{ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL};
2050 use base64::{Engine, prelude::BASE64_STANDARD};
2051 use std::{
2052 collections::HashMap,
2053 fmt::{Display, Formatter},
2054 };
2055 include_proto_with_serde!("temporal.api.common.v1");
2056
2057 impl<T> From<T> for Payload
2058 where
2059 T: AsRef<[u8]>,
2060 {
2061 fn from(v: T) -> Self {
2062 let mut metadata = HashMap::new();
2065 metadata.insert(ENCODING_PAYLOAD_KEY.to_string(), b"binary/plain".to_vec());
2066 Self {
2067 metadata,
2068 data: v.as_ref().to_vec(),
2069 external_payloads: Default::default(),
2070 }
2071 }
2072 }
2073
2074 impl Payload {
2075 pub fn as_slice(&self) -> &[u8] {
2077 self.data.as_slice()
2078 }
2079
2080 pub fn is_json_payload(&self) -> bool {
2081 self.metadata
2082 .get(ENCODING_PAYLOAD_KEY)
2083 .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes())
2084 .unwrap_or_default()
2085 }
2086 }
2087
2088 impl std::fmt::Debug for Payload {
2089 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2090 if std::env::var("TEMPORAL_PRINT_FULL_PAYLOADS").is_err()
2091 && self.data.len() > 64
2092 {
2093 let mut windows = self.data.as_slice().windows(32);
2094 write!(
2095 f,
2096 "[{}..{}]",
2097 BASE64_STANDARD.encode(windows.next().unwrap_or_default()),
2098 BASE64_STANDARD.encode(windows.next_back().unwrap_or_default())
2099 )
2100 } else {
2101 write!(f, "[{}]", BASE64_STANDARD.encode(&self.data))
2102 }
2103 }
2104 }
2105
2106 impl Display for Payload {
2107 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2108 write!(f, "{:?}", self)
2109 }
2110 }
2111
2112 impl Display for Header {
2113 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2114 write!(f, "Header(")?;
2115 for kv in &self.fields {
2116 write!(f, "{}: ", kv.0)?;
2117 write!(f, "{}, ", kv.1)?;
2118 }
2119 write!(f, ")")
2120 }
2121 }
2122
2123 impl From<Header> for HashMap<String, Payload> {
2124 fn from(h: Header) -> Self {
2125 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2126 }
2127 }
2128
2129 impl From<Memo> for HashMap<String, Payload> {
2130 fn from(h: Memo) -> Self {
2131 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2132 }
2133 }
2134
2135 impl From<SearchAttributes> for HashMap<String, Payload> {
2136 fn from(h: SearchAttributes) -> Self {
2137 h.indexed_fields
2138 .into_iter()
2139 .map(|(k, v)| (k, v.into()))
2140 .collect()
2141 }
2142 }
2143
2144 impl From<HashMap<String, Payload>> for SearchAttributes {
2145 fn from(h: HashMap<String, Payload>) -> Self {
2146 Self {
2147 indexed_fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
2148 }
2149 }
2150 }
2151
2152 impl From<String> for ActivityType {
2153 fn from(name: String) -> Self {
2154 Self { name }
2155 }
2156 }
2157
2158 impl From<&str> for ActivityType {
2159 fn from(name: &str) -> Self {
2160 Self {
2161 name: name.to_string(),
2162 }
2163 }
2164 }
2165
2166 impl From<ActivityType> for String {
2167 fn from(at: ActivityType) -> Self {
2168 at.name
2169 }
2170 }
2171
2172 impl From<&str> for WorkflowType {
2173 fn from(v: &str) -> Self {
2174 Self {
2175 name: v.to_string(),
2176 }
2177 }
2178 }
2179 }
2180 }
// The modules below only pull in generated code for the protobuf package named in
// each macro argument. `include_proto_with_serde!` (defined at the top of this
// file) additionally includes that package's generated `.serde.rs` file.

/// Generated types for the `temporal.api.compute.v1` package.
2181 pub mod compute {
2182 pub mod v1 {
2183 tonic::include_proto!("temporal.api.compute.v1");
2184 }
2185 }
/// Generated types for the `temporal.api.deployment.v1` package.
2186 pub mod deployment {
2187 pub mod v1 {
2188 tonic::include_proto!("temporal.api.deployment.v1");
2189 }
2190 }
/// Generated types (with serde support) for the `temporal.api.enums.v1` package.
2191 pub mod enums {
2192 pub mod v1 {
2193 include_proto_with_serde!("temporal.api.enums.v1");
2194 }
2195 }
/// Generated types for the `temporal.api.errordetails.v1` package.
2196 pub mod errordetails {
2197 pub mod v1 {
2198 tonic::include_proto!("temporal.api.errordetails.v1");
2199 }
2200 }
/// Generated types (with serde support) for the `temporal.api.failure.v1` package.
2201 pub mod failure {
2202 pub mod v1 {
2203 include_proto_with_serde!("temporal.api.failure.v1");
2204 }
2205 }
/// Generated types for the `temporal.api.filter.v1` package.
2206 pub mod filter {
2207 pub mod v1 {
2208 tonic::include_proto!("temporal.api.filter.v1");
2209 }
2210 }
2211 pub mod history {
2212 pub mod v1 {
2213 use crate::protos::temporal::api::{
2214 enums::v1::EventType, history::v1::history_event::Attributes,
2215 };
2216 use anyhow::bail;
2217 use std::fmt::{Display, Formatter};
2218
2219 tonic::include_proto!("temporal.api.history.v1");
2220
2221 impl History {
2222 pub fn extract_run_id_from_start(&self) -> Result<&str, anyhow::Error> {
2223 extract_original_run_id_from_events(&self.events)
2224 }
2225
2226 pub fn last_event_id(&self) -> i64 {
2229 self.events.last().map(|e| e.event_id).unwrap_or_default()
2230 }
2231 }
2232
2233 pub fn extract_original_run_id_from_events(
2234 events: &[HistoryEvent],
2235 ) -> Result<&str, anyhow::Error> {
2236 if let Some(Attributes::WorkflowExecutionStartedEventAttributes(wes)) =
2237 events.get(0).and_then(|x| x.attributes.as_ref())
2238 {
2239 Ok(&wes.original_execution_run_id)
2240 } else {
2241 bail!("First event is not WorkflowExecutionStarted?!?")
2242 }
2243 }
2244
2245 impl HistoryEvent {
2246 pub fn is_command_event(&self) -> bool {
2248 EventType::try_from(self.event_type).map_or(false, |et| match et {
2249 EventType::ActivityTaskScheduled
2250 | EventType::ActivityTaskCancelRequested
2251 | EventType::MarkerRecorded
2252 | EventType::RequestCancelExternalWorkflowExecutionInitiated
2253 | EventType::SignalExternalWorkflowExecutionInitiated
2254 | EventType::StartChildWorkflowExecutionInitiated
2255 | EventType::TimerCanceled
2256 | EventType::TimerStarted
2257 | EventType::UpsertWorkflowSearchAttributes
2258 | EventType::WorkflowPropertiesModified
2259 | EventType::NexusOperationScheduled
2260 | EventType::NexusOperationCancelRequested
2261 | EventType::WorkflowExecutionCanceled
2262 | EventType::WorkflowExecutionCompleted
2263 | EventType::WorkflowExecutionContinuedAsNew
2264 | EventType::WorkflowExecutionFailed
2265 | EventType::WorkflowExecutionUpdateAccepted
2266 | EventType::WorkflowExecutionUpdateRejected
2267 | EventType::WorkflowExecutionUpdateCompleted => true,
2268 _ => false,
2269 })
2270 }
2271
2272 pub fn get_initial_command_event_id(&self) -> Option<i64> {
2276 self.attributes.as_ref().and_then(|a| {
2277 match a {
2280 Attributes::ActivityTaskStartedEventAttributes(a) =>
2281 Some(a.scheduled_event_id),
2282 Attributes::ActivityTaskCompletedEventAttributes(a) =>
2283 Some(a.scheduled_event_id),
2284 Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2285 Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2286 Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2287 Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2288 Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
2289 Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
2290 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2291 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
2292 Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2293 Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
2294 Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
2295 Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2296 Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
2297 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
2298 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
2299 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2300 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
2301 Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
2302 Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2303 Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2304 Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2305 Attributes::NexusOperationStartedEventAttributes(a) => Some(a.scheduled_event_id),
2306 Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2307 Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id),
2308 Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2309 Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2310 Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2311 Attributes::NexusOperationCancelRequestCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2312 Attributes::NexusOperationCancelRequestFailedEventAttributes(a) => Some(a.scheduled_event_id),
2313 _ => None
2314 }
2315 })
2316 }
2317
2318 pub fn get_protocol_instance_id(&self) -> Option<&str> {
2320 self.attributes.as_ref().and_then(|a| match a {
2321 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(a) => {
2322 Some(a.protocol_instance_id.as_str())
2323 }
2324 _ => None,
2325 })
2326 }
2327
2328 pub fn is_final_wf_execution_event(&self) -> bool {
2330 match self.event_type() {
2331 EventType::WorkflowExecutionCompleted => true,
2332 EventType::WorkflowExecutionCanceled => true,
2333 EventType::WorkflowExecutionFailed => true,
2334 EventType::WorkflowExecutionTimedOut => true,
2335 EventType::WorkflowExecutionContinuedAsNew => true,
2336 EventType::WorkflowExecutionTerminated => true,
2337 _ => false,
2338 }
2339 }
2340
2341 pub fn is_wft_closed_event(&self) -> bool {
2342 match self.event_type() {
2343 EventType::WorkflowTaskCompleted => true,
2344 EventType::WorkflowTaskFailed => true,
2345 EventType::WorkflowTaskTimedOut => true,
2346 _ => false,
2347 }
2348 }
2349
2350 pub fn is_ignorable(&self) -> bool {
2351 if !self.worker_may_ignore {
2352 return false;
2353 }
2354 if let Some(a) = self.attributes.as_ref() {
2357 match a {
2358 Attributes::WorkflowExecutionStartedEventAttributes(_) => false,
2359 Attributes::WorkflowExecutionCompletedEventAttributes(_) => false,
2360 Attributes::WorkflowExecutionFailedEventAttributes(_) => false,
2361 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => false,
2362 Attributes::WorkflowTaskScheduledEventAttributes(_) => false,
2363 Attributes::WorkflowTaskStartedEventAttributes(_) => false,
2364 Attributes::WorkflowTaskCompletedEventAttributes(_) => false,
2365 Attributes::WorkflowTaskTimedOutEventAttributes(_) => false,
2366 Attributes::WorkflowTaskFailedEventAttributes(_) => false,
2367 Attributes::ActivityTaskScheduledEventAttributes(_) => false,
2368 Attributes::ActivityTaskStartedEventAttributes(_) => false,
2369 Attributes::ActivityTaskCompletedEventAttributes(_) => false,
2370 Attributes::ActivityTaskFailedEventAttributes(_) => false,
2371 Attributes::ActivityTaskTimedOutEventAttributes(_) => false,
2372 Attributes::TimerStartedEventAttributes(_) => false,
2373 Attributes::TimerFiredEventAttributes(_) => false,
2374 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => false,
2375 Attributes::ActivityTaskCanceledEventAttributes(_) => false,
2376 Attributes::TimerCanceledEventAttributes(_) => false,
2377 Attributes::MarkerRecordedEventAttributes(_) => false,
2378 Attributes::WorkflowExecutionSignaledEventAttributes(_) => false,
2379 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => false,
2380 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => false,
2381 Attributes::WorkflowExecutionCanceledEventAttributes(_) => false,
2382 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2383 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => false,
2384 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => false,
2385 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => false,
2386 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => false,
2387 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => false,
2388 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => false,
2389 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => false,
2390 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => false,
2391 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => false,
2392 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => false,
2393 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => false,
2394 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2395 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => false,
2396 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => false,
2397 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => false,
2398 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => false,
2399 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => false,
2400 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => false,
2401 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => false,
2402 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => false,
2403 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => false,
2404 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => false,
2405 Attributes::NexusOperationScheduledEventAttributes(_) => false,
2406 Attributes::NexusOperationStartedEventAttributes(_) => false,
2407 Attributes::NexusOperationCompletedEventAttributes(_) => false,
2408 Attributes::NexusOperationFailedEventAttributes(_) => false,
2409 Attributes::NexusOperationCanceledEventAttributes(_) => false,
2410 Attributes::NexusOperationTimedOutEventAttributes(_) => false,
2411 Attributes::NexusOperationCancelRequestedEventAttributes(_) => false,
2412 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => true,
2414 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => false,
2415 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => false,
2416 Attributes::WorkflowExecutionPausedEventAttributes(_) => true,
2418 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => true,
2420 Attributes::WorkflowExecutionTimeSkippingTransitionedEventAttributes(_) => true,
2422 }
2423 } else {
2424 false
2425 }
2426 }
2427 }
2428
2429 impl Display for HistoryEvent {
2430 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2431 write!(
2432 f,
2433 "HistoryEvent(id: {}, {:?})",
2434 self.event_id,
2435 EventType::try_from(self.event_type).unwrap_or_default()
2436 )
2437 }
2438 }
2439
2440 impl Attributes {
2441 pub fn event_type(&self) -> EventType {
2442 match self {
2444 Attributes::WorkflowExecutionStartedEventAttributes(_) => { EventType::WorkflowExecutionStarted }
2445 Attributes::WorkflowExecutionCompletedEventAttributes(_) => { EventType::WorkflowExecutionCompleted }
2446 Attributes::WorkflowExecutionFailedEventAttributes(_) => { EventType::WorkflowExecutionFailed }
2447 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => { EventType::WorkflowExecutionTimedOut }
2448 Attributes::WorkflowTaskScheduledEventAttributes(_) => { EventType::WorkflowTaskScheduled }
2449 Attributes::WorkflowTaskStartedEventAttributes(_) => { EventType::WorkflowTaskStarted }
2450 Attributes::WorkflowTaskCompletedEventAttributes(_) => { EventType::WorkflowTaskCompleted }
2451 Attributes::WorkflowTaskTimedOutEventAttributes(_) => { EventType::WorkflowTaskTimedOut }
2452 Attributes::WorkflowTaskFailedEventAttributes(_) => { EventType::WorkflowTaskFailed }
2453 Attributes::ActivityTaskScheduledEventAttributes(_) => { EventType::ActivityTaskScheduled }
2454 Attributes::ActivityTaskStartedEventAttributes(_) => { EventType::ActivityTaskStarted }
2455 Attributes::ActivityTaskCompletedEventAttributes(_) => { EventType::ActivityTaskCompleted }
2456 Attributes::ActivityTaskFailedEventAttributes(_) => { EventType::ActivityTaskFailed }
2457 Attributes::ActivityTaskTimedOutEventAttributes(_) => { EventType::ActivityTaskTimedOut }
2458 Attributes::TimerStartedEventAttributes(_) => { EventType::TimerStarted }
2459 Attributes::TimerFiredEventAttributes(_) => { EventType::TimerFired }
2460 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => { EventType::ActivityTaskCancelRequested }
2461 Attributes::ActivityTaskCanceledEventAttributes(_) => { EventType::ActivityTaskCanceled }
2462 Attributes::TimerCanceledEventAttributes(_) => { EventType::TimerCanceled }
2463 Attributes::MarkerRecordedEventAttributes(_) => { EventType::MarkerRecorded }
2464 Attributes::WorkflowExecutionSignaledEventAttributes(_) => { EventType::WorkflowExecutionSignaled }
2465 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => { EventType::WorkflowExecutionTerminated }
2466 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::WorkflowExecutionCancelRequested }
2467 Attributes::WorkflowExecutionCanceledEventAttributes(_) => { EventType::WorkflowExecutionCanceled }
2468 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionInitiated }
2469 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionFailed }
2470 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::ExternalWorkflowExecutionCancelRequested }
2471 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => { EventType::WorkflowExecutionContinuedAsNew }
2472 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => { EventType::StartChildWorkflowExecutionInitiated }
2473 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => { EventType::StartChildWorkflowExecutionFailed }
2474 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => { EventType::ChildWorkflowExecutionStarted }
2475 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => { EventType::ChildWorkflowExecutionCompleted }
2476 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => { EventType::ChildWorkflowExecutionFailed }
2477 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => { EventType::ChildWorkflowExecutionCanceled }
2478 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => { EventType::ChildWorkflowExecutionTimedOut }
2479 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => { EventType::ChildWorkflowExecutionTerminated }
2480 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionInitiated }
2481 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionFailed }
2482 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => { EventType::ExternalWorkflowExecutionSignaled }
2483 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => { EventType::UpsertWorkflowSearchAttributes }
2484 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAdmitted }
2485 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => { EventType::WorkflowExecutionUpdateRejected }
2486 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAccepted }
2487 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => { EventType::WorkflowExecutionUpdateCompleted }
2488 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => { EventType::WorkflowPropertiesModifiedExternally }
2489 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => { EventType::ActivityPropertiesModifiedExternally }
2490 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => { EventType::WorkflowPropertiesModified }
2491 Attributes::NexusOperationScheduledEventAttributes(_) => { EventType::NexusOperationScheduled }
2492 Attributes::NexusOperationStartedEventAttributes(_) => { EventType::NexusOperationStarted }
2493 Attributes::NexusOperationCompletedEventAttributes(_) => { EventType::NexusOperationCompleted }
2494 Attributes::NexusOperationFailedEventAttributes(_) => { EventType::NexusOperationFailed }
2495 Attributes::NexusOperationCanceledEventAttributes(_) => { EventType::NexusOperationCanceled }
2496 Attributes::NexusOperationTimedOutEventAttributes(_) => { EventType::NexusOperationTimedOut }
2497 Attributes::NexusOperationCancelRequestedEventAttributes(_) => { EventType::NexusOperationCancelRequested }
2498 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => { EventType::WorkflowExecutionOptionsUpdated }
2499 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => { EventType::NexusOperationCancelRequestCompleted }
2500 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => { EventType::NexusOperationCancelRequestFailed }
2501 Attributes::WorkflowExecutionPausedEventAttributes(_) => { EventType::WorkflowExecutionPaused }
2502 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => { EventType::WorkflowExecutionUnpaused }
2503 Attributes::WorkflowExecutionTimeSkippingTransitionedEventAttributes(_) => { EventType::WorkflowExecutionTimeSkippingTransitioned }
2504 }
2505 }
2506 }
2507 }
2508 }
// Bindings for the `temporal.api.namespace.v1` protobuf package.
pub mod namespace {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.namespace.v1");
    }
}
// Bindings for the `temporal.api.operatorservice.v1` protobuf package.
pub mod operatorservice {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.operatorservice.v1");
    }
}
2519 pub mod protocol {
2520 pub mod v1 {
2521 use std::fmt::{Display, Formatter};
2522 tonic::include_proto!("temporal.api.protocol.v1");
2523
2524 impl Display for Message {
2525 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2526 write!(f, "ProtocolMessage({})", self.id)
2527 }
2528 }
2529 }
2530 }
// Bindings for the `temporal.api.query.v1` protobuf package.
pub mod query {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.query.v1");
    }
}
// Bindings for the `temporal.api.replication.v1` protobuf package.
pub mod replication {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.replication.v1");
    }
}
// Bindings for the `temporal.api.rules.v1` protobuf package.
pub mod rules {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.rules.v1");
    }
}
// Bindings for the `temporal.api.schedule.v1` protobuf package.
pub mod schedule {
    // The rustdoc HTML-tag lint is silenced for the included code.
    #[allow(rustdoc::invalid_html_tags)]
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.schedule.v1");
    }
}
// Bindings for the `temporal.api.sdk.v1` protobuf package.
pub mod sdk {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.sdk.v1");
    }
}
2557 pub mod taskqueue {
2558 pub mod v1 {
2559 use crate::protos::temporal::api::enums::v1::TaskQueueKind;
2560 tonic::include_proto!("temporal.api.taskqueue.v1");
2561
2562 impl From<String> for TaskQueue {
2563 fn from(name: String) -> Self {
2564 Self {
2565 name,
2566 kind: TaskQueueKind::Normal as i32,
2567 normal_name: "".to_string(),
2568 }
2569 }
2570 }
2571 }
2572 }
// Bindings for the `temporal.api.testservice.v1` protobuf package.
pub mod testservice {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.testservice.v1");
    }
}
2578 pub mod update {
2579 pub mod v1 {
2580 use crate::protos::temporal::api::update::v1::outcome::Value;
2581 tonic::include_proto!("temporal.api.update.v1");
2582
2583 impl Outcome {
2584 pub fn is_success(&self) -> bool {
2585 match self.value {
2586 Some(Value::Success(_)) => true,
2587 _ => false,
2588 }
2589 }
2590 }
2591 }
2592 }
// Bindings for the `temporal.api.version.v1` protobuf package.
pub mod version {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.version.v1");
    }
}
// Bindings for the `temporal.api.worker.v1` protobuf package.
pub mod worker {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.worker.v1");
    }
}
// Bindings for the `temporal.api.workflow.v1` protobuf package.
pub mod workflow {
    pub mod v1 {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("temporal.api.workflow.v1");
    }
}
2608 pub mod nexus {
2609 pub mod v1 {
2610 use crate::protos::{
2611 camel_case_to_screaming_snake,
2612 temporal::api::{
2613 common::{
2614 self,
2615 v1::link::{WorkflowEvent, workflow_event},
2616 },
2617 enums::v1::EventType,
2618 failure,
2619 },
2620 };
2621 use anyhow::{anyhow, bail};
2622 use prost::Name;
2623 use std::{
2624 collections::HashMap,
2625 fmt::{Display, Formatter},
2626 };
2627 use tonic::transport::Uri;
2628
2629 tonic::include_proto!("temporal.api.nexus.v1");
2630
2631 impl Display for Response {
2632 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2633 write!(f, "NexusResponse(",)?;
2634 match &self.variant {
2635 None => {}
2636 Some(v) => {
2637 write!(f, "{v}")?;
2638 }
2639 }
2640 write!(f, ")")
2641 }
2642 }
2643
2644 impl Display for response::Variant {
2645 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2646 match self {
2647 response::Variant::StartOperation(_) => {
2648 write!(f, "StartOperation")
2649 }
2650 response::Variant::CancelOperation(_) => {
2651 write!(f, "CancelOperation")
2652 }
2653 }
2654 }
2655 }
2656
2657 impl Display for HandlerError {
2658 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2659 write!(f, "HandlerError")
2660 }
2661 }
2662
/// A nexus task failure, carried in one of two representations.
pub enum NexusTaskFailure {
    /// The failure expressed as a nexus [`HandlerError`].
    // NOTE(review): the variant name suggests this is the older representation — confirm.
    Legacy(HandlerError),
    /// The failure expressed as a `temporal.api.failure.v1` `Failure`.
    Temporal(failure::v1::Failure),
}
2667
2668 static SCHEME_PREFIX: &str = "temporal://";
2669
2670 pub fn workflow_event_link_from_nexus(
2672 l: &Link,
2673 ) -> Result<common::v1::Link, anyhow::Error> {
2674 if !l.url.starts_with(SCHEME_PREFIX) {
2675 bail!("Invalid scheme for nexus link: {:?}", l.url);
2676 }
2677 let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap();
2680 let uri = Uri::try_from(no_authority_url)?;
2681 let parts = uri.into_parts();
2682 let path = parts.path_and_query.ok_or_else(|| {
2683 anyhow!("Failed to parse nexus link, invalid path: {:?}", l)
2684 })?;
2685 let path_parts = path.path().split('/').collect::<Vec<_>>();
2686 if path_parts.get(1) != Some(&"namespaces") {
2687 bail!("Invalid path for nexus link: {:?}", l);
2688 }
2689 let namespace = path_parts.get(2).ok_or_else(|| {
2690 anyhow!("Failed to parse nexus link, no namespace: {:?}", l)
2691 })?;
2692 if path_parts.get(3) != Some(&"workflows") {
2693 bail!("Invalid path for nexus link, no workflows segment: {:?}", l);
2694 }
2695 let workflow_id = path_parts.get(4).ok_or_else(|| {
2696 anyhow!("Failed to parse nexus link, no workflow id: {:?}", l)
2697 })?;
2698 let run_id = path_parts
2699 .get(5)
2700 .ok_or_else(|| anyhow!("Failed to parse nexus link, no run id: {:?}", l))?;
2701 if path_parts.get(6) != Some(&"history") {
2702 bail!("Invalid path for nexus link, no history segment: {:?}", l);
2703 }
2704 let reference = if let Some(query) = path.query() {
2705 let mut eventref = workflow_event::EventReference::default();
2706 let query_parts = query.split('&').collect::<Vec<_>>();
2707 for qp in query_parts {
2708 let mut kv = qp.split('=');
2709 let key = kv.next().ok_or_else(|| {
2710 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2711 })?;
2712 let val = kv.next().ok_or_else(|| {
2713 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2714 })?;
2715 match key {
2716 "eventID" => {
2717 eventref.event_id = val.parse().map_err(|_| {
2718 anyhow!("Failed to parse nexus link event id: {:?}", l)
2719 })?;
2720 }
2721 "eventType" => {
2722 eventref.event_type = EventType::from_str_name(val)
2723 .unwrap_or_else(|| {
2724 EventType::from_str_name(
2725 &("EVENT_TYPE_".to_string()
2726 + &camel_case_to_screaming_snake(val)),
2727 )
2728 .unwrap_or_default()
2729 })
2730 .into()
2731 }
2732 _ => continue,
2733 }
2734 }
2735 Some(workflow_event::Reference::EventRef(eventref))
2736 } else {
2737 None
2738 };
2739
2740 Ok(common::v1::Link {
2741 variant: Some(common::v1::link::Variant::WorkflowEvent(WorkflowEvent {
2742 namespace: namespace.to_string(),
2743 workflow_id: workflow_id.to_string(),
2744 run_id: run_id.to_string(),
2745 reference,
2746 })),
2747 })
2748 }
2749
2750 impl TryFrom<failure::v1::Failure> for Failure {
2751 type Error = serde_json::Error;
2752
2753 fn try_from(mut f: failure::v1::Failure) -> Result<Self, Self::Error> {
2754 let message = std::mem::take(&mut f.message);
2756
2757 let details = serde_json::to_vec(&f)?;
2759
2760 Ok(Failure {
2762 message,
2763 stack_trace: f.stack_trace,
2764 metadata: HashMap::from([(
2765 "type".to_string(),
2766 failure::v1::Failure::full_name().into(),
2767 )]),
2768 details,
2769 cause: None,
2770 })
2771 }
2772 }
2773 }
2774 }
2775 pub mod workflowservice {
2776 pub mod v1 {
2777 use std::{
2778 convert::TryInto,
2779 fmt::{Display, Formatter},
2780 time::{Duration, SystemTime},
2781 };
2782
2783 tonic::include_proto!("temporal.api.workflowservice.v1");
2784
// Generates a `sched_to_start` method for a response type. `$sched_field` names the
// optional timestamp field holding the scheduled time; the type must also have a
// `started_time` field.
macro_rules! sched_to_start_impl {
    ($sched_field:ident) => {
        /// Returns the time elapsed between the scheduled and started timestamps.
        ///
        /// Yields `None` when either timestamp is absent, cannot be converted to a
        /// `SystemTime`, or when the start precedes the scheduled time.
        pub fn sched_to_start(&self) -> Option<Duration> {
            let (sch, st) = self.$sched_field.clone().zip(self.started_time.clone())?;
            elapsed_between_prost_times(sch, st).flatten()
        }
    };
}
2801
2802 fn elapsed_between_prost_times(
2803 from: prost_types::Timestamp,
2804 to: prost_types::Timestamp,
2805 ) -> Option<Option<Duration>> {
2806 let from: Result<SystemTime, _> = from.try_into();
2807 let to: Result<SystemTime, _> = to.try_into();
2808 if let (Ok(from), Ok(to)) = (from, to) {
2809 return Some(to.duration_since(from).ok());
2810 }
2811 None
2812 }
2813
impl PollWorkflowTaskQueueResponse {
    // Expands to `pub fn sched_to_start(&self) -> Option<Duration>`, measured from the
    // `scheduled_time` field to `started_time`.
    sched_to_start_impl!(scheduled_time);
}
2817
2818 impl Display for PollWorkflowTaskQueueResponse {
2819 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2820 let last_event = self
2821 .history
2822 .as_ref()
2823 .and_then(|h| h.events.last().map(|he| he.event_id))
2824 .unwrap_or(0);
2825 write!(
2826 f,
2827 "PollWFTQResp(run_id: {}, attempt: {}, last_event: {})",
2828 self.workflow_execution
2829 .as_ref()
2830 .map_or("", |we| we.run_id.as_str()),
2831 self.attempt,
2832 last_event
2833 )
2834 }
2835 }
2836
2837 pub struct CompactHist<'a>(pub &'a PollWorkflowTaskQueueResponse);
2839 impl Display for CompactHist<'_> {
2840 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2841 writeln!(
2842 f,
2843 "PollWorkflowTaskQueueResponse (prev_started: {}, started: {})",
2844 self.0.previous_started_event_id, self.0.started_event_id
2845 )?;
2846 if let Some(h) = self.0.history.as_ref() {
2847 for event in &h.events {
2848 writeln!(f, "{}", event)?;
2849 }
2850 }
2851 writeln!(f, "query: {:#?}", self.0.query)?;
2852 writeln!(f, "queries: {:#?}", self.0.queries)
2853 }
2854 }
2855
impl PollActivityTaskQueueResponse {
    // Expands to `pub fn sched_to_start(&self) -> Option<Duration>`, measured from the
    // `current_attempt_scheduled_time` field to `started_time`.
    sched_to_start_impl!(current_attempt_scheduled_time);
}
2859
2860 impl PollNexusTaskQueueResponse {
2861 pub fn sched_to_start(&self) -> Option<Duration> {
2862 if let Some((sch, st)) = self
2863 .request
2864 .as_ref()
2865 .and_then(|r| r.scheduled_time)
2866 .clone()
2867 .zip(SystemTime::now().try_into().ok())
2868 {
2869 if let Some(value) = elapsed_between_prost_times(sch, st) {
2870 return value;
2871 }
2872 }
2873 None
2874 }
2875 }
2876
impl QueryWorkflowResponse {
    /// Consumes the response and returns the payloads of its query result.
    ///
    /// # Panics
    ///
    /// Panics if `query_result` is `None`.
    pub fn unwrap(self) -> Vec<crate::protos::temporal::api::common::v1::Payload> {
        self.query_result.unwrap().payloads
    }
}
2883 }
2884 }
2885 }
2886}
2887
// Bindings for the `google.rpc` protobuf package. Clippy, missing-docs and rustdoc link/URL
// lints are silenced for the included code.
#[allow(
    clippy::all,
    missing_docs,
    rustdoc::broken_intra_doc_links,
    rustdoc::bare_urls
)]
pub mod google {
    pub mod rpc {
        // Includes the tonic-generated code for this package.
        tonic::include_proto!("google.rpc");
    }
}
2899
// Bindings for the `grpc.health.v1` protobuf package. Clippy, missing-docs and rustdoc
// link/URL lints are silenced for the included code.
#[allow(
    clippy::all,
    missing_docs,
    rustdoc::broken_intra_doc_links,
    rustdoc::bare_urls
)]
pub mod grpc {
    pub mod health {
        pub mod v1 {
            // Includes the tonic-generated code for this package.
            tonic::include_proto!("grpc.health.v1");
        }
    }
}
2913
/// Converts a camel-case identifier to SCREAMING_SNAKE_CASE.
///
/// An underscore is inserted before every uppercase character that follows a
/// non-uppercase one, and every character is ASCII-uppercased. No underscore is emitted at
/// the start of the string or inside a run of consecutive uppercase characters, so
/// `"HTTPServer"` becomes `"HTTPSERVER"`.
pub fn camel_case_to_screaming_snake(val: &str) -> String {
    let mut out = String::with_capacity(val.len());
    // Starts as `true` so that a leading capital does not produce a leading underscore.
    let mut prev_upper = true;
    for ch in val.chars() {
        let is_upper = ch.is_uppercase();
        if is_upper && !prev_upper {
            out.push('_');
        }
        out.push(ch.to_ascii_uppercase());
        prev_upper = is_upper;
    }
    out
}
2932
2933pub fn proto_ts_to_system_time(ts: &prost_types::Timestamp) -> Option<std::time::SystemTime> {
2935 std::time::SystemTime::UNIX_EPOCH
2936 .checked_add(Duration::from_secs(ts.seconds as u64) + Duration::from_nanos(ts.nanos as u64))
2937}
2938
#[cfg(test)]
mod tests {
    use crate::protos::{
        coresdk::{activity_task, activity_task::ActivityTask},
        temporal::api::{failure::v1::Failure, workflowservice::v1::PollActivityTaskQueueResponse},
    };
    use anyhow::anyhow;

    /// A poll response carrying `activity_run_id` and no workflow execution must surface
    /// that id as the start task's `run_id`.
    #[test]
    fn start_from_poll_resp_standalone_activity_populates_run_id() {
        let resp = PollActivityTaskQueueResponse {
            task_token: vec![1, 2, 3],
            activity_run_id: "test-run-id-123".to_string(),
            activity_id: "my-activity".to_string(),
            ..Default::default()
        };
        let Some(activity_task::activity_task::Variant::Start(start)) =
            ActivityTask::start_from_poll_resp(resp).variant
        else {
            panic!("expected Start variant");
        };
        assert_eq!(start.run_id, "test-run-id-123");
        assert!(!start.is_local);
    }

    /// A poll response that names a workflow execution but no `activity_run_id` must leave
    /// the start task's own `run_id` empty while keeping the workflow's run id.
    #[test]
    fn start_from_poll_resp_workflow_activity_has_empty_run_id() {
        use crate::protos::temporal::api::common::v1::WorkflowExecution;
        let workflow_execution = WorkflowExecution {
            workflow_id: "wf-123".to_string(),
            run_id: "wf-run-456".to_string(),
        };
        let resp = PollActivityTaskQueueResponse {
            task_token: vec![4, 5, 6],
            activity_id: "my-workflow-activity".to_string(),
            workflow_execution: Some(workflow_execution),
            ..Default::default()
        };
        let Some(activity_task::activity_task::Variant::Start(start)) =
            ActivityTask::start_from_poll_resp(resp).variant
        else {
            panic!("expected Start variant");
        };
        assert!(start.run_id.is_empty());
        assert_eq!(start.workflow_execution.unwrap().run_id, "wf-run-456");
    }

    /// Each `anyhow` context layer must become one level of the failure's cause chain,
    /// outermost context first.
    #[test]
    fn anyhow_to_failure_conversion() {
        let no_causes: Failure = anyhow!("no causes").into();
        assert!(no_causes.cause.is_none());
        assert_eq!(no_causes.message, "no causes");

        let as_fail: Failure = anyhow!("fail 1")
            .context("fail 2")
            .context("fail 3")
            .into();
        assert_eq!(as_fail.message, "fail 3");
        let second = as_fail.cause.as_ref().unwrap();
        assert_eq!(second.message, "fail 2");
        let third = second.cause.as_ref().unwrap();
        assert_eq!(third.message, "fail 1");
    }
}