1pub mod constants;
6pub mod utilities;
8
9#[cfg(feature = "test-utilities")]
10pub mod canned_histories;
12#[cfg(feature = "history_builders")]
13mod history_builder;
14#[cfg(feature = "history_builders")]
15mod history_info;
16mod task_token;
17#[cfg(feature = "test-utilities")]
18pub mod test_utils;
19
20use std::time::Duration;
21
22#[cfg(feature = "history_builders")]
23pub use history_builder::{
24 DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE, TestHistoryBuilder, default_act_sched,
25 default_wes_attribs,
26};
27#[cfg(feature = "history_builders")]
28pub use history_info::HistoryInfo;
29pub use task_token::TaskToken;
30
31pub static ENCODING_PAYLOAD_KEY: &str = "encoding";
33pub static JSON_ENCODING_VAL: &str = "json/plain";
35pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data";
37pub static VERSION_SEARCH_ATTR_KEY: &str = "TemporalChangeVersion";
39
40macro_rules! include_proto_with_serde {
41 ($pkg:tt) => {
42 tonic::include_proto!($pkg);
43
44 include!(concat!(env!("OUT_DIR"), concat!("/", $pkg, ".serde.rs")));
45 };
46}
47
48#[allow(
49 clippy::large_enum_variant,
50 clippy::derive_partial_eq_without_eq,
51 clippy::reserve_after_initialization
52)]
53#[allow(missing_docs)]
55pub mod coresdk {
56 tonic::include_proto!("coresdk");
59
60 use crate::protos::{
61 ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL,
62 temporal::api::{
63 common::v1::{Payload, Payloads, RetryPolicy, WorkflowExecution},
64 enums::v1::{
65 ApplicationErrorCategory, TimeoutType, VersioningBehavior, WorkflowTaskFailedCause,
66 },
67 failure::v1::{
68 ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo,
69 failure::FailureInfo,
70 },
71 workflowservice::v1::PollActivityTaskQueueResponse,
72 },
73 };
74 use activity_task::ActivityTask;
75 use serde::{Deserialize, Serialize};
76 use std::{
77 collections::HashMap,
78 convert::TryFrom,
79 fmt::{Display, Formatter},
80 iter::FromIterator,
81 };
82 use workflow_activation::{WorkflowActivationJob, workflow_activation_job};
83 use workflow_commands::{WorkflowCommand, workflow_command, workflow_command::Variant};
84 use workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion};
85
86 #[allow(clippy::module_inception)]
87 pub mod activity_task {
88 use crate::protos::{coresdk::ActivityTaskCompletion, task_token::format_task_token};
89 use std::fmt::{Display, Formatter};
90 tonic::include_proto!("coresdk.activity_task");
91
92 impl ActivityTask {
93 pub fn cancel_from_ids(
94 task_token: Vec<u8>,
95 reason: ActivityCancelReason,
96 details: ActivityCancellationDetails,
97 ) -> Self {
98 Self {
99 task_token,
100 variant: Some(activity_task::Variant::Cancel(Cancel {
101 reason: reason as i32,
102 details: Some(details),
103 })),
104 }
105 }
106
107 pub fn is_timeout(&self) -> bool {
109 match &self.variant {
110 Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
111 *reason == ActivityCancelReason::TimedOut as i32
112 || details.as_ref().is_some_and(|d| d.is_timed_out)
113 }
114 _ => false,
115 }
116 }
117
118 pub fn primary_reason_to_cancellation_details(
119 reason: ActivityCancelReason,
120 ) -> ActivityCancellationDetails {
121 ActivityCancellationDetails {
122 is_not_found: reason == ActivityCancelReason::NotFound,
123 is_cancelled: reason == ActivityCancelReason::Cancelled,
124 is_paused: reason == ActivityCancelReason::Paused,
125 is_timed_out: reason == ActivityCancelReason::TimedOut,
126 is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
127 is_reset: reason == ActivityCancelReason::Reset,
128 }
129 }
130 }
131
132 impl Display for ActivityTaskCompletion {
133 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
134 write!(
135 f,
136 "ActivityTaskCompletion(token: {}",
137 format_task_token(&self.task_token),
138 )?;
139 if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
140 write!(f, ", {r}")?;
141 } else {
142 write!(f, ", missing result")?;
143 }
144 write!(f, ")")
145 }
146 }
147 }
148 #[allow(clippy::module_inception)]
149 pub mod activity_result {
150 tonic::include_proto!("coresdk.activity_result");
151 use super::super::temporal::api::{
152 common::v1::Payload,
153 failure::v1::{CanceledFailureInfo, Failure as APIFailure, failure},
154 };
155 use crate::protos::{
156 coresdk::activity_result::activity_resolution::Status,
157 temporal::api::enums::v1::TimeoutType,
158 };
159 use activity_execution_result as aer;
160 use anyhow::anyhow;
161 use std::fmt::{Display, Formatter};
162
163 impl ActivityExecutionResult {
164 pub const fn ok(result: Payload) -> Self {
165 Self {
166 status: Some(aer::Status::Completed(Success {
167 result: Some(result),
168 })),
169 }
170 }
171
172 pub fn fail(fail: APIFailure) -> Self {
173 Self {
174 status: Some(aer::Status::Failed(Failure {
175 failure: Some(fail),
176 })),
177 }
178 }
179
180 pub fn cancel_from_details(payload: Option<Payload>) -> Self {
181 Self {
182 status: Some(aer::Status::Cancelled(Cancellation::from_details(payload))),
183 }
184 }
185
186 pub const fn will_complete_async() -> Self {
187 Self {
188 status: Some(aer::Status::WillCompleteAsync(WillCompleteAsync {})),
189 }
190 }
191
192 pub fn is_cancelled(&self) -> bool {
193 matches!(self.status, Some(aer::Status::Cancelled(_)))
194 }
195 }
196
197 impl Display for aer::Status {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 write!(f, "ActivityExecutionResult(")?;
200 match self {
201 aer::Status::Completed(v) => {
202 write!(f, "{v})")
203 }
204 aer::Status::Failed(v) => {
205 write!(f, "{v})")
206 }
207 aer::Status::Cancelled(v) => {
208 write!(f, "{v})")
209 }
210 aer::Status::WillCompleteAsync(_) => {
211 write!(f, "Will complete async)")
212 }
213 }
214 }
215 }
216
217 impl Display for Success {
218 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
219 write!(f, "Success(")?;
220 if let Some(ref v) = self.result {
221 write!(f, "{v}")?;
222 }
223 write!(f, ")")
224 }
225 }
226
227 impl Display for Failure {
228 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229 write!(f, "Failure(")?;
230 if let Some(ref v) = self.failure {
231 write!(f, "{v}")?;
232 }
233 write!(f, ")")
234 }
235 }
236
237 impl Display for Cancellation {
238 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239 write!(f, "Cancellation(")?;
240 if let Some(ref v) = self.failure {
241 write!(f, "{v}")?;
242 }
243 write!(f, ")")
244 }
245 }
246
247 impl From<Result<Payload, APIFailure>> for ActivityExecutionResult {
248 fn from(r: Result<Payload, APIFailure>) -> Self {
249 Self {
250 status: match r {
251 Ok(p) => Some(aer::Status::Completed(Success { result: Some(p) })),
252 Err(f) => Some(aer::Status::Failed(Failure { failure: Some(f) })),
253 },
254 }
255 }
256 }
257
258 impl ActivityResolution {
259 pub fn success_payload_or_error(self) -> Result<Option<Payload>, anyhow::Error> {
262 let Some(status) = self.status else {
263 return Err(anyhow!("Activity completed without a status"));
264 };
265
266 match status {
267 activity_resolution::Status::Completed(success) => Ok(success.result),
268 e => Err(anyhow!("Activity was not successful: {e:?}")),
269 }
270 }
271
272 pub fn unwrap_ok_payload(self) -> Payload {
273 self.success_payload_or_error().unwrap().unwrap()
274 }
275
276 pub fn completed_ok(&self) -> bool {
277 matches!(self.status, Some(activity_resolution::Status::Completed(_)))
278 }
279
280 pub fn failed(&self) -> bool {
281 matches!(self.status, Some(activity_resolution::Status::Failed(_)))
282 }
283
284 pub fn timed_out(&self) -> Option<TimeoutType> {
285 match self.status {
286 Some(activity_resolution::Status::Failed(Failure {
287 failure: Some(ref f),
288 })) => f.is_timeout(),
289 _ => None,
290 }
291 }
292
293 pub fn cancelled(&self) -> bool {
294 matches!(self.status, Some(activity_resolution::Status::Cancelled(_)))
295 }
296
297 pub fn unwrap_failure(self) -> APIFailure {
300 match self.status.unwrap() {
301 Status::Failed(f) => f.failure.unwrap(),
302 Status::Cancelled(c) => c.failure.unwrap(),
303 _ => panic!("Actvity did not fail"),
304 }
305 }
306 }
307
308 impl Cancellation {
309 pub fn from_details(details: Option<Payload>) -> Self {
312 Cancellation {
313 failure: Some(APIFailure {
314 message: "Activity cancelled".to_string(),
315 failure_info: Some(failure::FailureInfo::CanceledFailureInfo(
316 CanceledFailureInfo {
317 details: details.map(Into::into),
318 },
319 )),
320 ..Default::default()
321 }),
322 }
323 }
324 }
325 }
326
327 pub mod common {
328 tonic::include_proto!("coresdk.common");
329 use super::external_data::LocalActivityMarkerData;
330 use crate::protos::{
331 PATCHED_MARKER_DETAILS_KEY,
332 coresdk::{
333 AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
334 external_data::PatchedMarkerData,
335 },
336 temporal::api::common::v1::{Payload, Payloads},
337 };
338 use std::collections::HashMap;
339
340 pub fn build_has_change_marker_details(
341 patch_id: impl Into<String>,
342 deprecated: bool,
343 ) -> anyhow::Result<HashMap<String, Payloads>> {
344 let mut hm = HashMap::new();
345 let encoded = PatchedMarkerData {
346 id: patch_id.into(),
347 deprecated,
348 }
349 .as_json_payload()?;
350 hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into());
351 Ok(hm)
352 }
353
354 pub fn decode_change_marker_details(
355 details: &HashMap<String, Payloads>,
356 ) -> Option<(String, bool)> {
357 if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) {
360 let decoded = PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?;
361 return Some((decoded.id, decoded.deprecated));
362 }
363
364 let id_entry = details.get("patch_id")?.payloads.first()?;
365 let deprecated_entry = details.get("deprecated")?.payloads.first()?;
366 let name = std::str::from_utf8(&id_entry.data).ok()?;
367 let deprecated = *deprecated_entry.data.first()? != 0;
368 Some((name.to_string(), deprecated))
369 }
370
371 pub fn build_local_activity_marker_details(
372 metadata: LocalActivityMarkerData,
373 result: Option<Payload>,
374 ) -> HashMap<String, Payloads> {
375 let mut hm = HashMap::new();
376 if let Some(jsonified) = metadata.as_json_payload().into_payloads() {
379 hm.insert("data".to_string(), jsonified);
380 }
381 if let Some(res) = result {
382 hm.insert("result".to_string(), res.into());
383 }
384 hm
385 }
386
387 pub fn extract_local_activity_marker_data(
390 details: &HashMap<String, Payloads>,
391 ) -> Option<LocalActivityMarkerData> {
392 details
393 .get("data")
394 .and_then(|p| p.payloads.first())
395 .and_then(|p| std::str::from_utf8(&p.data).ok())
396 .and_then(|s| serde_json::from_str(s).ok())
397 }
398
399 pub fn extract_local_activity_marker_details(
403 details: &mut HashMap<String, Payloads>,
404 ) -> (Option<LocalActivityMarkerData>, Option<Payload>) {
405 let data = extract_local_activity_marker_data(details);
406 let result = details.remove("result").and_then(|mut p| p.payloads.pop());
407 (data, result)
408 }
409 }
410
411 pub mod external_data {
412 use prost_types::{Duration, Timestamp};
413 use serde::{Deserialize, Deserializer, Serialize, Serializer};
414 tonic::include_proto!("coresdk.external_data");
415
416 #[derive(Serialize, Deserialize)]
420 #[serde(remote = "Timestamp")]
421 struct TimestampDef {
422 seconds: i64,
423 nanos: i32,
424 }
425 mod opt_timestamp {
426 use super::*;
427
428 pub(super) fn serialize<S>(
429 value: &Option<Timestamp>,
430 serializer: S,
431 ) -> Result<S::Ok, S::Error>
432 where
433 S: Serializer,
434 {
435 #[derive(Serialize)]
436 struct Helper<'a>(#[serde(with = "TimestampDef")] &'a Timestamp);
437
438 value.as_ref().map(Helper).serialize(serializer)
439 }
440
441 pub(super) fn deserialize<'de, D>(
442 deserializer: D,
443 ) -> Result<Option<Timestamp>, D::Error>
444 where
445 D: Deserializer<'de>,
446 {
447 #[derive(Deserialize)]
448 struct Helper(#[serde(with = "TimestampDef")] Timestamp);
449
450 let helper = Option::deserialize(deserializer)?;
451 Ok(helper.map(|Helper(external)| external))
452 }
453 }
454
455 #[derive(Serialize, Deserialize)]
457 #[serde(remote = "Duration")]
458 struct DurationDef {
459 seconds: i64,
460 nanos: i32,
461 }
462 mod opt_duration {
463 use super::*;
464
465 pub(super) fn serialize<S>(
466 value: &Option<Duration>,
467 serializer: S,
468 ) -> Result<S::Ok, S::Error>
469 where
470 S: Serializer,
471 {
472 #[derive(Serialize)]
473 struct Helper<'a>(#[serde(with = "DurationDef")] &'a Duration);
474
475 value.as_ref().map(Helper).serialize(serializer)
476 }
477
478 pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
479 where
480 D: Deserializer<'de>,
481 {
482 #[derive(Deserialize)]
483 struct Helper(#[serde(with = "DurationDef")] Duration);
484
485 let helper = Option::deserialize(deserializer)?;
486 Ok(helper.map(|Helper(external)| external))
487 }
488 }
489 }
490
491 pub mod workflow_activation {
492 use crate::protos::{
493 coresdk::{
494 FromPayloadsExt,
495 activity_result::{ActivityResolution, activity_resolution},
496 common::NamespacedWorkflowExecution,
497 fix_retry_policy,
498 workflow_activation::remove_from_cache::EvictionReason,
499 },
500 temporal::api::{
501 enums::v1::WorkflowTaskFailedCause,
502 history::v1::{
503 WorkflowExecutionCancelRequestedEventAttributes,
504 WorkflowExecutionSignaledEventAttributes,
505 WorkflowExecutionStartedEventAttributes,
506 },
507 query::v1::WorkflowQuery,
508 },
509 };
510 use prost_types::Timestamp;
511 use std::fmt::{Display, Formatter};
512
513 tonic::include_proto!("coresdk.workflow_activation");
514
515 pub fn create_evict_activation(
516 run_id: String,
517 message: String,
518 reason: EvictionReason,
519 ) -> WorkflowActivation {
520 WorkflowActivation {
521 timestamp: None,
522 run_id,
523 is_replaying: false,
524 history_length: 0,
525 jobs: vec![WorkflowActivationJob::from(
526 workflow_activation_job::Variant::RemoveFromCache(RemoveFromCache {
527 message,
528 reason: reason as i32,
529 }),
530 )],
531 available_internal_flags: vec![],
532 history_size_bytes: 0,
533 continue_as_new_suggested: false,
534 deployment_version_for_current_task: None,
535 last_sdk_version: String::new(),
536 suggest_continue_as_new_reasons: vec![],
537 }
538 }
539
540 pub fn query_to_job(id: String, q: WorkflowQuery) -> QueryWorkflow {
541 QueryWorkflow {
542 query_id: id,
543 query_type: q.query_type,
544 arguments: Vec::from_payloads(q.query_args),
545 headers: q.header.map(|h| h.into()).unwrap_or_default(),
546 }
547 }
548
549 impl WorkflowActivation {
550 pub fn is_only_eviction(&self) -> bool {
552 matches!(
553 self.jobs.as_slice(),
554 [WorkflowActivationJob {
555 variant: Some(workflow_activation_job::Variant::RemoveFromCache(_))
556 }]
557 )
558 }
559
560 pub fn eviction_reason(&self) -> Option<EvictionReason> {
562 self.jobs.iter().find_map(|j| {
563 if let Some(workflow_activation_job::Variant::RemoveFromCache(ref rj)) =
564 j.variant
565 {
566 EvictionReason::try_from(rj.reason).ok()
567 } else {
568 None
569 }
570 })
571 }
572 }
573
574 impl workflow_activation_job::Variant {
575 pub fn is_local_activity_resolution(&self) -> bool {
576 matches!(self, workflow_activation_job::Variant::ResolveActivity(ra) if ra.is_local)
577 }
578 }
579
580 impl Display for EvictionReason {
581 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
582 write!(f, "{self:?}")
583 }
584 }
585
586 impl From<EvictionReason> for WorkflowTaskFailedCause {
587 fn from(value: EvictionReason) -> Self {
588 match value {
589 EvictionReason::Nondeterminism => {
590 WorkflowTaskFailedCause::NonDeterministicError
591 }
592 _ => WorkflowTaskFailedCause::Unspecified,
593 }
594 }
595 }
596
597 impl Display for WorkflowActivation {
598 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
599 write!(f, "WorkflowActivation(")?;
600 write!(f, "run_id: {}, ", self.run_id)?;
601 write!(f, "is_replaying: {}, ", self.is_replaying)?;
602 write!(
603 f,
604 "jobs: {})",
605 self.jobs
606 .iter()
607 .map(ToString::to_string)
608 .collect::<Vec<_>>()
609 .as_slice()
610 .join(", ")
611 )
612 }
613 }
614
615 impl Display for WorkflowActivationJob {
616 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
617 match &self.variant {
618 None => write!(f, "empty"),
619 Some(v) => write!(f, "{v}"),
620 }
621 }
622 }
623
624 impl Display for workflow_activation_job::Variant {
625 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
626 match self {
627 workflow_activation_job::Variant::InitializeWorkflow(_) => {
628 write!(f, "InitializeWorkflow")
629 }
630 workflow_activation_job::Variant::FireTimer(t) => {
631 write!(f, "FireTimer({})", t.seq)
632 }
633 workflow_activation_job::Variant::UpdateRandomSeed(_) => {
634 write!(f, "UpdateRandomSeed")
635 }
636 workflow_activation_job::Variant::QueryWorkflow(_) => {
637 write!(f, "QueryWorkflow")
638 }
639 workflow_activation_job::Variant::CancelWorkflow(_) => {
640 write!(f, "CancelWorkflow")
641 }
642 workflow_activation_job::Variant::SignalWorkflow(_) => {
643 write!(f, "SignalWorkflow")
644 }
645 workflow_activation_job::Variant::ResolveActivity(r) => {
646 write!(
647 f,
648 "ResolveActivity({}, {})",
649 r.seq,
650 r.result
651 .as_ref()
652 .unwrap_or(&ActivityResolution { status: None })
653 )
654 }
655 workflow_activation_job::Variant::NotifyHasPatch(_) => {
656 write!(f, "NotifyHasPatch")
657 }
658 workflow_activation_job::Variant::ResolveChildWorkflowExecutionStart(_) => {
659 write!(f, "ResolveChildWorkflowExecutionStart")
660 }
661 workflow_activation_job::Variant::ResolveChildWorkflowExecution(_) => {
662 write!(f, "ResolveChildWorkflowExecution")
663 }
664 workflow_activation_job::Variant::ResolveSignalExternalWorkflow(_) => {
665 write!(f, "ResolveSignalExternalWorkflow")
666 }
667 workflow_activation_job::Variant::RemoveFromCache(_) => {
668 write!(f, "RemoveFromCache")
669 }
670 workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(_) => {
671 write!(f, "ResolveRequestCancelExternalWorkflow")
672 }
673 workflow_activation_job::Variant::DoUpdate(u) => {
674 write!(f, "DoUpdate({})", u.id)
675 }
676 workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
677 write!(f, "ResolveNexusOperationStart")
678 }
679 workflow_activation_job::Variant::ResolveNexusOperation(_) => {
680 write!(f, "ResolveNexusOperation")
681 }
682 }
683 }
684 }
685
686 impl Display for ActivityResolution {
687 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
688 match self.status {
689 None => {
690 write!(f, "None")
691 }
692 Some(activity_resolution::Status::Failed(_)) => {
693 write!(f, "Failed")
694 }
695 Some(activity_resolution::Status::Completed(_)) => {
696 write!(f, "Completed")
697 }
698 Some(activity_resolution::Status::Cancelled(_)) => {
699 write!(f, "Cancelled")
700 }
701 Some(activity_resolution::Status::Backoff(_)) => {
702 write!(f, "Backoff")
703 }
704 }
705 }
706 }
707
708 impl Display for QueryWorkflow {
709 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
710 write!(
711 f,
712 "QueryWorkflow(id: {}, type: {})",
713 self.query_id, self.query_type
714 )
715 }
716 }
717
718 impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
719 fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
720 Self {
721 signal_name: a.signal_name,
722 input: Vec::from_payloads(a.input),
723 identity: a.identity,
724 headers: a.header.map(Into::into).unwrap_or_default(),
725 }
726 }
727 }
728
729 impl From<WorkflowExecutionCancelRequestedEventAttributes> for CancelWorkflow {
730 fn from(a: WorkflowExecutionCancelRequestedEventAttributes) -> Self {
731 Self { reason: a.cause }
732 }
733 }
734
735 pub fn start_workflow_from_attribs(
737 attrs: WorkflowExecutionStartedEventAttributes,
738 workflow_id: String,
739 randomness_seed: u64,
740 start_time: Timestamp,
741 ) -> InitializeWorkflow {
742 InitializeWorkflow {
743 workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(),
744 workflow_id,
745 arguments: Vec::from_payloads(attrs.input),
746 randomness_seed,
747 headers: attrs.header.unwrap_or_default().fields,
748 identity: attrs.identity,
749 parent_workflow_info: attrs.parent_workflow_execution.map(|pe| {
750 NamespacedWorkflowExecution {
751 namespace: attrs.parent_workflow_namespace,
752 run_id: pe.run_id,
753 workflow_id: pe.workflow_id,
754 }
755 }),
756 workflow_execution_timeout: attrs.workflow_execution_timeout,
757 workflow_run_timeout: attrs.workflow_run_timeout,
758 workflow_task_timeout: attrs.workflow_task_timeout,
759 continued_from_execution_run_id: attrs.continued_execution_run_id,
760 continued_initiator: attrs.initiator,
761 continued_failure: attrs.continued_failure,
762 last_completion_result: attrs.last_completion_result,
763 first_execution_run_id: attrs.first_execution_run_id,
764 retry_policy: attrs.retry_policy.map(fix_retry_policy),
765 attempt: attrs.attempt,
766 cron_schedule: attrs.cron_schedule,
767 workflow_execution_expiration_time: attrs.workflow_execution_expiration_time,
768 cron_schedule_to_schedule_interval: attrs.first_workflow_task_backoff,
769 memo: attrs.memo,
770 search_attributes: attrs.search_attributes,
771 start_time: Some(start_time),
772 root_workflow: attrs.root_workflow_execution,
773 priority: attrs.priority,
774 }
775 }
776 }
777
778 pub mod workflow_completion {
779 use crate::protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure};
780 tonic::include_proto!("coresdk.workflow_completion");
781
782 impl workflow_activation_completion::Status {
783 pub const fn is_success(&self) -> bool {
784 match &self {
785 Self::Successful(_) => true,
786 Self::Failed(_) => false,
787 }
788 }
789 }
790
791 impl From<failure::v1::Failure> for Failure {
792 fn from(f: failure::v1::Failure) -> Self {
793 Failure {
794 failure: Some(f),
795 force_cause: WorkflowTaskFailedCause::Unspecified as i32,
796 }
797 }
798 }
799 }
800
801 pub mod child_workflow {
802 tonic::include_proto!("coresdk.child_workflow");
803 }
804
805 pub mod nexus {
806 use crate::protos::temporal::api::workflowservice::v1::PollNexusTaskQueueResponse;
807 use std::fmt::{Display, Formatter};
808
809 tonic::include_proto!("coresdk.nexus");
810
811 impl NexusTask {
812 pub fn unwrap_task(self) -> PollNexusTaskQueueResponse {
814 if let Some(nexus_task::Variant::Task(t)) = self.variant {
815 return t;
816 }
817 panic!("Nexus task did not contain a server task");
818 }
819
820 pub fn task_token(&self) -> &[u8] {
822 match &self.variant {
823 Some(nexus_task::Variant::Task(t)) => t.task_token.as_slice(),
824 Some(nexus_task::Variant::CancelTask(c)) => c.task_token.as_slice(),
825 None => panic!("Nexus task did not contain a task token"),
826 }
827 }
828 }
829
830 impl Display for nexus_task_completion::Status {
831 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
832 write!(f, "NexusTaskCompletion(")?;
833 match self {
834 nexus_task_completion::Status::Completed(c) => {
835 write!(f, "{c}")
836 }
837 nexus_task_completion::Status::AckCancel(_) => {
838 write!(f, "AckCancel")
839 }
840 #[allow(deprecated)]
841 nexus_task_completion::Status::Error(error) => {
842 write!(f, "Error({error:?})")
843 }
844 nexus_task_completion::Status::Failure(failure) => {
845 write!(f, "{failure}")
846 }
847 }?;
848 write!(f, ")")
849 }
850 }
851
852 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
853 pub enum NexusOperationErrorState {
854 Failed,
855 Canceled,
856 }
857
858 impl Display for NexusOperationErrorState {
859 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
860 match self {
861 Self::Failed => write!(f, "failed"),
862 Self::Canceled => write!(f, "canceled"),
863 }
864 }
865 }
866 }
867
868 pub mod workflow_commands {
869 tonic::include_proto!("coresdk.workflow_commands");
870
871 use crate::protos::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType};
872 use std::fmt::{Display, Formatter};
873
874 impl Display for WorkflowCommand {
875 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
876 match &self.variant {
877 None => write!(f, "Empty"),
878 Some(v) => write!(f, "{v}"),
879 }
880 }
881 }
882
883 impl Display for StartTimer {
884 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
885 write!(f, "StartTimer({})", self.seq)
886 }
887 }
888
889 impl Display for ScheduleActivity {
890 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
891 write!(f, "ScheduleActivity({}, {})", self.seq, self.activity_type)
892 }
893 }
894
895 impl Display for ScheduleLocalActivity {
896 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
897 write!(
898 f,
899 "ScheduleLocalActivity({}, {})",
900 self.seq, self.activity_type
901 )
902 }
903 }
904
905 impl Display for QueryResult {
906 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
907 write!(f, "RespondToQuery({})", self.query_id)
908 }
909 }
910
911 impl Display for RequestCancelActivity {
912 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
913 write!(f, "RequestCancelActivity({})", self.seq)
914 }
915 }
916
917 impl Display for RequestCancelLocalActivity {
918 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
919 write!(f, "RequestCancelLocalActivity({})", self.seq)
920 }
921 }
922
923 impl Display for CancelTimer {
924 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
925 write!(f, "CancelTimer({})", self.seq)
926 }
927 }
928
929 impl Display for CompleteWorkflowExecution {
930 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
931 write!(f, "CompleteWorkflowExecution")
932 }
933 }
934
935 impl Display for FailWorkflowExecution {
936 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
937 write!(f, "FailWorkflowExecution")
938 }
939 }
940
941 impl Display for ContinueAsNewWorkflowExecution {
942 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
943 write!(f, "ContinueAsNewWorkflowExecution")
944 }
945 }
946
947 impl Display for CancelWorkflowExecution {
948 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
949 write!(f, "CancelWorkflowExecution")
950 }
951 }
952
953 impl Display for SetPatchMarker {
954 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
955 write!(f, "SetPatchMarker({})", self.patch_id)
956 }
957 }
958
959 impl Display for StartChildWorkflowExecution {
960 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
961 write!(
962 f,
963 "StartChildWorkflowExecution({}, {})",
964 self.seq, self.workflow_type
965 )
966 }
967 }
968
969 impl Display for RequestCancelExternalWorkflowExecution {
970 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
971 write!(f, "RequestCancelExternalWorkflowExecution({})", self.seq)
972 }
973 }
974
975 impl Display for UpsertWorkflowSearchAttributes {
976 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
977 let keys: Vec<_> = self
978 .search_attributes
979 .as_ref()
980 .map(|sa| sa.indexed_fields.keys().collect())
981 .unwrap_or_default();
982 write!(f, "UpsertWorkflowSearchAttributes({:?})", keys)
983 }
984 }
985
986 impl Display for SignalExternalWorkflowExecution {
987 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
988 write!(f, "SignalExternalWorkflowExecution({})", self.seq)
989 }
990 }
991
992 impl Display for CancelSignalWorkflow {
993 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
994 write!(f, "CancelSignalWorkflow({})", self.seq)
995 }
996 }
997
998 impl Display for CancelChildWorkflowExecution {
999 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1000 write!(
1001 f,
1002 "CancelChildWorkflowExecution({})",
1003 self.child_workflow_seq
1004 )
1005 }
1006 }
1007
1008 impl Display for ModifyWorkflowProperties {
1009 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1010 write!(
1011 f,
1012 "ModifyWorkflowProperties(upserted memo keys: {:?})",
1013 self.upserted_memo.as_ref().map(|m| m.fields.keys())
1014 )
1015 }
1016 }
1017
1018 impl Display for UpdateResponse {
1019 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1020 write!(
1021 f,
1022 "UpdateResponse(protocol_instance_id: {}, response: {:?})",
1023 self.protocol_instance_id, self.response
1024 )
1025 }
1026 }
1027
1028 impl Display for ScheduleNexusOperation {
1029 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1030 write!(f, "ScheduleNexusOperation({})", self.seq)
1031 }
1032 }
1033
1034 impl Display for RequestCancelNexusOperation {
1035 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1036 write!(f, "RequestCancelNexusOperation({})", self.seq)
1037 }
1038 }
1039
1040 impl QueryResult {
1041 pub fn into_components(self) -> (String, QueryResultType, Option<Payloads>, String) {
1043 match self {
1044 QueryResult {
1045 variant: Some(query_result::Variant::Succeeded(qs)),
1046 query_id,
1047 } => (
1048 query_id,
1049 QueryResultType::Answered,
1050 qs.response.map(Into::into),
1051 "".to_string(),
1052 ),
1053 QueryResult {
1054 variant: Some(query_result::Variant::Failed(err)),
1055 query_id,
1056 } => (query_id, QueryResultType::Failed, None, err.message),
1057 QueryResult {
1058 variant: None,
1059 query_id,
1060 } => (
1061 query_id,
1062 QueryResultType::Failed,
1063 None,
1064 "Query response was empty".to_string(),
1065 ),
1066 }
1067 }
1068 }
1069 }
1070
1071 pub type HistoryEventId = i64;
1072
1073 impl From<workflow_activation_job::Variant> for WorkflowActivationJob {
1074 fn from(a: workflow_activation_job::Variant) -> Self {
1075 Self { variant: Some(a) }
1076 }
1077 }
1078
1079 impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
1080 fn from(v: Vec<WorkflowCommand>) -> Self {
1081 Self {
1082 commands: v,
1083 used_internal_flags: vec![],
1084 versioning_behavior: VersioningBehavior::Unspecified.into(),
1085 }
1086 }
1087 }
1088
1089 impl From<workflow_command::Variant> for WorkflowCommand {
1090 fn from(v: workflow_command::Variant) -> Self {
1091 Self {
1092 variant: Some(v),
1093 user_metadata: None,
1094 }
1095 }
1096 }
1097
1098 impl workflow_completion::Success {
1099 pub fn from_variants(cmds: Vec<Variant>) -> Self {
1100 let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect();
1101 cmds.into()
1102 }
1103 }
1104
1105 impl WorkflowActivationCompletion {
1106 pub fn empty(run_id: impl Into<String>) -> Self {
1108 let success = workflow_completion::Success::from_variants(vec![]);
1109 Self {
1110 run_id: run_id.into(),
1111 status: Some(workflow_activation_completion::Status::Successful(success)),
1112 }
1113 }
1114
1115 pub fn from_cmds(run_id: impl Into<String>, cmds: Vec<workflow_command::Variant>) -> Self {
1117 let success = workflow_completion::Success::from_variants(cmds);
1118 Self {
1119 run_id: run_id.into(),
1120 status: Some(workflow_activation_completion::Status::Successful(success)),
1121 }
1122 }
1123
1124 pub fn from_cmd(run_id: impl Into<String>, cmd: workflow_command::Variant) -> Self {
1126 let success = workflow_completion::Success::from_variants(vec![cmd]);
1127 Self {
1128 run_id: run_id.into(),
1129 status: Some(workflow_activation_completion::Status::Successful(success)),
1130 }
1131 }
1132
1133 pub fn fail(
1134 run_id: impl Into<String>,
1135 failure: Failure,
1136 cause: Option<WorkflowTaskFailedCause>,
1137 ) -> Self {
1138 Self {
1139 run_id: run_id.into(),
1140 status: Some(workflow_activation_completion::Status::Failed(
1141 workflow_completion::Failure {
1142 failure: Some(failure),
1143 force_cause: cause.unwrap_or(WorkflowTaskFailedCause::Unspecified) as i32,
1144 },
1145 )),
1146 }
1147 }
1148
1149 pub fn has_execution_ending(&self) -> bool {
1152 self.has_complete_workflow_execution()
1153 || self.has_fail_execution()
1154 || self.has_continue_as_new()
1155 || self.has_cancel_workflow_execution()
1156 }
1157
1158 pub fn has_fail_execution(&self) -> bool {
1160 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1161 return s.commands.iter().any(|wfc| {
1162 matches!(
1163 wfc,
1164 WorkflowCommand {
1165 variant: Some(workflow_command::Variant::FailWorkflowExecution(_)),
1166 ..
1167 }
1168 )
1169 });
1170 }
1171 false
1172 }
1173
1174 pub fn has_cancel_workflow_execution(&self) -> bool {
1176 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1177 return s.commands.iter().any(|wfc| {
1178 matches!(
1179 wfc,
1180 WorkflowCommand {
1181 variant: Some(workflow_command::Variant::CancelWorkflowExecution(_)),
1182 ..
1183 }
1184 )
1185 });
1186 }
1187 false
1188 }
1189
1190 pub fn has_continue_as_new(&self) -> bool {
1192 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1193 return s.commands.iter().any(|wfc| {
1194 matches!(
1195 wfc,
1196 WorkflowCommand {
1197 variant: Some(
1198 workflow_command::Variant::ContinueAsNewWorkflowExecution(_)
1199 ),
1200 ..
1201 }
1202 )
1203 });
1204 }
1205 false
1206 }
1207
1208 pub fn has_complete_workflow_execution(&self) -> bool {
1210 self.complete_workflow_execution_value().is_some()
1211 }
1212
1213 pub fn complete_workflow_execution_value(&self) -> Option<&Payload> {
1215 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1216 s.commands.iter().find_map(|wfc| match wfc {
1217 WorkflowCommand {
1218 variant: Some(workflow_command::Variant::CompleteWorkflowExecution(v)),
1219 ..
1220 } => v.result.as_ref(),
1221 _ => None,
1222 })
1223 } else {
1224 None
1225 }
1226 }
1227
1228 pub fn is_empty(&self) -> bool {
1230 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1231 return s.commands.is_empty();
1232 }
1233 false
1234 }
1235
1236 pub fn add_internal_flags(&mut self, patch: u32) {
1237 if let Some(workflow_activation_completion::Status::Successful(s)) = &mut self.status {
1238 s.used_internal_flags.push(patch);
1239 }
1240 }
1241 }
1242
1243 pub trait IntoCompletion {
1245 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion;
1247 }
1248
1249 impl IntoCompletion for workflow_command::Variant {
1250 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1251 WorkflowActivationCompletion::from_cmd(run_id, self)
1252 }
1253 }
1254
1255 impl<I, V> IntoCompletion for I
1256 where
1257 I: IntoIterator<Item = V>,
1258 V: Into<WorkflowCommand>,
1259 {
1260 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1261 let success = self.into_iter().map(Into::into).collect::<Vec<_>>().into();
1262 WorkflowActivationCompletion {
1263 run_id,
1264 status: Some(workflow_activation_completion::Status::Successful(success)),
1265 }
1266 }
1267 }
1268
1269 impl Display for WorkflowActivationCompletion {
1270 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1271 write!(
1272 f,
1273 "WorkflowActivationCompletion(run_id: {}, status: ",
1274 &self.run_id
1275 )?;
1276 match &self.status {
1277 None => write!(f, "empty")?,
1278 Some(s) => write!(f, "{s}")?,
1279 };
1280 write!(f, ")")
1281 }
1282 }
1283
1284 impl Display for workflow_activation_completion::Status {
1285 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1286 match self {
1287 workflow_activation_completion::Status::Successful(
1288 workflow_completion::Success { commands, .. },
1289 ) => {
1290 write!(f, "Success(")?;
1291 let mut written = 0;
1292 for c in commands {
1293 write!(f, "{c} ")?;
1294 written += 1;
1295 if written >= 10 && written < commands.len() {
1296 write!(f, "... {} more", commands.len() - written)?;
1297 break;
1298 }
1299 }
1300 write!(f, ")")
1301 }
1302 workflow_activation_completion::Status::Failed(_) => {
1303 write!(f, "Failed")
1304 }
1305 }
1306 }
1307 }
1308
1309 impl ActivityTask {
1310 pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse) -> Self {
1311 let (workflow_id, run_id) = r
1312 .workflow_execution
1313 .map(|we| (we.workflow_id, we.run_id))
1314 .unwrap_or_default();
1315 Self {
1316 task_token: r.task_token,
1317 variant: Some(activity_task::activity_task::Variant::Start(
1318 activity_task::Start {
1319 workflow_namespace: r.workflow_namespace,
1320 workflow_type: r.workflow_type.map_or_else(|| "".to_string(), |wt| wt.name),
1321 workflow_execution: Some(WorkflowExecution {
1322 workflow_id,
1323 run_id,
1324 }),
1325 activity_id: r.activity_id,
1326 activity_type: r.activity_type.map_or_else(|| "".to_string(), |at| at.name),
1327 header_fields: r.header.map(Into::into).unwrap_or_default(),
1328 input: Vec::from_payloads(r.input),
1329 heartbeat_details: Vec::from_payloads(r.heartbeat_details),
1330 scheduled_time: r.scheduled_time,
1331 current_attempt_scheduled_time: r.current_attempt_scheduled_time,
1332 started_time: r.started_time,
1333 attempt: r.attempt as u32,
1334 schedule_to_close_timeout: r.schedule_to_close_timeout,
1335 start_to_close_timeout: r.start_to_close_timeout,
1336 heartbeat_timeout: r.heartbeat_timeout,
1337 retry_policy: r.retry_policy.map(fix_retry_policy),
1338 priority: r.priority,
1339 is_local: false,
1340 },
1341 )),
1342 }
1343 }
1344 }
1345
1346 impl Failure {
1347 pub fn is_timeout(&self) -> Option<crate::protos::temporal::api::enums::v1::TimeoutType> {
1348 match &self.failure_info {
1349 Some(FailureInfo::TimeoutFailureInfo(ti)) => Some(ti.timeout_type()),
1350 _ => {
1351 if let Some(c) = &self.cause {
1352 c.is_timeout()
1353 } else {
1354 None
1355 }
1356 }
1357 }
1358 }
1359
1360 pub fn application_failure(message: String, non_retryable: bool) -> Self {
1361 Self {
1362 message,
1363 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1364 ApplicationFailureInfo {
1365 non_retryable,
1366 ..Default::default()
1367 },
1368 )),
1369 ..Default::default()
1370 }
1371 }
1372
1373 pub fn application_failure_from_error(ae: anyhow::Error, non_retryable: bool) -> Self {
1374 Self {
1375 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1376 ApplicationFailureInfo {
1377 non_retryable,
1378 ..Default::default()
1379 },
1380 )),
1381 ..ae.chain()
1382 .rfold(None, |cause, e| {
1383 Some(Self {
1384 message: e.to_string(),
1385 cause: cause.map(Box::new),
1386 ..Default::default()
1387 })
1388 })
1389 .unwrap_or_default()
1390 }
1391 }
1392
1393 pub fn timeout(timeout_type: TimeoutType) -> Self {
1394 Self {
1395 message: "Activity timed out".to_string(),
1396 cause: Some(Box::new(Failure {
1397 message: "Activity timed out".to_string(),
1398 failure_info: Some(FailureInfo::TimeoutFailureInfo(TimeoutFailureInfo {
1399 timeout_type: timeout_type.into(),
1400 ..Default::default()
1401 })),
1402 ..Default::default()
1403 })),
1404 failure_info: Some(FailureInfo::ActivityFailureInfo(
1405 ActivityFailureInfo::default(),
1406 )),
1407 ..Default::default()
1408 }
1409 }
1410
1411 pub fn maybe_application_failure(&self) -> Option<&ApplicationFailureInfo> {
1413 if let Failure {
1414 failure_info: Some(FailureInfo::ApplicationFailureInfo(f)),
1415 ..
1416 } = self
1417 {
1418 Some(f)
1419 } else {
1420 None
1421 }
1422 }
1423
1424 pub fn is_benign_application_failure(&self) -> bool {
1426 self.maybe_application_failure()
1427 .is_some_and(|app_info| app_info.category() == ApplicationErrorCategory::Benign)
1428 }
1429 }
1430
1431 impl Display for Failure {
1432 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1433 write!(f, "Failure({}, ", self.message)?;
1434 match self.failure_info.as_ref() {
1435 None => write!(f, "missing info")?,
1436 Some(FailureInfo::TimeoutFailureInfo(v)) => {
1437 write!(f, "Timeout: {:?}", v.timeout_type())?;
1438 }
1439 Some(FailureInfo::ApplicationFailureInfo(v)) => {
1440 write!(f, "Application Failure: {}", v.r#type)?;
1441 }
1442 Some(FailureInfo::CanceledFailureInfo(_)) => {
1443 write!(f, "Cancelled")?;
1444 }
1445 Some(FailureInfo::TerminatedFailureInfo(_)) => {
1446 write!(f, "Terminated")?;
1447 }
1448 Some(FailureInfo::ServerFailureInfo(_)) => {
1449 write!(f, "Server Failure")?;
1450 }
1451 Some(FailureInfo::ResetWorkflowFailureInfo(_)) => {
1452 write!(f, "Reset Workflow")?;
1453 }
1454 Some(FailureInfo::ActivityFailureInfo(v)) => {
1455 write!(
1456 f,
1457 "Activity Failure: scheduled_event_id: {}",
1458 v.scheduled_event_id
1459 )?;
1460 }
1461 Some(FailureInfo::ChildWorkflowExecutionFailureInfo(v)) => {
1462 write!(
1463 f,
1464 "Child Workflow: started_event_id: {}",
1465 v.started_event_id
1466 )?;
1467 }
1468 Some(FailureInfo::NexusOperationExecutionFailureInfo(v)) => {
1469 write!(
1470 f,
1471 "Nexus Operation Failure: scheduled_event_id: {}",
1472 v.scheduled_event_id
1473 )?;
1474 }
1475 Some(FailureInfo::NexusHandlerFailureInfo(v)) => {
1476 write!(f, "Nexus Handler Failure: {}", v.r#type)?;
1477 }
1478 }
1479 write!(f, ")")
1480 }
1481 }
1482
1483 impl From<&str> for Failure {
1484 fn from(v: &str) -> Self {
1485 Failure::application_failure(v.to_string(), false)
1486 }
1487 }
1488
1489 impl From<String> for Failure {
1490 fn from(v: String) -> Self {
1491 Failure::application_failure(v, false)
1492 }
1493 }
1494
1495 impl From<anyhow::Error> for Failure {
1496 fn from(ae: anyhow::Error) -> Self {
1497 Failure::application_failure_from_error(ae, false)
1498 }
1499 }
1500
1501 pub trait FromPayloadsExt {
1502 fn from_payloads(p: Option<Payloads>) -> Self;
1503 }
1504 impl<T> FromPayloadsExt for T
1505 where
1506 T: FromIterator<Payload>,
1507 {
1508 fn from_payloads(p: Option<Payloads>) -> Self {
1509 match p {
1510 None => std::iter::empty().collect(),
1511 Some(p) => p.payloads.into_iter().collect(),
1512 }
1513 }
1514 }
1515
1516 pub trait IntoPayloadsExt {
1517 fn into_payloads(self) -> Option<Payloads>;
1518 }
1519 impl<T> IntoPayloadsExt for T
1520 where
1521 T: IntoIterator<Item = Payload>,
1522 {
1523 fn into_payloads(self) -> Option<Payloads> {
1524 let mut iterd = self.into_iter().peekable();
1525 if iterd.peek().is_none() {
1526 None
1527 } else {
1528 Some(Payloads {
1529 payloads: iterd.collect(),
1530 })
1531 }
1532 }
1533 }
1534
1535 impl From<Payload> for Payloads {
1536 fn from(p: Payload) -> Self {
1537 Self { payloads: vec![p] }
1538 }
1539 }
1540
1541 impl<T> From<T> for Payloads
1542 where
1543 T: AsRef<[u8]>,
1544 {
1545 fn from(v: T) -> Self {
1546 Self {
1547 payloads: vec![v.into()],
1548 }
1549 }
1550 }
1551
1552 #[derive(thiserror::Error, Debug)]
1553 pub enum PayloadDeserializeErr {
1554 #[error("This deserializer does not understand this payload")]
1557 DeserializerDoesNotHandle,
1558 #[error("Error during deserialization: {0}")]
1559 DeserializeErr(#[from] anyhow::Error),
1560 }
1561
1562 pub trait AsJsonPayloadExt {
1565 fn as_json_payload(&self) -> anyhow::Result<Payload>;
1566 }
1567 impl<T> AsJsonPayloadExt for T
1568 where
1569 T: Serialize,
1570 {
1571 fn as_json_payload(&self) -> anyhow::Result<Payload> {
1572 let as_json = serde_json::to_string(self)?;
1573 let mut metadata = HashMap::new();
1574 metadata.insert(
1575 ENCODING_PAYLOAD_KEY.to_string(),
1576 JSON_ENCODING_VAL.as_bytes().to_vec(),
1577 );
1578 Ok(Payload {
1579 metadata,
1580 data: as_json.into_bytes(),
1581 external_payloads: Default::default(),
1582 })
1583 }
1584 }
1585
1586 pub trait FromJsonPayloadExt: Sized {
1587 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr>;
1588 }
1589 impl<T> FromJsonPayloadExt for T
1590 where
1591 T: for<'de> Deserialize<'de>,
1592 {
1593 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr> {
1594 if !payload.is_json_payload() {
1595 return Err(PayloadDeserializeErr::DeserializerDoesNotHandle);
1596 }
1597 let payload_str = std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?;
1598 Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?)
1599 }
1600 }
1601
1602 #[derive(derive_more::Display, Debug)]
1604 pub enum PayloadsToPayloadError {
1605 MoreThanOnePayload,
1606 NoPayload,
1607 }
1608 impl TryFrom<Payloads> for Payload {
1609 type Error = PayloadsToPayloadError;
1610
1611 fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
1612 match v.payloads.pop() {
1613 None => Err(PayloadsToPayloadError::NoPayload),
1614 Some(p) => {
1615 if v.payloads.is_empty() {
1616 Ok(p)
1617 } else {
1618 Err(PayloadsToPayloadError::MoreThanOnePayload)
1619 }
1620 }
1621 }
1622 }
1623 }
1624
1625 fn fix_retry_policy(mut retry_policy: RetryPolicy) -> RetryPolicy {
1628 if retry_policy.initial_interval.is_none() {
1629 retry_policy.initial_interval = Default::default();
1630 }
1631 retry_policy
1632 }
1633}
1634
1635#[allow(
1637 clippy::all,
1638 missing_docs,
1639 rustdoc::broken_intra_doc_links,
1640 rustdoc::bare_urls
1641)]
1642pub mod temporal {
1644 pub mod api {
1645 pub mod activity {
1646 pub mod v1 {
1647 tonic::include_proto!("temporal.api.activity.v1");
1648 }
1649 }
1650 pub mod batch {
1651 pub mod v1 {
1652 tonic::include_proto!("temporal.api.batch.v1");
1653 }
1654 }
1655 pub mod command {
1656 pub mod v1 {
1657 tonic::include_proto!("temporal.api.command.v1");
1658
1659 use crate::protos::{
1660 coresdk::{IntoPayloadsExt, workflow_commands},
1661 temporal::api::{
1662 common::v1::{ActivityType, WorkflowType},
1663 enums::v1::CommandType,
1664 },
1665 };
1666 use command::Attributes;
1667 use std::fmt::{Display, Formatter};
1668
1669 impl From<command::Attributes> for Command {
1670 fn from(c: command::Attributes) -> Self {
1671 match c {
1672 a @ Attributes::StartTimerCommandAttributes(_) => Self {
1673 command_type: CommandType::StartTimer as i32,
1674 attributes: Some(a),
1675 user_metadata: Default::default(),
1676 },
1677 a @ Attributes::CancelTimerCommandAttributes(_) => Self {
1678 command_type: CommandType::CancelTimer as i32,
1679 attributes: Some(a),
1680 user_metadata: Default::default(),
1681 },
1682 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
1683 command_type: CommandType::CompleteWorkflowExecution as i32,
1684 attributes: Some(a),
1685 user_metadata: Default::default(),
1686 },
1687 a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
1688 command_type: CommandType::FailWorkflowExecution as i32,
1689 attributes: Some(a),
1690 user_metadata: Default::default(),
1691 },
1692 a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
1693 command_type: CommandType::ScheduleActivityTask as i32,
1694 attributes: Some(a),
1695 user_metadata: Default::default(),
1696 },
1697 a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self {
1698 command_type: CommandType::RequestCancelActivityTask as i32,
1699 attributes: Some(a),
1700 user_metadata: Default::default(),
1701 },
1702 a @ Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1703 Self {
1704 command_type: CommandType::ContinueAsNewWorkflowExecution
1705 as i32,
1706 attributes: Some(a),
1707 user_metadata: Default::default(),
1708 }
1709 }
1710 a @ Attributes::CancelWorkflowExecutionCommandAttributes(_) => Self {
1711 command_type: CommandType::CancelWorkflowExecution as i32,
1712 attributes: Some(a),
1713 user_metadata: Default::default(),
1714 },
1715 a @ Attributes::RecordMarkerCommandAttributes(_) => Self {
1716 command_type: CommandType::RecordMarker as i32,
1717 attributes: Some(a),
1718 user_metadata: Default::default(),
1719 },
1720 a @ Attributes::ProtocolMessageCommandAttributes(_) => Self {
1721 command_type: CommandType::ProtocolMessage as i32,
1722 attributes: Some(a),
1723 user_metadata: Default::default(),
1724 },
1725 a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1726 Self {
1727 command_type: CommandType::RequestCancelNexusOperation as i32,
1728 attributes: Some(a),
1729 user_metadata: Default::default(),
1730 }
1731 }
1732 _ => unimplemented!(),
1733 }
1734 }
1735 }
1736
1737 impl Display for Command {
1738 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1739 let ct = CommandType::try_from(self.command_type)
1740 .unwrap_or(CommandType::Unspecified);
1741 write!(f, "{:?}", ct)
1742 }
1743 }
1744
1745 pub trait CommandAttributesExt {
1746 fn as_type(&self) -> CommandType;
1747 }
1748
1749 impl CommandAttributesExt for command::Attributes {
1750 fn as_type(&self) -> CommandType {
1751 match self {
1752 Attributes::ScheduleActivityTaskCommandAttributes(_) => {
1753 CommandType::ScheduleActivityTask
1754 }
1755 Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer,
1756 Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
1757 CommandType::CompleteWorkflowExecution
1758 }
1759 Attributes::FailWorkflowExecutionCommandAttributes(_) => {
1760 CommandType::FailWorkflowExecution
1761 }
1762 Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
1763 CommandType::RequestCancelActivityTask
1764 }
1765 Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer,
1766 Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
1767 CommandType::CancelWorkflowExecution
1768 }
1769 Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes(
1770 _,
1771 ) => CommandType::RequestCancelExternalWorkflowExecution,
1772 Attributes::RecordMarkerCommandAttributes(_) => {
1773 CommandType::RecordMarker
1774 }
1775 Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1776 CommandType::ContinueAsNewWorkflowExecution
1777 }
1778 Attributes::StartChildWorkflowExecutionCommandAttributes(_) => {
1779 CommandType::StartChildWorkflowExecution
1780 }
1781 Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => {
1782 CommandType::SignalExternalWorkflowExecution
1783 }
1784 Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => {
1785 CommandType::UpsertWorkflowSearchAttributes
1786 }
1787 Attributes::ProtocolMessageCommandAttributes(_) => {
1788 CommandType::ProtocolMessage
1789 }
1790 Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => {
1791 CommandType::ModifyWorkflowProperties
1792 }
1793 Attributes::ScheduleNexusOperationCommandAttributes(_) => {
1794 CommandType::ScheduleNexusOperation
1795 }
1796 Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1797 CommandType::RequestCancelNexusOperation
1798 }
1799 }
1800 }
1801 }
1802
1803 impl Display for command::Attributes {
1804 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1805 write!(f, "{:?}", self.as_type())
1806 }
1807 }
1808
1809 impl From<workflow_commands::StartTimer> for command::Attributes {
1810 fn from(s: workflow_commands::StartTimer) -> Self {
1811 Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
1812 timer_id: s.seq.to_string(),
1813 start_to_fire_timeout: s.start_to_fire_timeout,
1814 })
1815 }
1816 }
1817
1818 impl From<workflow_commands::UpsertWorkflowSearchAttributes> for command::Attributes {
1819 fn from(s: workflow_commands::UpsertWorkflowSearchAttributes) -> Self {
1820 Self::UpsertWorkflowSearchAttributesCommandAttributes(
1821 UpsertWorkflowSearchAttributesCommandAttributes {
1822 search_attributes: s.search_attributes,
1823 },
1824 )
1825 }
1826 }
1827
1828 impl From<workflow_commands::ModifyWorkflowProperties> for command::Attributes {
1829 fn from(s: workflow_commands::ModifyWorkflowProperties) -> Self {
1830 Self::ModifyWorkflowPropertiesCommandAttributes(
1831 ModifyWorkflowPropertiesCommandAttributes {
1832 upserted_memo: s.upserted_memo.map(Into::into),
1833 },
1834 )
1835 }
1836 }
1837
1838 impl From<workflow_commands::CancelTimer> for command::Attributes {
1839 fn from(s: workflow_commands::CancelTimer) -> Self {
1840 Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
1841 timer_id: s.seq.to_string(),
1842 })
1843 }
1844 }
1845
1846 pub fn schedule_activity_cmd_to_api(
1847 s: workflow_commands::ScheduleActivity,
1848 use_workflow_build_id: bool,
1849 ) -> command::Attributes {
1850 command::Attributes::ScheduleActivityTaskCommandAttributes(
1851 ScheduleActivityTaskCommandAttributes {
1852 activity_id: s.activity_id,
1853 activity_type: Some(ActivityType {
1854 name: s.activity_type,
1855 }),
1856 task_queue: Some(s.task_queue.into()),
1857 header: Some(s.headers.into()),
1858 input: s.arguments.into_payloads(),
1859 schedule_to_close_timeout: s.schedule_to_close_timeout,
1860 schedule_to_start_timeout: s.schedule_to_start_timeout,
1861 start_to_close_timeout: s.start_to_close_timeout,
1862 heartbeat_timeout: s.heartbeat_timeout,
1863 retry_policy: s.retry_policy.map(Into::into),
1864 request_eager_execution: !s.do_not_eagerly_execute,
1865 use_workflow_build_id,
1866 priority: s.priority,
1867 },
1868 )
1869 }
1870
1871 #[allow(deprecated)]
1872 pub fn start_child_workflow_cmd_to_api(
1873 s: workflow_commands::StartChildWorkflowExecution,
1874 inherit_build_id: bool,
1875 ) -> command::Attributes {
1876 command::Attributes::StartChildWorkflowExecutionCommandAttributes(
1877 StartChildWorkflowExecutionCommandAttributes {
1878 workflow_id: s.workflow_id,
1879 workflow_type: Some(WorkflowType {
1880 name: s.workflow_type,
1881 }),
1882 control: "".into(),
1883 namespace: s.namespace,
1884 task_queue: Some(s.task_queue.into()),
1885 header: Some(s.headers.into()),
1886 memo: Some(s.memo.into()),
1887 search_attributes: s.search_attributes,
1888 input: s.input.into_payloads(),
1889 workflow_id_reuse_policy: s.workflow_id_reuse_policy,
1890 workflow_execution_timeout: s.workflow_execution_timeout,
1891 workflow_run_timeout: s.workflow_run_timeout,
1892 workflow_task_timeout: s.workflow_task_timeout,
1893 retry_policy: s.retry_policy.map(Into::into),
1894 cron_schedule: s.cron_schedule.clone(),
1895 parent_close_policy: s.parent_close_policy,
1896 inherit_build_id,
1897 priority: s.priority,
1898 },
1899 )
1900 }
1901
1902 impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
1903 fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
1904 Self::CompleteWorkflowExecutionCommandAttributes(
1905 CompleteWorkflowExecutionCommandAttributes {
1906 result: c.result.map(Into::into),
1907 },
1908 )
1909 }
1910 }
1911
1912 impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
1913 fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
1914 Self::FailWorkflowExecutionCommandAttributes(
1915 FailWorkflowExecutionCommandAttributes {
1916 failure: c.failure.map(Into::into),
1917 },
1918 )
1919 }
1920 }
1921
1922 #[allow(deprecated)]
1923 pub fn continue_as_new_cmd_to_api(
1924 c: workflow_commands::ContinueAsNewWorkflowExecution,
1925 inherit_build_id: bool,
1926 ) -> command::Attributes {
1927 command::Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1928 ContinueAsNewWorkflowExecutionCommandAttributes {
1929 workflow_type: Some(c.workflow_type.into()),
1930 task_queue: Some(c.task_queue.into()),
1931 input: c.arguments.into_payloads(),
1932 workflow_run_timeout: c.workflow_run_timeout,
1933 workflow_task_timeout: c.workflow_task_timeout,
1934 memo: if c.memo.is_empty() {
1935 None
1936 } else {
1937 Some(c.memo.into())
1938 },
1939 header: if c.headers.is_empty() {
1940 None
1941 } else {
1942 Some(c.headers.into())
1943 },
1944 retry_policy: c.retry_policy,
1945 search_attributes: c.search_attributes,
1946 inherit_build_id,
1947 initial_versioning_behavior: c.initial_versioning_behavior,
1948 ..Default::default()
1949 },
1950 )
1951 }
1952
1953 impl From<workflow_commands::CancelWorkflowExecution> for command::Attributes {
1954 fn from(_c: workflow_commands::CancelWorkflowExecution) -> Self {
1955 Self::CancelWorkflowExecutionCommandAttributes(
1956 CancelWorkflowExecutionCommandAttributes { details: None },
1957 )
1958 }
1959 }
1960
1961 impl From<workflow_commands::ScheduleNexusOperation> for command::Attributes {
1962 fn from(c: workflow_commands::ScheduleNexusOperation) -> Self {
1963 Self::ScheduleNexusOperationCommandAttributes(
1964 ScheduleNexusOperationCommandAttributes {
1965 endpoint: c.endpoint,
1966 service: c.service,
1967 operation: c.operation,
1968 input: c.input,
1969 schedule_to_close_timeout: c.schedule_to_close_timeout,
1970 schedule_to_start_timeout: c.schedule_to_start_timeout,
1971 start_to_close_timeout: c.start_to_close_timeout,
1972 nexus_header: c.nexus_header,
1973 },
1974 )
1975 }
1976 }
1977 }
1978 }
1979 #[allow(rustdoc::invalid_html_tags)]
1980 pub mod cloud {
1981 pub mod account {
1982 pub mod v1 {
1983 tonic::include_proto!("temporal.api.cloud.account.v1");
1984 }
1985 }
1986 pub mod cloudservice {
1987 pub mod v1 {
1988 tonic::include_proto!("temporal.api.cloud.cloudservice.v1");
1989 }
1990 }
1991 pub mod connectivityrule {
1992 pub mod v1 {
1993 tonic::include_proto!("temporal.api.cloud.connectivityrule.v1");
1994 }
1995 }
1996 pub mod identity {
1997 pub mod v1 {
1998 tonic::include_proto!("temporal.api.cloud.identity.v1");
1999 }
2000 }
2001 pub mod namespace {
2002 pub mod v1 {
2003 tonic::include_proto!("temporal.api.cloud.namespace.v1");
2004 }
2005 }
2006 pub mod nexus {
2007 pub mod v1 {
2008 tonic::include_proto!("temporal.api.cloud.nexus.v1");
2009 }
2010 }
2011 pub mod operation {
2012 pub mod v1 {
2013 tonic::include_proto!("temporal.api.cloud.operation.v1");
2014 }
2015 }
2016 pub mod region {
2017 pub mod v1 {
2018 tonic::include_proto!("temporal.api.cloud.region.v1");
2019 }
2020 }
2021 pub mod resource {
2022 pub mod v1 {
2023 tonic::include_proto!("temporal.api.cloud.resource.v1");
2024 }
2025 }
2026 pub mod sink {
2027 pub mod v1 {
2028 tonic::include_proto!("temporal.api.cloud.sink.v1");
2029 }
2030 }
2031 pub mod usage {
2032 pub mod v1 {
2033 tonic::include_proto!("temporal.api.cloud.usage.v1");
2034 }
2035 }
2036 }
2037 pub mod common {
2038 pub mod v1 {
2039 use crate::protos::{ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL};
2040 use base64::{Engine, prelude::BASE64_STANDARD};
2041 use std::{
2042 collections::HashMap,
2043 fmt::{Display, Formatter},
2044 };
2045 include_proto_with_serde!("temporal.api.common.v1");
2046
2047 impl<T> From<T> for Payload
2048 where
2049 T: AsRef<[u8]>,
2050 {
2051 fn from(v: T) -> Self {
2052 let mut metadata = HashMap::new();
2055 metadata.insert(ENCODING_PAYLOAD_KEY.to_string(), b"binary/plain".to_vec());
2056 Self {
2057 metadata,
2058 data: v.as_ref().to_vec(),
2059 external_payloads: Default::default(),
2060 }
2061 }
2062 }
2063
2064 impl Payload {
2065 pub fn as_slice(&self) -> &[u8] {
2067 self.data.as_slice()
2068 }
2069
2070 pub fn is_json_payload(&self) -> bool {
2071 self.metadata
2072 .get(ENCODING_PAYLOAD_KEY)
2073 .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes())
2074 .unwrap_or_default()
2075 }
2076 }
2077
2078 impl std::fmt::Debug for Payload {
2079 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2080 if std::env::var("TEMPORAL_PRINT_FULL_PAYLOADS").is_err()
2081 && self.data.len() > 64
2082 {
2083 let mut windows = self.data.as_slice().windows(32);
2084 write!(
2085 f,
2086 "[{}..{}]",
2087 BASE64_STANDARD.encode(windows.next().unwrap_or_default()),
2088 BASE64_STANDARD.encode(windows.next_back().unwrap_or_default())
2089 )
2090 } else {
2091 write!(f, "[{}]", BASE64_STANDARD.encode(&self.data))
2092 }
2093 }
2094 }
2095
2096 impl Display for Payload {
2097 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2098 write!(f, "{:?}", self)
2099 }
2100 }
2101
2102 impl Display for Header {
2103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2104 write!(f, "Header(")?;
2105 for kv in &self.fields {
2106 write!(f, "{}: ", kv.0)?;
2107 write!(f, "{}, ", kv.1)?;
2108 }
2109 write!(f, ")")
2110 }
2111 }
2112
2113 impl From<Header> for HashMap<String, Payload> {
2114 fn from(h: Header) -> Self {
2115 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2116 }
2117 }
2118
2119 impl From<Memo> for HashMap<String, Payload> {
2120 fn from(h: Memo) -> Self {
2121 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2122 }
2123 }
2124
2125 impl From<SearchAttributes> for HashMap<String, Payload> {
2126 fn from(h: SearchAttributes) -> Self {
2127 h.indexed_fields
2128 .into_iter()
2129 .map(|(k, v)| (k, v.into()))
2130 .collect()
2131 }
2132 }
2133
2134 impl From<HashMap<String, Payload>> for SearchAttributes {
2135 fn from(h: HashMap<String, Payload>) -> Self {
2136 Self {
2137 indexed_fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
2138 }
2139 }
2140 }
2141
2142 impl From<String> for ActivityType {
2143 fn from(name: String) -> Self {
2144 Self { name }
2145 }
2146 }
2147
2148 impl From<&str> for ActivityType {
2149 fn from(name: &str) -> Self {
2150 Self {
2151 name: name.to_string(),
2152 }
2153 }
2154 }
2155
2156 impl From<ActivityType> for String {
2157 fn from(at: ActivityType) -> Self {
2158 at.name
2159 }
2160 }
2161
2162 impl From<&str> for WorkflowType {
2163 fn from(v: &str) -> Self {
2164 Self {
2165 name: v.to_string(),
2166 }
2167 }
2168 }
2169 }
2170 }
2171 pub mod deployment {
2172 pub mod v1 {
2173 tonic::include_proto!("temporal.api.deployment.v1");
2174 }
2175 }
2176 pub mod enums {
2177 pub mod v1 {
2178 include_proto_with_serde!("temporal.api.enums.v1");
2179 }
2180 }
2181 pub mod errordetails {
2182 pub mod v1 {
2183 tonic::include_proto!("temporal.api.errordetails.v1");
2184 }
2185 }
2186 pub mod failure {
2187 pub mod v1 {
2188 include_proto_with_serde!("temporal.api.failure.v1");
2189 }
2190 }
2191 pub mod filter {
2192 pub mod v1 {
2193 tonic::include_proto!("temporal.api.filter.v1");
2194 }
2195 }
2196 pub mod history {
2197 pub mod v1 {
2198 use crate::protos::temporal::api::{
2199 enums::v1::EventType, history::v1::history_event::Attributes,
2200 };
2201 use anyhow::bail;
2202 use std::fmt::{Display, Formatter};
2203
2204 tonic::include_proto!("temporal.api.history.v1");
2205
2206 impl History {
2207 pub fn extract_run_id_from_start(&self) -> Result<&str, anyhow::Error> {
2208 extract_original_run_id_from_events(&self.events)
2209 }
2210
2211 pub fn last_event_id(&self) -> i64 {
2214 self.events.last().map(|e| e.event_id).unwrap_or_default()
2215 }
2216 }
2217
2218 pub fn extract_original_run_id_from_events(
2219 events: &[HistoryEvent],
2220 ) -> Result<&str, anyhow::Error> {
2221 if let Some(Attributes::WorkflowExecutionStartedEventAttributes(wes)) =
2222 events.get(0).and_then(|x| x.attributes.as_ref())
2223 {
2224 Ok(&wes.original_execution_run_id)
2225 } else {
2226 bail!("First event is not WorkflowExecutionStarted?!?")
2227 }
2228 }
2229
2230 impl HistoryEvent {
2231 pub fn is_command_event(&self) -> bool {
2233 EventType::try_from(self.event_type).map_or(false, |et| match et {
2234 EventType::ActivityTaskScheduled
2235 | EventType::ActivityTaskCancelRequested
2236 | EventType::MarkerRecorded
2237 | EventType::RequestCancelExternalWorkflowExecutionInitiated
2238 | EventType::SignalExternalWorkflowExecutionInitiated
2239 | EventType::StartChildWorkflowExecutionInitiated
2240 | EventType::TimerCanceled
2241 | EventType::TimerStarted
2242 | EventType::UpsertWorkflowSearchAttributes
2243 | EventType::WorkflowPropertiesModified
2244 | EventType::NexusOperationScheduled
2245 | EventType::NexusOperationCancelRequested
2246 | EventType::WorkflowExecutionCanceled
2247 | EventType::WorkflowExecutionCompleted
2248 | EventType::WorkflowExecutionContinuedAsNew
2249 | EventType::WorkflowExecutionFailed
2250 | EventType::WorkflowExecutionUpdateAccepted
2251 | EventType::WorkflowExecutionUpdateRejected
2252 | EventType::WorkflowExecutionUpdateCompleted => true,
2253 _ => false,
2254 })
2255 }
2256
2257 pub fn get_initial_command_event_id(&self) -> Option<i64> {
2261 self.attributes.as_ref().and_then(|a| {
2262 match a {
2265 Attributes::ActivityTaskStartedEventAttributes(a) =>
2266 Some(a.scheduled_event_id),
2267 Attributes::ActivityTaskCompletedEventAttributes(a) =>
2268 Some(a.scheduled_event_id),
2269 Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2270 Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2271 Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2272 Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2273 Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
2274 Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
2275 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2276 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
2277 Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2278 Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
2279 Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
2280 Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2281 Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
2282 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
2283 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
2284 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2285 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
2286 Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
2287 Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2288 Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2289 Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2290 Attributes::NexusOperationStartedEventAttributes(a) => Some(a.scheduled_event_id),
2291 Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2292 Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id),
2293 Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2294 Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2295 Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2296 Attributes::NexusOperationCancelRequestCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2297 Attributes::NexusOperationCancelRequestFailedEventAttributes(a) => Some(a.scheduled_event_id),
2298 _ => None
2299 }
2300 })
2301 }
2302
2303 pub fn get_protocol_instance_id(&self) -> Option<&str> {
2305 self.attributes.as_ref().and_then(|a| match a {
2306 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(a) => {
2307 Some(a.protocol_instance_id.as_str())
2308 }
2309 _ => None,
2310 })
2311 }
2312
2313 pub fn is_final_wf_execution_event(&self) -> bool {
2315 match self.event_type() {
2316 EventType::WorkflowExecutionCompleted => true,
2317 EventType::WorkflowExecutionCanceled => true,
2318 EventType::WorkflowExecutionFailed => true,
2319 EventType::WorkflowExecutionTimedOut => true,
2320 EventType::WorkflowExecutionContinuedAsNew => true,
2321 EventType::WorkflowExecutionTerminated => true,
2322 _ => false,
2323 }
2324 }
2325
2326 pub fn is_wft_closed_event(&self) -> bool {
2327 match self.event_type() {
2328 EventType::WorkflowTaskCompleted => true,
2329 EventType::WorkflowTaskFailed => true,
2330 EventType::WorkflowTaskTimedOut => true,
2331 _ => false,
2332 }
2333 }
2334
2335 pub fn is_ignorable(&self) -> bool {
2336 if !self.worker_may_ignore {
2337 return false;
2338 }
2339 if let Some(a) = self.attributes.as_ref() {
2342 match a {
2343 Attributes::WorkflowExecutionStartedEventAttributes(_) => false,
2344 Attributes::WorkflowExecutionCompletedEventAttributes(_) => false,
2345 Attributes::WorkflowExecutionFailedEventAttributes(_) => false,
2346 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => false,
2347 Attributes::WorkflowTaskScheduledEventAttributes(_) => false,
2348 Attributes::WorkflowTaskStartedEventAttributes(_) => false,
2349 Attributes::WorkflowTaskCompletedEventAttributes(_) => false,
2350 Attributes::WorkflowTaskTimedOutEventAttributes(_) => false,
2351 Attributes::WorkflowTaskFailedEventAttributes(_) => false,
2352 Attributes::ActivityTaskScheduledEventAttributes(_) => false,
2353 Attributes::ActivityTaskStartedEventAttributes(_) => false,
2354 Attributes::ActivityTaskCompletedEventAttributes(_) => false,
2355 Attributes::ActivityTaskFailedEventAttributes(_) => false,
2356 Attributes::ActivityTaskTimedOutEventAttributes(_) => false,
2357 Attributes::TimerStartedEventAttributes(_) => false,
2358 Attributes::TimerFiredEventAttributes(_) => false,
2359 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => false,
2360 Attributes::ActivityTaskCanceledEventAttributes(_) => false,
2361 Attributes::TimerCanceledEventAttributes(_) => false,
2362 Attributes::MarkerRecordedEventAttributes(_) => false,
2363 Attributes::WorkflowExecutionSignaledEventAttributes(_) => false,
2364 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => false,
2365 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => false,
2366 Attributes::WorkflowExecutionCanceledEventAttributes(_) => false,
2367 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2368 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => false,
2369 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => false,
2370 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => false,
2371 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => false,
2372 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => false,
2373 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => false,
2374 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => false,
2375 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => false,
2376 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => false,
2377 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => false,
2378 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => false,
2379 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2380 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => false,
2381 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => false,
2382 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => false,
2383 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => false,
2384 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => false,
2385 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => false,
2386 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => false,
2387 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => false,
2388 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => false,
2389 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => false,
2390 Attributes::NexusOperationScheduledEventAttributes(_) => false,
2391 Attributes::NexusOperationStartedEventAttributes(_) => false,
2392 Attributes::NexusOperationCompletedEventAttributes(_) => false,
2393 Attributes::NexusOperationFailedEventAttributes(_) => false,
2394 Attributes::NexusOperationCanceledEventAttributes(_) => false,
2395 Attributes::NexusOperationTimedOutEventAttributes(_) => false,
2396 Attributes::NexusOperationCancelRequestedEventAttributes(_) => false,
2397 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => true,
2399 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => false,
2400 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => false,
2401 Attributes::WorkflowExecutionPausedEventAttributes(_) => true,
2403 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => true,
2405 }
2406 } else {
2407 false
2408 }
2409 }
2410 }
2411
2412 impl Display for HistoryEvent {
2413 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2414 write!(
2415 f,
2416 "HistoryEvent(id: {}, {:?})",
2417 self.event_id,
2418 EventType::try_from(self.event_type).unwrap_or_default()
2419 )
2420 }
2421 }
2422
2423 impl Attributes {
2424 pub fn event_type(&self) -> EventType {
2425 match self {
2427 Attributes::WorkflowExecutionStartedEventAttributes(_) => { EventType::WorkflowExecutionStarted }
2428 Attributes::WorkflowExecutionCompletedEventAttributes(_) => { EventType::WorkflowExecutionCompleted }
2429 Attributes::WorkflowExecutionFailedEventAttributes(_) => { EventType::WorkflowExecutionFailed }
2430 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => { EventType::WorkflowExecutionTimedOut }
2431 Attributes::WorkflowTaskScheduledEventAttributes(_) => { EventType::WorkflowTaskScheduled }
2432 Attributes::WorkflowTaskStartedEventAttributes(_) => { EventType::WorkflowTaskStarted }
2433 Attributes::WorkflowTaskCompletedEventAttributes(_) => { EventType::WorkflowTaskCompleted }
2434 Attributes::WorkflowTaskTimedOutEventAttributes(_) => { EventType::WorkflowTaskTimedOut }
2435 Attributes::WorkflowTaskFailedEventAttributes(_) => { EventType::WorkflowTaskFailed }
2436 Attributes::ActivityTaskScheduledEventAttributes(_) => { EventType::ActivityTaskScheduled }
2437 Attributes::ActivityTaskStartedEventAttributes(_) => { EventType::ActivityTaskStarted }
2438 Attributes::ActivityTaskCompletedEventAttributes(_) => { EventType::ActivityTaskCompleted }
2439 Attributes::ActivityTaskFailedEventAttributes(_) => { EventType::ActivityTaskFailed }
2440 Attributes::ActivityTaskTimedOutEventAttributes(_) => { EventType::ActivityTaskTimedOut }
2441 Attributes::TimerStartedEventAttributes(_) => { EventType::TimerStarted }
2442 Attributes::TimerFiredEventAttributes(_) => { EventType::TimerFired }
2443 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => { EventType::ActivityTaskCancelRequested }
2444 Attributes::ActivityTaskCanceledEventAttributes(_) => { EventType::ActivityTaskCanceled }
2445 Attributes::TimerCanceledEventAttributes(_) => { EventType::TimerCanceled }
2446 Attributes::MarkerRecordedEventAttributes(_) => { EventType::MarkerRecorded }
2447 Attributes::WorkflowExecutionSignaledEventAttributes(_) => { EventType::WorkflowExecutionSignaled }
2448 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => { EventType::WorkflowExecutionTerminated }
2449 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::WorkflowExecutionCancelRequested }
2450 Attributes::WorkflowExecutionCanceledEventAttributes(_) => { EventType::WorkflowExecutionCanceled }
2451 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionInitiated }
2452 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionFailed }
2453 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::ExternalWorkflowExecutionCancelRequested }
2454 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => { EventType::WorkflowExecutionContinuedAsNew }
2455 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => { EventType::StartChildWorkflowExecutionInitiated }
2456 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => { EventType::StartChildWorkflowExecutionFailed }
2457 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => { EventType::ChildWorkflowExecutionStarted }
2458 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => { EventType::ChildWorkflowExecutionCompleted }
2459 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => { EventType::ChildWorkflowExecutionFailed }
2460 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => { EventType::ChildWorkflowExecutionCanceled }
2461 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => { EventType::ChildWorkflowExecutionTimedOut }
2462 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => { EventType::ChildWorkflowExecutionTerminated }
2463 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionInitiated }
2464 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionFailed }
2465 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => { EventType::ExternalWorkflowExecutionSignaled }
2466 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => { EventType::UpsertWorkflowSearchAttributes }
2467 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAdmitted }
2468 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => { EventType::WorkflowExecutionUpdateRejected }
2469 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAccepted }
2470 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => { EventType::WorkflowExecutionUpdateCompleted }
2471 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => { EventType::WorkflowPropertiesModifiedExternally }
2472 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => { EventType::ActivityPropertiesModifiedExternally }
2473 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => { EventType::WorkflowPropertiesModified }
2474 Attributes::NexusOperationScheduledEventAttributes(_) => { EventType::NexusOperationScheduled }
2475 Attributes::NexusOperationStartedEventAttributes(_) => { EventType::NexusOperationStarted }
2476 Attributes::NexusOperationCompletedEventAttributes(_) => { EventType::NexusOperationCompleted }
2477 Attributes::NexusOperationFailedEventAttributes(_) => { EventType::NexusOperationFailed }
2478 Attributes::NexusOperationCanceledEventAttributes(_) => { EventType::NexusOperationCanceled }
2479 Attributes::NexusOperationTimedOutEventAttributes(_) => { EventType::NexusOperationTimedOut }
2480 Attributes::NexusOperationCancelRequestedEventAttributes(_) => { EventType::NexusOperationCancelRequested }
2481 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => { EventType::WorkflowExecutionOptionsUpdated }
2482 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => { EventType::NexusOperationCancelRequestCompleted }
2483 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => { EventType::NexusOperationCancelRequestFailed }
2484 Attributes::WorkflowExecutionPausedEventAttributes(_) => { EventType::WorkflowExecutionPaused }
2485 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => { EventType::WorkflowExecutionUnpaused }
2486 }
2487 }
2488 }
2489 }
2490 }
2491 pub mod namespace {
2492 pub mod v1 {
2493 tonic::include_proto!("temporal.api.namespace.v1");
2494 }
2495 }
2496 pub mod operatorservice {
2497 pub mod v1 {
2498 tonic::include_proto!("temporal.api.operatorservice.v1");
2499 }
2500 }
2501 pub mod protocol {
2502 pub mod v1 {
2503 use std::fmt::{Display, Formatter};
2504 tonic::include_proto!("temporal.api.protocol.v1");
2505
2506 impl Display for Message {
2507 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2508 write!(f, "ProtocolMessage({})", self.id)
2509 }
2510 }
2511 }
2512 }
2513 pub mod query {
2514 pub mod v1 {
2515 tonic::include_proto!("temporal.api.query.v1");
2516 }
2517 }
2518 pub mod replication {
2519 pub mod v1 {
2520 tonic::include_proto!("temporal.api.replication.v1");
2521 }
2522 }
2523 pub mod rules {
2524 pub mod v1 {
2525 tonic::include_proto!("temporal.api.rules.v1");
2526 }
2527 }
2528 pub mod schedule {
2529 #[allow(rustdoc::invalid_html_tags)]
2530 pub mod v1 {
2531 tonic::include_proto!("temporal.api.schedule.v1");
2532 }
2533 }
2534 pub mod sdk {
2535 pub mod v1 {
2536 tonic::include_proto!("temporal.api.sdk.v1");
2537 }
2538 }
2539 pub mod taskqueue {
2540 pub mod v1 {
2541 use crate::protos::temporal::api::enums::v1::TaskQueueKind;
2542 tonic::include_proto!("temporal.api.taskqueue.v1");
2543
2544 impl From<String> for TaskQueue {
2545 fn from(name: String) -> Self {
2546 Self {
2547 name,
2548 kind: TaskQueueKind::Normal as i32,
2549 normal_name: "".to_string(),
2550 }
2551 }
2552 }
2553 }
2554 }
2555 pub mod testservice {
2556 pub mod v1 {
2557 tonic::include_proto!("temporal.api.testservice.v1");
2558 }
2559 }
2560 pub mod update {
2561 pub mod v1 {
2562 use crate::protos::temporal::api::update::v1::outcome::Value;
2563 tonic::include_proto!("temporal.api.update.v1");
2564
2565 impl Outcome {
2566 pub fn is_success(&self) -> bool {
2567 match self.value {
2568 Some(Value::Success(_)) => true,
2569 _ => false,
2570 }
2571 }
2572 }
2573 }
2574 }
2575 pub mod version {
2576 pub mod v1 {
2577 tonic::include_proto!("temporal.api.version.v1");
2578 }
2579 }
2580 pub mod worker {
2581 pub mod v1 {
2582 tonic::include_proto!("temporal.api.worker.v1");
2583 }
2584 }
2585 pub mod workflow {
2586 pub mod v1 {
2587 tonic::include_proto!("temporal.api.workflow.v1");
2588 }
2589 }
2590 pub mod nexus {
2591 pub mod v1 {
2592 use crate::protos::{
2593 camel_case_to_screaming_snake,
2594 temporal::api::{
2595 common::{
2596 self,
2597 v1::link::{WorkflowEvent, workflow_event},
2598 },
2599 enums::v1::EventType,
2600 failure,
2601 },
2602 };
2603 use anyhow::{anyhow, bail};
2604 use prost::Name;
2605 use std::{
2606 collections::HashMap,
2607 fmt::{Display, Formatter},
2608 };
2609 use tonic::transport::Uri;
2610
2611 tonic::include_proto!("temporal.api.nexus.v1");
2612
2613 impl Display for Response {
2614 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2615 write!(f, "NexusResponse(",)?;
2616 match &self.variant {
2617 None => {}
2618 Some(v) => {
2619 write!(f, "{v}")?;
2620 }
2621 }
2622 write!(f, ")")
2623 }
2624 }
2625
2626 impl Display for response::Variant {
2627 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2628 match self {
2629 response::Variant::StartOperation(_) => {
2630 write!(f, "StartOperation")
2631 }
2632 response::Variant::CancelOperation(_) => {
2633 write!(f, "CancelOperation")
2634 }
2635 }
2636 }
2637 }
2638
2639 impl Display for HandlerError {
2640 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2641 write!(f, "HandlerError")
2642 }
2643 }
2644
2645 pub enum NexusTaskFailure {
2646 Legacy(HandlerError),
2647 Temporal(failure::v1::Failure),
2648 }
2649
2650 static SCHEME_PREFIX: &str = "temporal://";
2651
2652 pub fn workflow_event_link_from_nexus(
2654 l: &Link,
2655 ) -> Result<common::v1::Link, anyhow::Error> {
2656 if !l.url.starts_with(SCHEME_PREFIX) {
2657 bail!("Invalid scheme for nexus link: {:?}", l.url);
2658 }
2659 let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap();
2662 let uri = Uri::try_from(no_authority_url)?;
2663 let parts = uri.into_parts();
2664 let path = parts.path_and_query.ok_or_else(|| {
2665 anyhow!("Failed to parse nexus link, invalid path: {:?}", l)
2666 })?;
2667 let path_parts = path.path().split('/').collect::<Vec<_>>();
2668 if path_parts.get(1) != Some(&"namespaces") {
2669 bail!("Invalid path for nexus link: {:?}", l);
2670 }
2671 let namespace = path_parts.get(2).ok_or_else(|| {
2672 anyhow!("Failed to parse nexus link, no namespace: {:?}", l)
2673 })?;
2674 if path_parts.get(3) != Some(&"workflows") {
2675 bail!("Invalid path for nexus link, no workflows segment: {:?}", l);
2676 }
2677 let workflow_id = path_parts.get(4).ok_or_else(|| {
2678 anyhow!("Failed to parse nexus link, no workflow id: {:?}", l)
2679 })?;
2680 let run_id = path_parts
2681 .get(5)
2682 .ok_or_else(|| anyhow!("Failed to parse nexus link, no run id: {:?}", l))?;
2683 if path_parts.get(6) != Some(&"history") {
2684 bail!("Invalid path for nexus link, no history segment: {:?}", l);
2685 }
2686 let reference = if let Some(query) = path.query() {
2687 let mut eventref = workflow_event::EventReference::default();
2688 let query_parts = query.split('&').collect::<Vec<_>>();
2689 for qp in query_parts {
2690 let mut kv = qp.split('=');
2691 let key = kv.next().ok_or_else(|| {
2692 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2693 })?;
2694 let val = kv.next().ok_or_else(|| {
2695 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2696 })?;
2697 match key {
2698 "eventID" => {
2699 eventref.event_id = val.parse().map_err(|_| {
2700 anyhow!("Failed to parse nexus link event id: {:?}", l)
2701 })?;
2702 }
2703 "eventType" => {
2704 eventref.event_type = EventType::from_str_name(val)
2705 .unwrap_or_else(|| {
2706 EventType::from_str_name(
2707 &("EVENT_TYPE_".to_string()
2708 + &camel_case_to_screaming_snake(val)),
2709 )
2710 .unwrap_or_default()
2711 })
2712 .into()
2713 }
2714 _ => continue,
2715 }
2716 }
2717 Some(workflow_event::Reference::EventRef(eventref))
2718 } else {
2719 None
2720 };
2721
2722 Ok(common::v1::Link {
2723 variant: Some(common::v1::link::Variant::WorkflowEvent(WorkflowEvent {
2724 namespace: namespace.to_string(),
2725 workflow_id: workflow_id.to_string(),
2726 run_id: run_id.to_string(),
2727 reference,
2728 })),
2729 })
2730 }
2731
2732 impl TryFrom<failure::v1::Failure> for Failure {
2733 type Error = serde_json::Error;
2734
2735 fn try_from(mut f: failure::v1::Failure) -> Result<Self, Self::Error> {
2736 let message = std::mem::take(&mut f.message);
2738
2739 let details = serde_json::to_vec(&f)?;
2741
2742 Ok(Failure {
2744 message,
2745 stack_trace: f.stack_trace,
2746 metadata: HashMap::from([(
2747 "type".to_string(),
2748 failure::v1::Failure::full_name().into(),
2749 )]),
2750 details,
2751 cause: None,
2752 })
2753 }
2754 }
2755 }
2756 }
2757 pub mod workflowservice {
2758 pub mod v1 {
2759 use std::{
2760 convert::TryInto,
2761 fmt::{Display, Formatter},
2762 time::{Duration, SystemTime},
2763 };
2764
2765 tonic::include_proto!("temporal.api.workflowservice.v1");
2766
2767 macro_rules! sched_to_start_impl {
2768 ($sched_field:ident) => {
2769 pub fn sched_to_start(&self) -> Option<Duration> {
2772 if let Some((sch, st)) =
2773 self.$sched_field.clone().zip(self.started_time.clone())
2774 {
2775 if let Some(value) = elapsed_between_prost_times(sch, st) {
2776 return value;
2777 }
2778 }
2779 None
2780 }
2781 };
2782 }
2783
2784 fn elapsed_between_prost_times(
2785 from: prost_types::Timestamp,
2786 to: prost_types::Timestamp,
2787 ) -> Option<Option<Duration>> {
2788 let from: Result<SystemTime, _> = from.try_into();
2789 let to: Result<SystemTime, _> = to.try_into();
2790 if let (Ok(from), Ok(to)) = (from, to) {
2791 return Some(to.duration_since(from).ok());
2792 }
2793 None
2794 }
2795
2796 impl PollWorkflowTaskQueueResponse {
2797 sched_to_start_impl!(scheduled_time);
2798 }
2799
2800 impl Display for PollWorkflowTaskQueueResponse {
2801 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2802 let last_event = self
2803 .history
2804 .as_ref()
2805 .and_then(|h| h.events.last().map(|he| he.event_id))
2806 .unwrap_or(0);
2807 write!(
2808 f,
2809 "PollWFTQResp(run_id: {}, attempt: {}, last_event: {})",
2810 self.workflow_execution
2811 .as_ref()
2812 .map_or("", |we| we.run_id.as_str()),
2813 self.attempt,
2814 last_event
2815 )
2816 }
2817 }
2818
2819 pub struct CompactHist<'a>(pub &'a PollWorkflowTaskQueueResponse);
2821 impl Display for CompactHist<'_> {
2822 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2823 writeln!(
2824 f,
2825 "PollWorkflowTaskQueueResponse (prev_started: {}, started: {})",
2826 self.0.previous_started_event_id, self.0.started_event_id
2827 )?;
2828 if let Some(h) = self.0.history.as_ref() {
2829 for event in &h.events {
2830 writeln!(f, "{}", event)?;
2831 }
2832 }
2833 writeln!(f, "query: {:#?}", self.0.query)?;
2834 writeln!(f, "queries: {:#?}", self.0.queries)
2835 }
2836 }
2837
2838 impl PollActivityTaskQueueResponse {
2839 sched_to_start_impl!(current_attempt_scheduled_time);
2840 }
2841
2842 impl PollNexusTaskQueueResponse {
2843 pub fn sched_to_start(&self) -> Option<Duration> {
2844 if let Some((sch, st)) = self
2845 .request
2846 .as_ref()
2847 .and_then(|r| r.scheduled_time)
2848 .clone()
2849 .zip(SystemTime::now().try_into().ok())
2850 {
2851 if let Some(value) = elapsed_between_prost_times(sch, st) {
2852 return value;
2853 }
2854 }
2855 None
2856 }
2857 }
2858
2859 impl QueryWorkflowResponse {
2860 pub fn unwrap(self) -> Vec<crate::protos::temporal::api::common::v1::Payload> {
2862 self.query_result.unwrap().payloads
2863 }
2864 }
2865 }
2866 }
2867 }
2868}
2869
2870#[allow(
2871 clippy::all,
2872 missing_docs,
2873 rustdoc::broken_intra_doc_links,
2874 rustdoc::bare_urls
2875)]
2876pub mod google {
2877 pub mod rpc {
2878 tonic::include_proto!("google.rpc");
2879 }
2880}
2881
2882#[allow(
2883 clippy::all,
2884 missing_docs,
2885 rustdoc::broken_intra_doc_links,
2886 rustdoc::bare_urls
2887)]
2888pub mod grpc {
2889 pub mod health {
2890 pub mod v1 {
2891 tonic::include_proto!("grpc.health.v1");
2892 }
2893 }
2894}
2895
2896pub fn camel_case_to_screaming_snake(val: &str) -> String {
2898 let mut out = String::new();
2899 let mut last_was_upper = true;
2900 for c in val.chars() {
2901 if c.is_uppercase() {
2902 if !last_was_upper {
2903 out.push('_');
2904 }
2905 out.push(c.to_ascii_uppercase());
2906 last_was_upper = true;
2907 } else {
2908 out.push(c.to_ascii_uppercase());
2909 last_was_upper = false;
2910 }
2911 }
2912 out
2913}
2914
2915pub fn proto_ts_to_system_time(ts: &prost_types::Timestamp) -> Option<std::time::SystemTime> {
2917 std::time::SystemTime::UNIX_EPOCH
2918 .checked_add(Duration::from_secs(ts.seconds as u64) + Duration::from_nanos(ts.nanos as u64))
2919}
2920
2921#[cfg(test)]
2922mod tests {
2923 use crate::protos::temporal::api::failure::v1::Failure;
2924 use anyhow::anyhow;
2925
2926 #[test]
2927 fn anyhow_to_failure_conversion() {
2928 let no_causes: Failure = anyhow!("no causes").into();
2929 assert_eq!(no_causes.cause, None);
2930 assert_eq!(no_causes.message, "no causes");
2931 let orig = anyhow!("fail 1");
2932 let mid = orig.context("fail 2");
2933 let top = mid.context("fail 3");
2934 let as_fail: Failure = top.into();
2935 assert_eq!(as_fail.message, "fail 3");
2936 assert_eq!(as_fail.cause.as_ref().unwrap().message, "fail 2");
2937 assert_eq!(as_fail.cause.unwrap().cause.unwrap().message, "fail 1");
2938 }
2939}