1pub mod constants;
6pub mod utilities;
8
9#[cfg(feature = "test-utilities")]
10pub mod canned_histories;
12#[cfg(feature = "history_builders")]
13mod history_builder;
14#[cfg(feature = "history_builders")]
15mod history_info;
16mod task_token;
17#[cfg(feature = "test-utilities")]
18pub mod test_utils;
19
20use std::time::Duration;
21
22#[cfg(feature = "history_builders")]
23pub use history_builder::{
24 DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE, TestHistoryBuilder, default_act_sched,
25 default_wes_attribs,
26};
27#[cfg(feature = "history_builders")]
28pub use history_info::HistoryInfo;
29pub use task_token::TaskToken;
30
31pub static ENCODING_PAYLOAD_KEY: &str = "encoding";
33pub static JSON_ENCODING_VAL: &str = "json/plain";
35pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data";
37pub static VERSION_SEARCH_ATTR_KEY: &str = "TemporalChangeVersion";
39
40macro_rules! include_proto_with_serde {
41 ($pkg:tt) => {
42 tonic::include_proto!($pkg);
43
44 include!(concat!(env!("OUT_DIR"), concat!("/", $pkg, ".serde.rs")));
45 };
46}
47
48#[allow(
49 clippy::large_enum_variant,
50 clippy::derive_partial_eq_without_eq,
51 clippy::reserve_after_initialization
52)]
53#[allow(missing_docs)]
55pub mod coresdk {
56 tonic::include_proto!("coresdk");
59
60 use crate::protos::{
61 ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL,
62 temporal::api::{
63 common::v1::{Payload, Payloads, RetryPolicy, WorkflowExecution},
64 enums::v1::{
65 ApplicationErrorCategory, TimeoutType, VersioningBehavior, WorkflowTaskFailedCause,
66 },
67 failure::v1::{
68 ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo,
69 failure::FailureInfo,
70 },
71 workflowservice::v1::PollActivityTaskQueueResponse,
72 },
73 };
74 use activity_task::ActivityTask;
75 use serde::{Deserialize, Serialize};
76 use std::{
77 collections::HashMap,
78 convert::TryFrom,
79 fmt::{Display, Formatter},
80 iter::FromIterator,
81 };
82 use workflow_activation::{WorkflowActivationJob, workflow_activation_job};
83 use workflow_commands::{WorkflowCommand, workflow_command, workflow_command::Variant};
84 use workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion};
85
86 #[allow(clippy::module_inception)]
87 pub mod activity_task {
88 use crate::protos::{coresdk::ActivityTaskCompletion, task_token::format_task_token};
89 use std::fmt::{Display, Formatter};
90 tonic::include_proto!("coresdk.activity_task");
91
92 impl ActivityTask {
93 pub fn cancel_from_ids(
94 task_token: Vec<u8>,
95 reason: ActivityCancelReason,
96 details: ActivityCancellationDetails,
97 ) -> Self {
98 Self {
99 task_token,
100 variant: Some(activity_task::Variant::Cancel(Cancel {
101 reason: reason as i32,
102 details: Some(details),
103 })),
104 }
105 }
106
107 pub fn is_timeout(&self) -> bool {
109 match &self.variant {
110 Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
111 *reason == ActivityCancelReason::TimedOut as i32
112 || details.as_ref().is_some_and(|d| d.is_timed_out)
113 }
114 _ => false,
115 }
116 }
117
118 pub fn primary_reason_to_cancellation_details(
119 reason: ActivityCancelReason,
120 ) -> ActivityCancellationDetails {
121 ActivityCancellationDetails {
122 is_not_found: reason == ActivityCancelReason::NotFound,
123 is_cancelled: reason == ActivityCancelReason::Cancelled,
124 is_paused: reason == ActivityCancelReason::Paused,
125 is_timed_out: reason == ActivityCancelReason::TimedOut,
126 is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
127 is_reset: reason == ActivityCancelReason::Reset,
128 }
129 }
130 }
131
132 impl Display for ActivityTaskCompletion {
133 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
134 write!(
135 f,
136 "ActivityTaskCompletion(token: {}",
137 format_task_token(&self.task_token),
138 )?;
139 if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
140 write!(f, ", {r}")?;
141 } else {
142 write!(f, ", missing result")?;
143 }
144 write!(f, ")")
145 }
146 }
147 }
148 #[allow(clippy::module_inception)]
149 pub mod activity_result {
150 tonic::include_proto!("coresdk.activity_result");
151 use super::super::temporal::api::{
152 common::v1::Payload,
153 failure::v1::{CanceledFailureInfo, Failure as APIFailure, failure},
154 };
155 use crate::protos::{
156 coresdk::activity_result::activity_resolution::Status,
157 temporal::api::enums::v1::TimeoutType,
158 };
159 use activity_execution_result as aer;
160 use anyhow::anyhow;
161 use std::fmt::{Display, Formatter};
162
163 impl ActivityExecutionResult {
164 pub const fn ok(result: Payload) -> Self {
165 Self {
166 status: Some(aer::Status::Completed(Success {
167 result: Some(result),
168 })),
169 }
170 }
171
172 pub fn fail(fail: APIFailure) -> Self {
173 Self {
174 status: Some(aer::Status::Failed(Failure {
175 failure: Some(fail),
176 })),
177 }
178 }
179
180 pub fn cancel_from_details(payload: Option<Payload>) -> Self {
181 Self {
182 status: Some(aer::Status::Cancelled(Cancellation::from_details(payload))),
183 }
184 }
185
186 pub const fn will_complete_async() -> Self {
187 Self {
188 status: Some(aer::Status::WillCompleteAsync(WillCompleteAsync {})),
189 }
190 }
191
192 pub fn is_cancelled(&self) -> bool {
193 matches!(self.status, Some(aer::Status::Cancelled(_)))
194 }
195 }
196
197 impl Display for aer::Status {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 write!(f, "ActivityExecutionResult(")?;
200 match self {
201 aer::Status::Completed(v) => {
202 write!(f, "{v})")
203 }
204 aer::Status::Failed(v) => {
205 write!(f, "{v})")
206 }
207 aer::Status::Cancelled(v) => {
208 write!(f, "{v})")
209 }
210 aer::Status::WillCompleteAsync(_) => {
211 write!(f, "Will complete async)")
212 }
213 }
214 }
215 }
216
217 impl Display for Success {
218 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
219 write!(f, "Success(")?;
220 if let Some(ref v) = self.result {
221 write!(f, "{v}")?;
222 }
223 write!(f, ")")
224 }
225 }
226
227 impl Display for Failure {
228 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229 write!(f, "Failure(")?;
230 if let Some(ref v) = self.failure {
231 write!(f, "{v}")?;
232 }
233 write!(f, ")")
234 }
235 }
236
237 impl Display for Cancellation {
238 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239 write!(f, "Cancellation(")?;
240 if let Some(ref v) = self.failure {
241 write!(f, "{v}")?;
242 }
243 write!(f, ")")
244 }
245 }
246
247 impl From<Result<Payload, APIFailure>> for ActivityExecutionResult {
248 fn from(r: Result<Payload, APIFailure>) -> Self {
249 Self {
250 status: match r {
251 Ok(p) => Some(aer::Status::Completed(Success { result: Some(p) })),
252 Err(f) => Some(aer::Status::Failed(Failure { failure: Some(f) })),
253 },
254 }
255 }
256 }
257
258 impl ActivityResolution {
259 pub fn success_payload_or_error(self) -> Result<Option<Payload>, anyhow::Error> {
262 let Some(status) = self.status else {
263 return Err(anyhow!("Activity completed without a status"));
264 };
265
266 match status {
267 activity_resolution::Status::Completed(success) => Ok(success.result),
268 e => Err(anyhow!("Activity was not successful: {e:?}")),
269 }
270 }
271
272 pub fn unwrap_ok_payload(self) -> Payload {
273 self.success_payload_or_error().unwrap().unwrap()
274 }
275
276 pub fn completed_ok(&self) -> bool {
277 matches!(self.status, Some(activity_resolution::Status::Completed(_)))
278 }
279
280 pub fn failed(&self) -> bool {
281 matches!(self.status, Some(activity_resolution::Status::Failed(_)))
282 }
283
284 pub fn timed_out(&self) -> Option<TimeoutType> {
285 match self.status {
286 Some(activity_resolution::Status::Failed(Failure {
287 failure: Some(ref f),
288 })) => f.is_timeout(),
289 _ => None,
290 }
291 }
292
293 pub fn cancelled(&self) -> bool {
294 matches!(self.status, Some(activity_resolution::Status::Cancelled(_)))
295 }
296
297 pub fn unwrap_failure(self) -> APIFailure {
300 match self.status.unwrap() {
301 Status::Failed(f) => f.failure.unwrap(),
302 Status::Cancelled(c) => c.failure.unwrap(),
303 _ => panic!("Actvity did not fail"),
304 }
305 }
306 }
307
308 impl Cancellation {
309 pub fn from_details(details: Option<Payload>) -> Self {
312 Cancellation {
313 failure: Some(APIFailure {
314 message: "Activity cancelled".to_string(),
315 failure_info: Some(failure::FailureInfo::CanceledFailureInfo(
316 CanceledFailureInfo {
317 details: details.map(Into::into),
318 },
319 )),
320 ..Default::default()
321 }),
322 }
323 }
324 }
325 }
326
327 pub mod common {
328 tonic::include_proto!("coresdk.common");
329 use super::external_data::LocalActivityMarkerData;
330 use crate::protos::{
331 PATCHED_MARKER_DETAILS_KEY,
332 coresdk::{
333 AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
334 external_data::PatchedMarkerData,
335 },
336 temporal::api::common::v1::{Payload, Payloads},
337 };
338 use std::collections::HashMap;
339
340 pub fn build_has_change_marker_details(
341 patch_id: impl Into<String>,
342 deprecated: bool,
343 ) -> anyhow::Result<HashMap<String, Payloads>> {
344 let mut hm = HashMap::new();
345 let encoded = PatchedMarkerData {
346 id: patch_id.into(),
347 deprecated,
348 }
349 .as_json_payload()?;
350 hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into());
351 Ok(hm)
352 }
353
354 pub fn decode_change_marker_details(
355 details: &HashMap<String, Payloads>,
356 ) -> Option<(String, bool)> {
357 if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) {
360 let decoded = PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?;
361 return Some((decoded.id, decoded.deprecated));
362 }
363
364 let id_entry = details.get("patch_id")?.payloads.first()?;
365 let deprecated_entry = details.get("deprecated")?.payloads.first()?;
366 let name = std::str::from_utf8(&id_entry.data).ok()?;
367 let deprecated = *deprecated_entry.data.first()? != 0;
368 Some((name.to_string(), deprecated))
369 }
370
371 pub fn build_local_activity_marker_details(
372 metadata: LocalActivityMarkerData,
373 result: Option<Payload>,
374 ) -> HashMap<String, Payloads> {
375 let mut hm = HashMap::new();
376 if let Some(jsonified) = metadata.as_json_payload().into_payloads() {
379 hm.insert("data".to_string(), jsonified);
380 }
381 if let Some(res) = result {
382 hm.insert("result".to_string(), res.into());
383 }
384 hm
385 }
386
387 pub fn extract_local_activity_marker_data(
390 details: &HashMap<String, Payloads>,
391 ) -> Option<LocalActivityMarkerData> {
392 details
393 .get("data")
394 .and_then(|p| p.payloads.first())
395 .and_then(|p| std::str::from_utf8(&p.data).ok())
396 .and_then(|s| serde_json::from_str(s).ok())
397 }
398
399 pub fn extract_local_activity_marker_details(
403 details: &mut HashMap<String, Payloads>,
404 ) -> (Option<LocalActivityMarkerData>, Option<Payload>) {
405 let data = extract_local_activity_marker_data(details);
406 let result = details.remove("result").and_then(|mut p| p.payloads.pop());
407 (data, result)
408 }
409 }
410
411 pub mod external_data {
412 use prost_types::{Duration, Timestamp};
413 use serde::{Deserialize, Deserializer, Serialize, Serializer};
414 tonic::include_proto!("coresdk.external_data");
415
416 #[derive(Serialize, Deserialize)]
420 #[serde(remote = "Timestamp")]
421 struct TimestampDef {
422 seconds: i64,
423 nanos: i32,
424 }
425 mod opt_timestamp {
426 use super::*;
427
428 pub(super) fn serialize<S>(
429 value: &Option<Timestamp>,
430 serializer: S,
431 ) -> Result<S::Ok, S::Error>
432 where
433 S: Serializer,
434 {
435 #[derive(Serialize)]
436 struct Helper<'a>(#[serde(with = "TimestampDef")] &'a Timestamp);
437
438 value.as_ref().map(Helper).serialize(serializer)
439 }
440
441 pub(super) fn deserialize<'de, D>(
442 deserializer: D,
443 ) -> Result<Option<Timestamp>, D::Error>
444 where
445 D: Deserializer<'de>,
446 {
447 #[derive(Deserialize)]
448 struct Helper(#[serde(with = "TimestampDef")] Timestamp);
449
450 let helper = Option::deserialize(deserializer)?;
451 Ok(helper.map(|Helper(external)| external))
452 }
453 }
454
455 #[derive(Serialize, Deserialize)]
457 #[serde(remote = "Duration")]
458 struct DurationDef {
459 seconds: i64,
460 nanos: i32,
461 }
462 mod opt_duration {
463 use super::*;
464
465 pub(super) fn serialize<S>(
466 value: &Option<Duration>,
467 serializer: S,
468 ) -> Result<S::Ok, S::Error>
469 where
470 S: Serializer,
471 {
472 #[derive(Serialize)]
473 struct Helper<'a>(#[serde(with = "DurationDef")] &'a Duration);
474
475 value.as_ref().map(Helper).serialize(serializer)
476 }
477
478 pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
479 where
480 D: Deserializer<'de>,
481 {
482 #[derive(Deserialize)]
483 struct Helper(#[serde(with = "DurationDef")] Duration);
484
485 let helper = Option::deserialize(deserializer)?;
486 Ok(helper.map(|Helper(external)| external))
487 }
488 }
489 }
490
491 pub mod workflow_activation {
492 use crate::protos::{
493 coresdk::{
494 FromPayloadsExt,
495 activity_result::{ActivityResolution, activity_resolution},
496 common::NamespacedWorkflowExecution,
497 fix_retry_policy,
498 workflow_activation::remove_from_cache::EvictionReason,
499 },
500 temporal::api::{
501 enums::v1::WorkflowTaskFailedCause,
502 history::v1::{
503 WorkflowExecutionCancelRequestedEventAttributes,
504 WorkflowExecutionSignaledEventAttributes,
505 WorkflowExecutionStartedEventAttributes,
506 },
507 query::v1::WorkflowQuery,
508 },
509 };
510 use prost_types::Timestamp;
511 use std::fmt::{Display, Formatter};
512
513 tonic::include_proto!("coresdk.workflow_activation");
514
515 pub fn create_evict_activation(
516 run_id: String,
517 message: String,
518 reason: EvictionReason,
519 ) -> WorkflowActivation {
520 WorkflowActivation {
521 timestamp: None,
522 run_id,
523 is_replaying: false,
524 history_length: 0,
525 jobs: vec![WorkflowActivationJob::from(
526 workflow_activation_job::Variant::RemoveFromCache(RemoveFromCache {
527 message,
528 reason: reason as i32,
529 }),
530 )],
531 available_internal_flags: vec![],
532 history_size_bytes: 0,
533 continue_as_new_suggested: false,
534 deployment_version_for_current_task: None,
535 last_sdk_version: String::new(),
536 suggest_continue_as_new_reasons: vec![],
537 target_worker_deployment_version_changed: false,
538 }
539 }
540
541 pub fn query_to_job(id: String, q: WorkflowQuery) -> QueryWorkflow {
542 QueryWorkflow {
543 query_id: id,
544 query_type: q.query_type,
545 arguments: Vec::from_payloads(q.query_args),
546 headers: q.header.map(|h| h.into()).unwrap_or_default(),
547 }
548 }
549
550 impl WorkflowActivation {
551 pub fn is_only_eviction(&self) -> bool {
553 matches!(
554 self.jobs.as_slice(),
555 [WorkflowActivationJob {
556 variant: Some(workflow_activation_job::Variant::RemoveFromCache(_))
557 }]
558 )
559 }
560
561 pub fn eviction_reason(&self) -> Option<EvictionReason> {
563 self.jobs.iter().find_map(|j| {
564 if let Some(workflow_activation_job::Variant::RemoveFromCache(ref rj)) =
565 j.variant
566 {
567 EvictionReason::try_from(rj.reason).ok()
568 } else {
569 None
570 }
571 })
572 }
573 }
574
575 impl workflow_activation_job::Variant {
576 pub fn is_local_activity_resolution(&self) -> bool {
577 matches!(self, workflow_activation_job::Variant::ResolveActivity(ra) if ra.is_local)
578 }
579 }
580
581 impl Display for EvictionReason {
582 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
583 write!(f, "{self:?}")
584 }
585 }
586
587 impl From<EvictionReason> for WorkflowTaskFailedCause {
588 fn from(value: EvictionReason) -> Self {
589 match value {
590 EvictionReason::Nondeterminism => {
591 WorkflowTaskFailedCause::NonDeterministicError
592 }
593 _ => WorkflowTaskFailedCause::Unspecified,
594 }
595 }
596 }
597
598 impl Display for WorkflowActivation {
599 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
600 write!(f, "WorkflowActivation(")?;
601 write!(f, "run_id: {}, ", self.run_id)?;
602 write!(f, "is_replaying: {}, ", self.is_replaying)?;
603 write!(
604 f,
605 "jobs: {})",
606 self.jobs
607 .iter()
608 .map(ToString::to_string)
609 .collect::<Vec<_>>()
610 .as_slice()
611 .join(", ")
612 )
613 }
614 }
615
616 impl Display for WorkflowActivationJob {
617 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
618 match &self.variant {
619 None => write!(f, "empty"),
620 Some(v) => write!(f, "{v}"),
621 }
622 }
623 }
624
625 impl Display for workflow_activation_job::Variant {
626 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
627 match self {
628 workflow_activation_job::Variant::InitializeWorkflow(_) => {
629 write!(f, "InitializeWorkflow")
630 }
631 workflow_activation_job::Variant::FireTimer(t) => {
632 write!(f, "FireTimer({})", t.seq)
633 }
634 workflow_activation_job::Variant::UpdateRandomSeed(_) => {
635 write!(f, "UpdateRandomSeed")
636 }
637 workflow_activation_job::Variant::QueryWorkflow(_) => {
638 write!(f, "QueryWorkflow")
639 }
640 workflow_activation_job::Variant::CancelWorkflow(_) => {
641 write!(f, "CancelWorkflow")
642 }
643 workflow_activation_job::Variant::SignalWorkflow(_) => {
644 write!(f, "SignalWorkflow")
645 }
646 workflow_activation_job::Variant::ResolveActivity(r) => {
647 write!(
648 f,
649 "ResolveActivity({}, {})",
650 r.seq,
651 r.result
652 .as_ref()
653 .unwrap_or(&ActivityResolution { status: None })
654 )
655 }
656 workflow_activation_job::Variant::NotifyHasPatch(_) => {
657 write!(f, "NotifyHasPatch")
658 }
659 workflow_activation_job::Variant::ResolveChildWorkflowExecutionStart(_) => {
660 write!(f, "ResolveChildWorkflowExecutionStart")
661 }
662 workflow_activation_job::Variant::ResolveChildWorkflowExecution(_) => {
663 write!(f, "ResolveChildWorkflowExecution")
664 }
665 workflow_activation_job::Variant::ResolveSignalExternalWorkflow(_) => {
666 write!(f, "ResolveSignalExternalWorkflow")
667 }
668 workflow_activation_job::Variant::RemoveFromCache(_) => {
669 write!(f, "RemoveFromCache")
670 }
671 workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(_) => {
672 write!(f, "ResolveRequestCancelExternalWorkflow")
673 }
674 workflow_activation_job::Variant::DoUpdate(u) => {
675 write!(f, "DoUpdate({})", u.id)
676 }
677 workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
678 write!(f, "ResolveNexusOperationStart")
679 }
680 workflow_activation_job::Variant::ResolveNexusOperation(_) => {
681 write!(f, "ResolveNexusOperation")
682 }
683 }
684 }
685 }
686
687 impl Display for ActivityResolution {
688 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
689 match self.status {
690 None => {
691 write!(f, "None")
692 }
693 Some(activity_resolution::Status::Failed(_)) => {
694 write!(f, "Failed")
695 }
696 Some(activity_resolution::Status::Completed(_)) => {
697 write!(f, "Completed")
698 }
699 Some(activity_resolution::Status::Cancelled(_)) => {
700 write!(f, "Cancelled")
701 }
702 Some(activity_resolution::Status::Backoff(_)) => {
703 write!(f, "Backoff")
704 }
705 }
706 }
707 }
708
709 impl Display for QueryWorkflow {
710 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
711 write!(
712 f,
713 "QueryWorkflow(id: {}, type: {})",
714 self.query_id, self.query_type
715 )
716 }
717 }
718
719 impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
720 fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
721 Self {
722 signal_name: a.signal_name,
723 input: Vec::from_payloads(a.input),
724 identity: a.identity,
725 headers: a.header.map(Into::into).unwrap_or_default(),
726 }
727 }
728 }
729
730 impl From<WorkflowExecutionCancelRequestedEventAttributes> for CancelWorkflow {
731 fn from(a: WorkflowExecutionCancelRequestedEventAttributes) -> Self {
732 Self { reason: a.cause }
733 }
734 }
735
736 pub fn start_workflow_from_attribs(
738 attrs: WorkflowExecutionStartedEventAttributes,
739 workflow_id: String,
740 randomness_seed: u64,
741 start_time: Timestamp,
742 ) -> InitializeWorkflow {
743 InitializeWorkflow {
744 workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(),
745 workflow_id,
746 arguments: Vec::from_payloads(attrs.input),
747 randomness_seed,
748 headers: attrs.header.unwrap_or_default().fields,
749 identity: attrs.identity,
750 parent_workflow_info: attrs.parent_workflow_execution.map(|pe| {
751 NamespacedWorkflowExecution {
752 namespace: attrs.parent_workflow_namespace,
753 run_id: pe.run_id,
754 workflow_id: pe.workflow_id,
755 }
756 }),
757 workflow_execution_timeout: attrs.workflow_execution_timeout,
758 workflow_run_timeout: attrs.workflow_run_timeout,
759 workflow_task_timeout: attrs.workflow_task_timeout,
760 continued_from_execution_run_id: attrs.continued_execution_run_id,
761 continued_initiator: attrs.initiator,
762 continued_failure: attrs.continued_failure,
763 last_completion_result: attrs.last_completion_result,
764 first_execution_run_id: attrs.first_execution_run_id,
765 retry_policy: attrs.retry_policy.map(fix_retry_policy),
766 attempt: attrs.attempt,
767 cron_schedule: attrs.cron_schedule,
768 workflow_execution_expiration_time: attrs.workflow_execution_expiration_time,
769 cron_schedule_to_schedule_interval: attrs.first_workflow_task_backoff,
770 memo: attrs.memo,
771 search_attributes: attrs.search_attributes,
772 start_time: Some(start_time),
773 root_workflow: attrs.root_workflow_execution,
774 priority: attrs.priority,
775 }
776 }
777 }
778
779 pub mod workflow_completion {
780 use crate::protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure};
781 tonic::include_proto!("coresdk.workflow_completion");
782
783 impl workflow_activation_completion::Status {
784 pub const fn is_success(&self) -> bool {
785 match &self {
786 Self::Successful(_) => true,
787 Self::Failed(_) => false,
788 }
789 }
790 }
791
792 impl From<failure::v1::Failure> for Failure {
793 fn from(f: failure::v1::Failure) -> Self {
794 Failure {
795 failure: Some(f),
796 force_cause: WorkflowTaskFailedCause::Unspecified as i32,
797 }
798 }
799 }
800 }
801
802 pub mod child_workflow {
803 tonic::include_proto!("coresdk.child_workflow");
804 }
805
806 pub mod nexus {
807 use crate::protos::temporal::api::workflowservice::v1::PollNexusTaskQueueResponse;
808 use std::fmt::{Display, Formatter};
809
810 tonic::include_proto!("coresdk.nexus");
811
812 impl NexusTask {
813 pub fn unwrap_task(self) -> PollNexusTaskQueueResponse {
815 if let Some(nexus_task::Variant::Task(t)) = self.variant {
816 return t;
817 }
818 panic!("Nexus task did not contain a server task");
819 }
820
821 pub fn task_token(&self) -> &[u8] {
823 match &self.variant {
824 Some(nexus_task::Variant::Task(t)) => t.task_token.as_slice(),
825 Some(nexus_task::Variant::CancelTask(c)) => c.task_token.as_slice(),
826 None => panic!("Nexus task did not contain a task token"),
827 }
828 }
829 }
830
831 impl Display for nexus_task_completion::Status {
832 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
833 write!(f, "NexusTaskCompletion(")?;
834 match self {
835 nexus_task_completion::Status::Completed(c) => {
836 write!(f, "{c}")
837 }
838 nexus_task_completion::Status::AckCancel(_) => {
839 write!(f, "AckCancel")
840 }
841 #[allow(deprecated)]
842 nexus_task_completion::Status::Error(error) => {
843 write!(f, "Error({error:?})")
844 }
845 nexus_task_completion::Status::Failure(failure) => {
846 write!(f, "{failure}")
847 }
848 }?;
849 write!(f, ")")
850 }
851 }
852
853 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
854 pub enum NexusOperationErrorState {
855 Failed,
856 Canceled,
857 }
858
859 impl Display for NexusOperationErrorState {
860 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
861 match self {
862 Self::Failed => write!(f, "failed"),
863 Self::Canceled => write!(f, "canceled"),
864 }
865 }
866 }
867 }
868
869 pub mod workflow_commands {
870 tonic::include_proto!("coresdk.workflow_commands");
871
872 use crate::protos::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType};
873 use std::fmt::{Display, Formatter};
874
875 impl Display for WorkflowCommand {
876 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
877 match &self.variant {
878 None => write!(f, "Empty"),
879 Some(v) => write!(f, "{v}"),
880 }
881 }
882 }
883
884 impl Display for StartTimer {
885 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
886 write!(f, "StartTimer({})", self.seq)
887 }
888 }
889
890 impl Display for ScheduleActivity {
891 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
892 write!(f, "ScheduleActivity({}, {})", self.seq, self.activity_type)
893 }
894 }
895
896 impl Display for ScheduleLocalActivity {
897 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898 write!(
899 f,
900 "ScheduleLocalActivity({}, {})",
901 self.seq, self.activity_type
902 )
903 }
904 }
905
906 impl Display for QueryResult {
907 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
908 write!(f, "RespondToQuery({})", self.query_id)
909 }
910 }
911
912 impl Display for RequestCancelActivity {
913 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
914 write!(f, "RequestCancelActivity({})", self.seq)
915 }
916 }
917
918 impl Display for RequestCancelLocalActivity {
919 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
920 write!(f, "RequestCancelLocalActivity({})", self.seq)
921 }
922 }
923
924 impl Display for CancelTimer {
925 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
926 write!(f, "CancelTimer({})", self.seq)
927 }
928 }
929
930 impl Display for CompleteWorkflowExecution {
931 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
932 write!(f, "CompleteWorkflowExecution")
933 }
934 }
935
936 impl Display for FailWorkflowExecution {
937 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
938 write!(f, "FailWorkflowExecution")
939 }
940 }
941
942 impl Display for ContinueAsNewWorkflowExecution {
943 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
944 write!(f, "ContinueAsNewWorkflowExecution")
945 }
946 }
947
948 impl Display for CancelWorkflowExecution {
949 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
950 write!(f, "CancelWorkflowExecution")
951 }
952 }
953
954 impl Display for SetPatchMarker {
955 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
956 write!(f, "SetPatchMarker({})", self.patch_id)
957 }
958 }
959
960 impl Display for StartChildWorkflowExecution {
961 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
962 write!(
963 f,
964 "StartChildWorkflowExecution({}, {})",
965 self.seq, self.workflow_type
966 )
967 }
968 }
969
970 impl Display for RequestCancelExternalWorkflowExecution {
971 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
972 write!(f, "RequestCancelExternalWorkflowExecution({})", self.seq)
973 }
974 }
975
976 impl Display for UpsertWorkflowSearchAttributes {
977 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
978 let keys: Vec<_> = self
979 .search_attributes
980 .as_ref()
981 .map(|sa| sa.indexed_fields.keys().collect())
982 .unwrap_or_default();
983 write!(f, "UpsertWorkflowSearchAttributes({:?})", keys)
984 }
985 }
986
987 impl Display for SignalExternalWorkflowExecution {
988 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
989 write!(f, "SignalExternalWorkflowExecution({})", self.seq)
990 }
991 }
992
993 impl Display for CancelSignalWorkflow {
994 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
995 write!(f, "CancelSignalWorkflow({})", self.seq)
996 }
997 }
998
999 impl Display for CancelChildWorkflowExecution {
1000 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1001 write!(
1002 f,
1003 "CancelChildWorkflowExecution({})",
1004 self.child_workflow_seq
1005 )
1006 }
1007 }
1008
1009 impl Display for ModifyWorkflowProperties {
1010 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1011 write!(
1012 f,
1013 "ModifyWorkflowProperties(upserted memo keys: {:?})",
1014 self.upserted_memo.as_ref().map(|m| m.fields.keys())
1015 )
1016 }
1017 }
1018
1019 impl Display for UpdateResponse {
1020 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1021 write!(
1022 f,
1023 "UpdateResponse(protocol_instance_id: {}, response: {:?})",
1024 self.protocol_instance_id, self.response
1025 )
1026 }
1027 }
1028
1029 impl Display for ScheduleNexusOperation {
1030 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1031 write!(f, "ScheduleNexusOperation({})", self.seq)
1032 }
1033 }
1034
1035 impl Display for RequestCancelNexusOperation {
1036 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1037 write!(f, "RequestCancelNexusOperation({})", self.seq)
1038 }
1039 }
1040
1041 impl QueryResult {
1042 pub fn into_components(self) -> (String, QueryResultType, Option<Payloads>, String) {
1044 match self {
1045 QueryResult {
1046 variant: Some(query_result::Variant::Succeeded(qs)),
1047 query_id,
1048 } => (
1049 query_id,
1050 QueryResultType::Answered,
1051 qs.response.map(Into::into),
1052 "".to_string(),
1053 ),
1054 QueryResult {
1055 variant: Some(query_result::Variant::Failed(err)),
1056 query_id,
1057 } => (query_id, QueryResultType::Failed, None, err.message),
1058 QueryResult {
1059 variant: None,
1060 query_id,
1061 } => (
1062 query_id,
1063 QueryResultType::Failed,
1064 None,
1065 "Query response was empty".to_string(),
1066 ),
1067 }
1068 }
1069 }
1070 }
1071
1072 pub type HistoryEventId = i64;
1073
1074 impl From<workflow_activation_job::Variant> for WorkflowActivationJob {
1075 fn from(a: workflow_activation_job::Variant) -> Self {
1076 Self { variant: Some(a) }
1077 }
1078 }
1079
1080 impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
1081 fn from(v: Vec<WorkflowCommand>) -> Self {
1082 Self {
1083 commands: v,
1084 used_internal_flags: vec![],
1085 versioning_behavior: VersioningBehavior::Unspecified.into(),
1086 }
1087 }
1088 }
1089
1090 impl From<workflow_command::Variant> for WorkflowCommand {
1091 fn from(v: workflow_command::Variant) -> Self {
1092 Self {
1093 variant: Some(v),
1094 user_metadata: None,
1095 }
1096 }
1097 }
1098
1099 impl workflow_completion::Success {
1100 pub fn from_variants(cmds: Vec<Variant>) -> Self {
1101 let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect();
1102 cmds.into()
1103 }
1104 }
1105
1106 impl WorkflowActivationCompletion {
1107 pub fn empty(run_id: impl Into<String>) -> Self {
1109 let success = workflow_completion::Success::from_variants(vec![]);
1110 Self {
1111 run_id: run_id.into(),
1112 status: Some(workflow_activation_completion::Status::Successful(success)),
1113 }
1114 }
1115
1116 pub fn from_cmds(run_id: impl Into<String>, cmds: Vec<workflow_command::Variant>) -> Self {
1118 let success = workflow_completion::Success::from_variants(cmds);
1119 Self {
1120 run_id: run_id.into(),
1121 status: Some(workflow_activation_completion::Status::Successful(success)),
1122 }
1123 }
1124
1125 pub fn from_cmd(run_id: impl Into<String>, cmd: workflow_command::Variant) -> Self {
1127 let success = workflow_completion::Success::from_variants(vec![cmd]);
1128 Self {
1129 run_id: run_id.into(),
1130 status: Some(workflow_activation_completion::Status::Successful(success)),
1131 }
1132 }
1133
1134 pub fn fail(
1135 run_id: impl Into<String>,
1136 failure: Failure,
1137 cause: Option<WorkflowTaskFailedCause>,
1138 ) -> Self {
1139 Self {
1140 run_id: run_id.into(),
1141 status: Some(workflow_activation_completion::Status::Failed(
1142 workflow_completion::Failure {
1143 failure: Some(failure),
1144 force_cause: cause.unwrap_or(WorkflowTaskFailedCause::Unspecified) as i32,
1145 },
1146 )),
1147 }
1148 }
1149
1150 pub fn has_execution_ending(&self) -> bool {
1153 self.has_complete_workflow_execution()
1154 || self.has_fail_execution()
1155 || self.has_continue_as_new()
1156 || self.has_cancel_workflow_execution()
1157 }
1158
1159 pub fn has_fail_execution(&self) -> bool {
1161 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1162 return s.commands.iter().any(|wfc| {
1163 matches!(
1164 wfc,
1165 WorkflowCommand {
1166 variant: Some(workflow_command::Variant::FailWorkflowExecution(_)),
1167 ..
1168 }
1169 )
1170 });
1171 }
1172 false
1173 }
1174
1175 pub fn has_cancel_workflow_execution(&self) -> bool {
1177 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1178 return s.commands.iter().any(|wfc| {
1179 matches!(
1180 wfc,
1181 WorkflowCommand {
1182 variant: Some(workflow_command::Variant::CancelWorkflowExecution(_)),
1183 ..
1184 }
1185 )
1186 });
1187 }
1188 false
1189 }
1190
1191 pub fn has_continue_as_new(&self) -> bool {
1193 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1194 return s.commands.iter().any(|wfc| {
1195 matches!(
1196 wfc,
1197 WorkflowCommand {
1198 variant: Some(
1199 workflow_command::Variant::ContinueAsNewWorkflowExecution(_)
1200 ),
1201 ..
1202 }
1203 )
1204 });
1205 }
1206 false
1207 }
1208
1209 pub fn has_complete_workflow_execution(&self) -> bool {
1211 self.complete_workflow_execution_value().is_some()
1212 }
1213
1214 pub fn complete_workflow_execution_value(&self) -> Option<&Payload> {
1216 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1217 s.commands.iter().find_map(|wfc| match wfc {
1218 WorkflowCommand {
1219 variant: Some(workflow_command::Variant::CompleteWorkflowExecution(v)),
1220 ..
1221 } => v.result.as_ref(),
1222 _ => None,
1223 })
1224 } else {
1225 None
1226 }
1227 }
1228
1229 pub fn is_empty(&self) -> bool {
1231 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1232 return s.commands.is_empty();
1233 }
1234 false
1235 }
1236
1237 pub fn add_internal_flags(&mut self, patch: u32) {
1238 if let Some(workflow_activation_completion::Status::Successful(s)) = &mut self.status {
1239 s.used_internal_flags.push(patch);
1240 }
1241 }
1242 }
1243
1244 pub trait IntoCompletion {
1246 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion;
1248 }
1249
1250 impl IntoCompletion for workflow_command::Variant {
1251 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1252 WorkflowActivationCompletion::from_cmd(run_id, self)
1253 }
1254 }
1255
1256 impl<I, V> IntoCompletion for I
1257 where
1258 I: IntoIterator<Item = V>,
1259 V: Into<WorkflowCommand>,
1260 {
1261 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1262 let success = self.into_iter().map(Into::into).collect::<Vec<_>>().into();
1263 WorkflowActivationCompletion {
1264 run_id,
1265 status: Some(workflow_activation_completion::Status::Successful(success)),
1266 }
1267 }
1268 }
1269
1270 impl Display for WorkflowActivationCompletion {
1271 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1272 write!(
1273 f,
1274 "WorkflowActivationCompletion(run_id: {}, status: ",
1275 &self.run_id
1276 )?;
1277 match &self.status {
1278 None => write!(f, "empty")?,
1279 Some(s) => write!(f, "{s}")?,
1280 };
1281 write!(f, ")")
1282 }
1283 }
1284
1285 impl Display for workflow_activation_completion::Status {
1286 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1287 match self {
1288 workflow_activation_completion::Status::Successful(
1289 workflow_completion::Success { commands, .. },
1290 ) => {
1291 write!(f, "Success(")?;
1292 let mut written = 0;
1293 for c in commands {
1294 write!(f, "{c} ")?;
1295 written += 1;
1296 if written >= 10 && written < commands.len() {
1297 write!(f, "... {} more", commands.len() - written)?;
1298 break;
1299 }
1300 }
1301 write!(f, ")")
1302 }
1303 workflow_activation_completion::Status::Failed(_) => {
1304 write!(f, "Failed")
1305 }
1306 }
1307 }
1308 }
1309
1310 impl ActivityTask {
1311 pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse) -> Self {
1312 let (workflow_id, run_id) = r
1313 .workflow_execution
1314 .map(|we| (we.workflow_id, we.run_id))
1315 .unwrap_or_default();
1316 Self {
1317 task_token: r.task_token,
1318 variant: Some(activity_task::activity_task::Variant::Start(
1319 activity_task::Start {
1320 workflow_namespace: r.workflow_namespace,
1321 workflow_type: r.workflow_type.map_or_else(|| "".to_string(), |wt| wt.name),
1322 workflow_execution: Some(WorkflowExecution {
1323 workflow_id,
1324 run_id,
1325 }),
1326 activity_id: r.activity_id,
1327 activity_type: r.activity_type.map_or_else(|| "".to_string(), |at| at.name),
1328 header_fields: r.header.map(Into::into).unwrap_or_default(),
1329 input: Vec::from_payloads(r.input),
1330 heartbeat_details: Vec::from_payloads(r.heartbeat_details),
1331 scheduled_time: r.scheduled_time,
1332 current_attempt_scheduled_time: r.current_attempt_scheduled_time,
1333 started_time: r.started_time,
1334 attempt: r.attempt as u32,
1335 schedule_to_close_timeout: r.schedule_to_close_timeout,
1336 start_to_close_timeout: r.start_to_close_timeout,
1337 heartbeat_timeout: r.heartbeat_timeout,
1338 retry_policy: r.retry_policy.map(fix_retry_policy),
1339 priority: r.priority,
1340 is_local: false,
1341 },
1342 )),
1343 }
1344 }
1345 }
1346
1347 impl Failure {
1348 pub fn is_timeout(&self) -> Option<crate::protos::temporal::api::enums::v1::TimeoutType> {
1349 match &self.failure_info {
1350 Some(FailureInfo::TimeoutFailureInfo(ti)) => Some(ti.timeout_type()),
1351 _ => {
1352 if let Some(c) = &self.cause {
1353 c.is_timeout()
1354 } else {
1355 None
1356 }
1357 }
1358 }
1359 }
1360
1361 pub fn application_failure(message: String, non_retryable: bool) -> Self {
1362 Self {
1363 message,
1364 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1365 ApplicationFailureInfo {
1366 non_retryable,
1367 ..Default::default()
1368 },
1369 )),
1370 ..Default::default()
1371 }
1372 }
1373
1374 pub fn application_failure_from_error(ae: anyhow::Error, non_retryable: bool) -> Self {
1375 Self {
1376 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1377 ApplicationFailureInfo {
1378 non_retryable,
1379 ..Default::default()
1380 },
1381 )),
1382 ..ae.chain()
1383 .rfold(None, |cause, e| {
1384 Some(Self {
1385 message: e.to_string(),
1386 cause: cause.map(Box::new),
1387 ..Default::default()
1388 })
1389 })
1390 .unwrap_or_default()
1391 }
1392 }
1393
1394 pub fn timeout(timeout_type: TimeoutType) -> Self {
1395 Self {
1396 message: "Activity timed out".to_string(),
1397 cause: Some(Box::new(Failure {
1398 message: "Activity timed out".to_string(),
1399 failure_info: Some(FailureInfo::TimeoutFailureInfo(TimeoutFailureInfo {
1400 timeout_type: timeout_type.into(),
1401 ..Default::default()
1402 })),
1403 ..Default::default()
1404 })),
1405 failure_info: Some(FailureInfo::ActivityFailureInfo(
1406 ActivityFailureInfo::default(),
1407 )),
1408 ..Default::default()
1409 }
1410 }
1411
1412 pub fn maybe_application_failure(&self) -> Option<&ApplicationFailureInfo> {
1414 if let Failure {
1415 failure_info: Some(FailureInfo::ApplicationFailureInfo(f)),
1416 ..
1417 } = self
1418 {
1419 Some(f)
1420 } else {
1421 None
1422 }
1423 }
1424
1425 pub fn is_benign_application_failure(&self) -> bool {
1427 self.maybe_application_failure()
1428 .is_some_and(|app_info| app_info.category() == ApplicationErrorCategory::Benign)
1429 }
1430 }
1431
1432 impl Display for Failure {
1433 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1434 write!(f, "Failure({}, ", self.message)?;
1435 match self.failure_info.as_ref() {
1436 None => write!(f, "missing info")?,
1437 Some(FailureInfo::TimeoutFailureInfo(v)) => {
1438 write!(f, "Timeout: {:?}", v.timeout_type())?;
1439 }
1440 Some(FailureInfo::ApplicationFailureInfo(v)) => {
1441 write!(f, "Application Failure: {}", v.r#type)?;
1442 }
1443 Some(FailureInfo::CanceledFailureInfo(_)) => {
1444 write!(f, "Cancelled")?;
1445 }
1446 Some(FailureInfo::TerminatedFailureInfo(_)) => {
1447 write!(f, "Terminated")?;
1448 }
1449 Some(FailureInfo::ServerFailureInfo(_)) => {
1450 write!(f, "Server Failure")?;
1451 }
1452 Some(FailureInfo::ResetWorkflowFailureInfo(_)) => {
1453 write!(f, "Reset Workflow")?;
1454 }
1455 Some(FailureInfo::ActivityFailureInfo(v)) => {
1456 write!(
1457 f,
1458 "Activity Failure: scheduled_event_id: {}",
1459 v.scheduled_event_id
1460 )?;
1461 }
1462 Some(FailureInfo::ChildWorkflowExecutionFailureInfo(v)) => {
1463 write!(
1464 f,
1465 "Child Workflow: started_event_id: {}",
1466 v.started_event_id
1467 )?;
1468 }
1469 Some(FailureInfo::NexusOperationExecutionFailureInfo(v)) => {
1470 write!(
1471 f,
1472 "Nexus Operation Failure: scheduled_event_id: {}",
1473 v.scheduled_event_id
1474 )?;
1475 }
1476 Some(FailureInfo::NexusHandlerFailureInfo(v)) => {
1477 write!(f, "Nexus Handler Failure: {}", v.r#type)?;
1478 }
1479 }
1480 write!(f, ")")
1481 }
1482 }
1483
1484 impl From<&str> for Failure {
1485 fn from(v: &str) -> Self {
1486 Failure::application_failure(v.to_string(), false)
1487 }
1488 }
1489
1490 impl From<String> for Failure {
1491 fn from(v: String) -> Self {
1492 Failure::application_failure(v, false)
1493 }
1494 }
1495
1496 impl From<anyhow::Error> for Failure {
1497 fn from(ae: anyhow::Error) -> Self {
1498 Failure::application_failure_from_error(ae, false)
1499 }
1500 }
1501
1502 pub trait FromPayloadsExt {
1503 fn from_payloads(p: Option<Payloads>) -> Self;
1504 }
1505 impl<T> FromPayloadsExt for T
1506 where
1507 T: FromIterator<Payload>,
1508 {
1509 fn from_payloads(p: Option<Payloads>) -> Self {
1510 match p {
1511 None => std::iter::empty().collect(),
1512 Some(p) => p.payloads.into_iter().collect(),
1513 }
1514 }
1515 }
1516
1517 pub trait IntoPayloadsExt {
1518 fn into_payloads(self) -> Option<Payloads>;
1519 }
1520 impl<T> IntoPayloadsExt for T
1521 where
1522 T: IntoIterator<Item = Payload>,
1523 {
1524 fn into_payloads(self) -> Option<Payloads> {
1525 let mut iterd = self.into_iter().peekable();
1526 if iterd.peek().is_none() {
1527 None
1528 } else {
1529 Some(Payloads {
1530 payloads: iterd.collect(),
1531 })
1532 }
1533 }
1534 }
1535
1536 impl From<Payload> for Payloads {
1537 fn from(p: Payload) -> Self {
1538 Self { payloads: vec![p] }
1539 }
1540 }
1541
1542 impl<T> From<T> for Payloads
1543 where
1544 T: AsRef<[u8]>,
1545 {
1546 fn from(v: T) -> Self {
1547 Self {
1548 payloads: vec![v.into()],
1549 }
1550 }
1551 }
1552
1553 #[derive(thiserror::Error, Debug)]
1554 pub enum PayloadDeserializeErr {
1555 #[error("This deserializer does not understand this payload")]
1558 DeserializerDoesNotHandle,
1559 #[error("Error during deserialization: {0}")]
1560 DeserializeErr(#[from] anyhow::Error),
1561 }
1562
1563 pub trait AsJsonPayloadExt {
1566 fn as_json_payload(&self) -> anyhow::Result<Payload>;
1567 }
1568 impl<T> AsJsonPayloadExt for T
1569 where
1570 T: Serialize,
1571 {
1572 fn as_json_payload(&self) -> anyhow::Result<Payload> {
1573 let as_json = serde_json::to_string(self)?;
1574 let mut metadata = HashMap::new();
1575 metadata.insert(
1576 ENCODING_PAYLOAD_KEY.to_string(),
1577 JSON_ENCODING_VAL.as_bytes().to_vec(),
1578 );
1579 Ok(Payload {
1580 metadata,
1581 data: as_json.into_bytes(),
1582 external_payloads: Default::default(),
1583 })
1584 }
1585 }
1586
1587 pub trait FromJsonPayloadExt: Sized {
1588 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr>;
1589 }
1590 impl<T> FromJsonPayloadExt for T
1591 where
1592 T: for<'de> Deserialize<'de>,
1593 {
1594 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr> {
1595 if !payload.is_json_payload() {
1596 return Err(PayloadDeserializeErr::DeserializerDoesNotHandle);
1597 }
1598 let payload_str = std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?;
1599 Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?)
1600 }
1601 }
1602
1603 #[derive(derive_more::Display, Debug)]
1605 pub enum PayloadsToPayloadError {
1606 MoreThanOnePayload,
1607 NoPayload,
1608 }
1609 impl TryFrom<Payloads> for Payload {
1610 type Error = PayloadsToPayloadError;
1611
1612 fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
1613 match v.payloads.pop() {
1614 None => Err(PayloadsToPayloadError::NoPayload),
1615 Some(p) => {
1616 if v.payloads.is_empty() {
1617 Ok(p)
1618 } else {
1619 Err(PayloadsToPayloadError::MoreThanOnePayload)
1620 }
1621 }
1622 }
1623 }
1624 }
1625
1626 fn fix_retry_policy(mut retry_policy: RetryPolicy) -> RetryPolicy {
1629 if retry_policy.initial_interval.is_none() {
1630 retry_policy.initial_interval = Default::default();
1631 }
1632 retry_policy
1633 }
1634}
1635
1636#[allow(
1638 clippy::all,
1639 missing_docs,
1640 rustdoc::broken_intra_doc_links,
1641 rustdoc::bare_urls
1642)]
1643pub mod temporal {
1645 pub mod api {
1646 pub mod activity {
1647 pub mod v1 {
1648 tonic::include_proto!("temporal.api.activity.v1");
1649 }
1650 }
1651 pub mod batch {
1652 pub mod v1 {
1653 tonic::include_proto!("temporal.api.batch.v1");
1654 }
1655 }
1656 pub mod command {
1657 pub mod v1 {
1658 tonic::include_proto!("temporal.api.command.v1");
1659
1660 use crate::protos::{
1661 coresdk::{IntoPayloadsExt, workflow_commands},
1662 temporal::api::{
1663 common::v1::{ActivityType, WorkflowType},
1664 enums::v1::CommandType,
1665 },
1666 };
1667 use command::Attributes;
1668 use std::fmt::{Display, Formatter};
1669
1670 impl From<command::Attributes> for Command {
1671 fn from(c: command::Attributes) -> Self {
1672 match c {
1673 a @ Attributes::StartTimerCommandAttributes(_) => Self {
1674 command_type: CommandType::StartTimer as i32,
1675 attributes: Some(a),
1676 user_metadata: Default::default(),
1677 },
1678 a @ Attributes::CancelTimerCommandAttributes(_) => Self {
1679 command_type: CommandType::CancelTimer as i32,
1680 attributes: Some(a),
1681 user_metadata: Default::default(),
1682 },
1683 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
1684 command_type: CommandType::CompleteWorkflowExecution as i32,
1685 attributes: Some(a),
1686 user_metadata: Default::default(),
1687 },
1688 a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
1689 command_type: CommandType::FailWorkflowExecution as i32,
1690 attributes: Some(a),
1691 user_metadata: Default::default(),
1692 },
1693 a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
1694 command_type: CommandType::ScheduleActivityTask as i32,
1695 attributes: Some(a),
1696 user_metadata: Default::default(),
1697 },
1698 a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self {
1699 command_type: CommandType::RequestCancelActivityTask as i32,
1700 attributes: Some(a),
1701 user_metadata: Default::default(),
1702 },
1703 a @ Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1704 Self {
1705 command_type: CommandType::ContinueAsNewWorkflowExecution
1706 as i32,
1707 attributes: Some(a),
1708 user_metadata: Default::default(),
1709 }
1710 }
1711 a @ Attributes::CancelWorkflowExecutionCommandAttributes(_) => Self {
1712 command_type: CommandType::CancelWorkflowExecution as i32,
1713 attributes: Some(a),
1714 user_metadata: Default::default(),
1715 },
1716 a @ Attributes::RecordMarkerCommandAttributes(_) => Self {
1717 command_type: CommandType::RecordMarker as i32,
1718 attributes: Some(a),
1719 user_metadata: Default::default(),
1720 },
1721 a @ Attributes::ProtocolMessageCommandAttributes(_) => Self {
1722 command_type: CommandType::ProtocolMessage as i32,
1723 attributes: Some(a),
1724 user_metadata: Default::default(),
1725 },
1726 a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1727 Self {
1728 command_type: CommandType::RequestCancelNexusOperation as i32,
1729 attributes: Some(a),
1730 user_metadata: Default::default(),
1731 }
1732 }
1733 _ => unimplemented!(),
1734 }
1735 }
1736 }
1737
1738 impl Display for Command {
1739 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1740 let ct = CommandType::try_from(self.command_type)
1741 .unwrap_or(CommandType::Unspecified);
1742 write!(f, "{:?}", ct)
1743 }
1744 }
1745
1746 pub trait CommandAttributesExt {
1747 fn as_type(&self) -> CommandType;
1748 }
1749
1750 impl CommandAttributesExt for command::Attributes {
1751 fn as_type(&self) -> CommandType {
1752 match self {
1753 Attributes::ScheduleActivityTaskCommandAttributes(_) => {
1754 CommandType::ScheduleActivityTask
1755 }
1756 Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer,
1757 Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
1758 CommandType::CompleteWorkflowExecution
1759 }
1760 Attributes::FailWorkflowExecutionCommandAttributes(_) => {
1761 CommandType::FailWorkflowExecution
1762 }
1763 Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
1764 CommandType::RequestCancelActivityTask
1765 }
1766 Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer,
1767 Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
1768 CommandType::CancelWorkflowExecution
1769 }
1770 Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes(
1771 _,
1772 ) => CommandType::RequestCancelExternalWorkflowExecution,
1773 Attributes::RecordMarkerCommandAttributes(_) => {
1774 CommandType::RecordMarker
1775 }
1776 Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1777 CommandType::ContinueAsNewWorkflowExecution
1778 }
1779 Attributes::StartChildWorkflowExecutionCommandAttributes(_) => {
1780 CommandType::StartChildWorkflowExecution
1781 }
1782 Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => {
1783 CommandType::SignalExternalWorkflowExecution
1784 }
1785 Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => {
1786 CommandType::UpsertWorkflowSearchAttributes
1787 }
1788 Attributes::ProtocolMessageCommandAttributes(_) => {
1789 CommandType::ProtocolMessage
1790 }
1791 Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => {
1792 CommandType::ModifyWorkflowProperties
1793 }
1794 Attributes::ScheduleNexusOperationCommandAttributes(_) => {
1795 CommandType::ScheduleNexusOperation
1796 }
1797 Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1798 CommandType::RequestCancelNexusOperation
1799 }
1800 }
1801 }
1802 }
1803
1804 impl Display for command::Attributes {
1805 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1806 write!(f, "{:?}", self.as_type())
1807 }
1808 }
1809
1810 impl From<workflow_commands::StartTimer> for command::Attributes {
1811 fn from(s: workflow_commands::StartTimer) -> Self {
1812 Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
1813 timer_id: s.seq.to_string(),
1814 start_to_fire_timeout: s.start_to_fire_timeout,
1815 })
1816 }
1817 }
1818
1819 impl From<workflow_commands::UpsertWorkflowSearchAttributes> for command::Attributes {
1820 fn from(s: workflow_commands::UpsertWorkflowSearchAttributes) -> Self {
1821 Self::UpsertWorkflowSearchAttributesCommandAttributes(
1822 UpsertWorkflowSearchAttributesCommandAttributes {
1823 search_attributes: s.search_attributes,
1824 },
1825 )
1826 }
1827 }
1828
1829 impl From<workflow_commands::ModifyWorkflowProperties> for command::Attributes {
1830 fn from(s: workflow_commands::ModifyWorkflowProperties) -> Self {
1831 Self::ModifyWorkflowPropertiesCommandAttributes(
1832 ModifyWorkflowPropertiesCommandAttributes {
1833 upserted_memo: s.upserted_memo.map(Into::into),
1834 },
1835 )
1836 }
1837 }
1838
1839 impl From<workflow_commands::CancelTimer> for command::Attributes {
1840 fn from(s: workflow_commands::CancelTimer) -> Self {
1841 Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
1842 timer_id: s.seq.to_string(),
1843 })
1844 }
1845 }
1846
1847 pub fn schedule_activity_cmd_to_api(
1848 s: workflow_commands::ScheduleActivity,
1849 use_workflow_build_id: bool,
1850 ) -> command::Attributes {
1851 command::Attributes::ScheduleActivityTaskCommandAttributes(
1852 ScheduleActivityTaskCommandAttributes {
1853 activity_id: s.activity_id,
1854 activity_type: Some(ActivityType {
1855 name: s.activity_type,
1856 }),
1857 task_queue: Some(s.task_queue.into()),
1858 header: Some(s.headers.into()),
1859 input: s.arguments.into_payloads(),
1860 schedule_to_close_timeout: s.schedule_to_close_timeout,
1861 schedule_to_start_timeout: s.schedule_to_start_timeout,
1862 start_to_close_timeout: s.start_to_close_timeout,
1863 heartbeat_timeout: s.heartbeat_timeout,
1864 retry_policy: s.retry_policy.map(Into::into),
1865 request_eager_execution: !s.do_not_eagerly_execute,
1866 use_workflow_build_id,
1867 priority: s.priority,
1868 },
1869 )
1870 }
1871
1872 #[allow(deprecated)]
1873 pub fn start_child_workflow_cmd_to_api(
1874 s: workflow_commands::StartChildWorkflowExecution,
1875 inherit_build_id: bool,
1876 ) -> command::Attributes {
1877 command::Attributes::StartChildWorkflowExecutionCommandAttributes(
1878 StartChildWorkflowExecutionCommandAttributes {
1879 workflow_id: s.workflow_id,
1880 workflow_type: Some(WorkflowType {
1881 name: s.workflow_type,
1882 }),
1883 control: "".into(),
1884 namespace: s.namespace,
1885 task_queue: Some(s.task_queue.into()),
1886 header: Some(s.headers.into()),
1887 memo: Some(s.memo.into()),
1888 search_attributes: s.search_attributes,
1889 input: s.input.into_payloads(),
1890 workflow_id_reuse_policy: s.workflow_id_reuse_policy,
1891 workflow_execution_timeout: s.workflow_execution_timeout,
1892 workflow_run_timeout: s.workflow_run_timeout,
1893 workflow_task_timeout: s.workflow_task_timeout,
1894 retry_policy: s.retry_policy.map(Into::into),
1895 cron_schedule: s.cron_schedule.clone(),
1896 parent_close_policy: s.parent_close_policy,
1897 inherit_build_id,
1898 priority: s.priority,
1899 },
1900 )
1901 }
1902
1903 impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
1904 fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
1905 Self::CompleteWorkflowExecutionCommandAttributes(
1906 CompleteWorkflowExecutionCommandAttributes {
1907 result: c.result.map(Into::into),
1908 },
1909 )
1910 }
1911 }
1912
1913 impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
1914 fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
1915 Self::FailWorkflowExecutionCommandAttributes(
1916 FailWorkflowExecutionCommandAttributes {
1917 failure: c.failure.map(Into::into),
1918 },
1919 )
1920 }
1921 }
1922
1923 #[allow(deprecated)]
1924 pub fn continue_as_new_cmd_to_api(
1925 c: workflow_commands::ContinueAsNewWorkflowExecution,
1926 inherit_build_id: bool,
1927 ) -> command::Attributes {
1928 command::Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1929 ContinueAsNewWorkflowExecutionCommandAttributes {
1930 workflow_type: Some(c.workflow_type.into()),
1931 task_queue: Some(c.task_queue.into()),
1932 input: c.arguments.into_payloads(),
1933 workflow_run_timeout: c.workflow_run_timeout,
1934 workflow_task_timeout: c.workflow_task_timeout,
1935 memo: if c.memo.is_empty() {
1936 None
1937 } else {
1938 Some(c.memo.into())
1939 },
1940 header: if c.headers.is_empty() {
1941 None
1942 } else {
1943 Some(c.headers.into())
1944 },
1945 retry_policy: c.retry_policy,
1946 search_attributes: c.search_attributes,
1947 inherit_build_id,
1948 initial_versioning_behavior: c.initial_versioning_behavior,
1949 ..Default::default()
1950 },
1951 )
1952 }
1953
1954 impl From<workflow_commands::CancelWorkflowExecution> for command::Attributes {
1955 fn from(_c: workflow_commands::CancelWorkflowExecution) -> Self {
1956 Self::CancelWorkflowExecutionCommandAttributes(
1957 CancelWorkflowExecutionCommandAttributes { details: None },
1958 )
1959 }
1960 }
1961
1962 impl From<workflow_commands::ScheduleNexusOperation> for command::Attributes {
1963 fn from(c: workflow_commands::ScheduleNexusOperation) -> Self {
1964 Self::ScheduleNexusOperationCommandAttributes(
1965 ScheduleNexusOperationCommandAttributes {
1966 endpoint: c.endpoint,
1967 service: c.service,
1968 operation: c.operation,
1969 input: c.input,
1970 schedule_to_close_timeout: c.schedule_to_close_timeout,
1971 schedule_to_start_timeout: c.schedule_to_start_timeout,
1972 start_to_close_timeout: c.start_to_close_timeout,
1973 nexus_header: c.nexus_header,
1974 },
1975 )
1976 }
1977 }
1978 }
1979 }
1980 #[allow(rustdoc::invalid_html_tags)]
1981 pub mod cloud {
1982 pub mod account {
1983 pub mod v1 {
1984 tonic::include_proto!("temporal.api.cloud.account.v1");
1985 }
1986 }
1987 pub mod cloudservice {
1988 pub mod v1 {
1989 tonic::include_proto!("temporal.api.cloud.cloudservice.v1");
1990 }
1991 }
1992 pub mod connectivityrule {
1993 pub mod v1 {
1994 tonic::include_proto!("temporal.api.cloud.connectivityrule.v1");
1995 }
1996 }
1997 pub mod identity {
1998 pub mod v1 {
1999 tonic::include_proto!("temporal.api.cloud.identity.v1");
2000 }
2001 }
2002 pub mod namespace {
2003 pub mod v1 {
2004 tonic::include_proto!("temporal.api.cloud.namespace.v1");
2005 }
2006 }
2007 pub mod nexus {
2008 pub mod v1 {
2009 tonic::include_proto!("temporal.api.cloud.nexus.v1");
2010 }
2011 }
2012 pub mod operation {
2013 pub mod v1 {
2014 tonic::include_proto!("temporal.api.cloud.operation.v1");
2015 }
2016 }
2017 pub mod region {
2018 pub mod v1 {
2019 tonic::include_proto!("temporal.api.cloud.region.v1");
2020 }
2021 }
2022 pub mod resource {
2023 pub mod v1 {
2024 tonic::include_proto!("temporal.api.cloud.resource.v1");
2025 }
2026 }
2027 pub mod sink {
2028 pub mod v1 {
2029 tonic::include_proto!("temporal.api.cloud.sink.v1");
2030 }
2031 }
2032 pub mod usage {
2033 pub mod v1 {
2034 tonic::include_proto!("temporal.api.cloud.usage.v1");
2035 }
2036 }
2037 }
2038 pub mod common {
2039 pub mod v1 {
2040 use crate::protos::{ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL};
2041 use base64::{Engine, prelude::BASE64_STANDARD};
2042 use std::{
2043 collections::HashMap,
2044 fmt::{Display, Formatter},
2045 };
2046 include_proto_with_serde!("temporal.api.common.v1");
2047
2048 impl<T> From<T> for Payload
2049 where
2050 T: AsRef<[u8]>,
2051 {
2052 fn from(v: T) -> Self {
2053 let mut metadata = HashMap::new();
2056 metadata.insert(ENCODING_PAYLOAD_KEY.to_string(), b"binary/plain".to_vec());
2057 Self {
2058 metadata,
2059 data: v.as_ref().to_vec(),
2060 external_payloads: Default::default(),
2061 }
2062 }
2063 }
2064
2065 impl Payload {
2066 pub fn as_slice(&self) -> &[u8] {
2068 self.data.as_slice()
2069 }
2070
2071 pub fn is_json_payload(&self) -> bool {
2072 self.metadata
2073 .get(ENCODING_PAYLOAD_KEY)
2074 .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes())
2075 .unwrap_or_default()
2076 }
2077 }
2078
2079 impl std::fmt::Debug for Payload {
2080 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2081 if std::env::var("TEMPORAL_PRINT_FULL_PAYLOADS").is_err()
2082 && self.data.len() > 64
2083 {
2084 let mut windows = self.data.as_slice().windows(32);
2085 write!(
2086 f,
2087 "[{}..{}]",
2088 BASE64_STANDARD.encode(windows.next().unwrap_or_default()),
2089 BASE64_STANDARD.encode(windows.next_back().unwrap_or_default())
2090 )
2091 } else {
2092 write!(f, "[{}]", BASE64_STANDARD.encode(&self.data))
2093 }
2094 }
2095 }
2096
2097 impl Display for Payload {
2098 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2099 write!(f, "{:?}", self)
2100 }
2101 }
2102
2103 impl Display for Header {
2104 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2105 write!(f, "Header(")?;
2106 for kv in &self.fields {
2107 write!(f, "{}: ", kv.0)?;
2108 write!(f, "{}, ", kv.1)?;
2109 }
2110 write!(f, ")")
2111 }
2112 }
2113
2114 impl From<Header> for HashMap<String, Payload> {
2115 fn from(h: Header) -> Self {
2116 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2117 }
2118 }
2119
2120 impl From<Memo> for HashMap<String, Payload> {
2121 fn from(h: Memo) -> Self {
2122 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2123 }
2124 }
2125
2126 impl From<SearchAttributes> for HashMap<String, Payload> {
2127 fn from(h: SearchAttributes) -> Self {
2128 h.indexed_fields
2129 .into_iter()
2130 .map(|(k, v)| (k, v.into()))
2131 .collect()
2132 }
2133 }
2134
2135 impl From<HashMap<String, Payload>> for SearchAttributes {
2136 fn from(h: HashMap<String, Payload>) -> Self {
2137 Self {
2138 indexed_fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
2139 }
2140 }
2141 }
2142
2143 impl From<String> for ActivityType {
2144 fn from(name: String) -> Self {
2145 Self { name }
2146 }
2147 }
2148
2149 impl From<&str> for ActivityType {
2150 fn from(name: &str) -> Self {
2151 Self {
2152 name: name.to_string(),
2153 }
2154 }
2155 }
2156
2157 impl From<ActivityType> for String {
2158 fn from(at: ActivityType) -> Self {
2159 at.name
2160 }
2161 }
2162
2163 impl From<&str> for WorkflowType {
2164 fn from(v: &str) -> Self {
2165 Self {
2166 name: v.to_string(),
2167 }
2168 }
2169 }
2170 }
2171 }
2172 pub mod deployment {
2173 pub mod v1 {
2174 tonic::include_proto!("temporal.api.deployment.v1");
2175 }
2176 }
2177 pub mod enums {
2178 pub mod v1 {
2179 include_proto_with_serde!("temporal.api.enums.v1");
2180 }
2181 }
2182 pub mod errordetails {
2183 pub mod v1 {
2184 tonic::include_proto!("temporal.api.errordetails.v1");
2185 }
2186 }
2187 pub mod failure {
2188 pub mod v1 {
2189 include_proto_with_serde!("temporal.api.failure.v1");
2190 }
2191 }
2192 pub mod filter {
2193 pub mod v1 {
2194 tonic::include_proto!("temporal.api.filter.v1");
2195 }
2196 }
2197 pub mod history {
2198 pub mod v1 {
2199 use crate::protos::temporal::api::{
2200 enums::v1::EventType, history::v1::history_event::Attributes,
2201 };
2202 use anyhow::bail;
2203 use std::fmt::{Display, Formatter};
2204
2205 tonic::include_proto!("temporal.api.history.v1");
2206
2207 impl History {
2208 pub fn extract_run_id_from_start(&self) -> Result<&str, anyhow::Error> {
2209 extract_original_run_id_from_events(&self.events)
2210 }
2211
2212 pub fn last_event_id(&self) -> i64 {
2215 self.events.last().map(|e| e.event_id).unwrap_or_default()
2216 }
2217 }
2218
2219 pub fn extract_original_run_id_from_events(
2220 events: &[HistoryEvent],
2221 ) -> Result<&str, anyhow::Error> {
2222 if let Some(Attributes::WorkflowExecutionStartedEventAttributes(wes)) =
2223 events.get(0).and_then(|x| x.attributes.as_ref())
2224 {
2225 Ok(&wes.original_execution_run_id)
2226 } else {
2227 bail!("First event is not WorkflowExecutionStarted?!?")
2228 }
2229 }
2230
2231 impl HistoryEvent {
2232 pub fn is_command_event(&self) -> bool {
2234 EventType::try_from(self.event_type).map_or(false, |et| match et {
2235 EventType::ActivityTaskScheduled
2236 | EventType::ActivityTaskCancelRequested
2237 | EventType::MarkerRecorded
2238 | EventType::RequestCancelExternalWorkflowExecutionInitiated
2239 | EventType::SignalExternalWorkflowExecutionInitiated
2240 | EventType::StartChildWorkflowExecutionInitiated
2241 | EventType::TimerCanceled
2242 | EventType::TimerStarted
2243 | EventType::UpsertWorkflowSearchAttributes
2244 | EventType::WorkflowPropertiesModified
2245 | EventType::NexusOperationScheduled
2246 | EventType::NexusOperationCancelRequested
2247 | EventType::WorkflowExecutionCanceled
2248 | EventType::WorkflowExecutionCompleted
2249 | EventType::WorkflowExecutionContinuedAsNew
2250 | EventType::WorkflowExecutionFailed
2251 | EventType::WorkflowExecutionUpdateAccepted
2252 | EventType::WorkflowExecutionUpdateRejected
2253 | EventType::WorkflowExecutionUpdateCompleted => true,
2254 _ => false,
2255 })
2256 }
2257
2258 pub fn get_initial_command_event_id(&self) -> Option<i64> {
2262 self.attributes.as_ref().and_then(|a| {
2263 match a {
2266 Attributes::ActivityTaskStartedEventAttributes(a) =>
2267 Some(a.scheduled_event_id),
2268 Attributes::ActivityTaskCompletedEventAttributes(a) =>
2269 Some(a.scheduled_event_id),
2270 Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2271 Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2272 Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2273 Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2274 Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
2275 Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
2276 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2277 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
2278 Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2279 Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
2280 Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
2281 Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2282 Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
2283 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
2284 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
2285 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
2286 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
2287 Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
2288 Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2289 Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2290 Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
2291 Attributes::NexusOperationStartedEventAttributes(a) => Some(a.scheduled_event_id),
2292 Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2293 Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id),
2294 Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
2295 Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id),
2296 Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
2297 Attributes::NexusOperationCancelRequestCompletedEventAttributes(a) => Some(a.scheduled_event_id),
2298 Attributes::NexusOperationCancelRequestFailedEventAttributes(a) => Some(a.scheduled_event_id),
2299 _ => None
2300 }
2301 })
2302 }
2303
2304 pub fn get_protocol_instance_id(&self) -> Option<&str> {
2306 self.attributes.as_ref().and_then(|a| match a {
2307 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(a) => {
2308 Some(a.protocol_instance_id.as_str())
2309 }
2310 _ => None,
2311 })
2312 }
2313
2314 pub fn is_final_wf_execution_event(&self) -> bool {
2316 match self.event_type() {
2317 EventType::WorkflowExecutionCompleted => true,
2318 EventType::WorkflowExecutionCanceled => true,
2319 EventType::WorkflowExecutionFailed => true,
2320 EventType::WorkflowExecutionTimedOut => true,
2321 EventType::WorkflowExecutionContinuedAsNew => true,
2322 EventType::WorkflowExecutionTerminated => true,
2323 _ => false,
2324 }
2325 }
2326
2327 pub fn is_wft_closed_event(&self) -> bool {
2328 match self.event_type() {
2329 EventType::WorkflowTaskCompleted => true,
2330 EventType::WorkflowTaskFailed => true,
2331 EventType::WorkflowTaskTimedOut => true,
2332 _ => false,
2333 }
2334 }
2335
2336 pub fn is_ignorable(&self) -> bool {
2337 if !self.worker_may_ignore {
2338 return false;
2339 }
2340 if let Some(a) = self.attributes.as_ref() {
2343 match a {
2344 Attributes::WorkflowExecutionStartedEventAttributes(_) => false,
2345 Attributes::WorkflowExecutionCompletedEventAttributes(_) => false,
2346 Attributes::WorkflowExecutionFailedEventAttributes(_) => false,
2347 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => false,
2348 Attributes::WorkflowTaskScheduledEventAttributes(_) => false,
2349 Attributes::WorkflowTaskStartedEventAttributes(_) => false,
2350 Attributes::WorkflowTaskCompletedEventAttributes(_) => false,
2351 Attributes::WorkflowTaskTimedOutEventAttributes(_) => false,
2352 Attributes::WorkflowTaskFailedEventAttributes(_) => false,
2353 Attributes::ActivityTaskScheduledEventAttributes(_) => false,
2354 Attributes::ActivityTaskStartedEventAttributes(_) => false,
2355 Attributes::ActivityTaskCompletedEventAttributes(_) => false,
2356 Attributes::ActivityTaskFailedEventAttributes(_) => false,
2357 Attributes::ActivityTaskTimedOutEventAttributes(_) => false,
2358 Attributes::TimerStartedEventAttributes(_) => false,
2359 Attributes::TimerFiredEventAttributes(_) => false,
2360 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => false,
2361 Attributes::ActivityTaskCanceledEventAttributes(_) => false,
2362 Attributes::TimerCanceledEventAttributes(_) => false,
2363 Attributes::MarkerRecordedEventAttributes(_) => false,
2364 Attributes::WorkflowExecutionSignaledEventAttributes(_) => false,
2365 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => false,
2366 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => false,
2367 Attributes::WorkflowExecutionCanceledEventAttributes(_) => false,
2368 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2369 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => false,
2370 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => false,
2371 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => false,
2372 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => false,
2373 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => false,
2374 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => false,
2375 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => false,
2376 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => false,
2377 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => false,
2378 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => false,
2379 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => false,
2380 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2381 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => false,
2382 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => false,
2383 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => false,
2384 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => false,
2385 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => false,
2386 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => false,
2387 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => false,
2388 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => false,
2389 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => false,
2390 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => false,
2391 Attributes::NexusOperationScheduledEventAttributes(_) => false,
2392 Attributes::NexusOperationStartedEventAttributes(_) => false,
2393 Attributes::NexusOperationCompletedEventAttributes(_) => false,
2394 Attributes::NexusOperationFailedEventAttributes(_) => false,
2395 Attributes::NexusOperationCanceledEventAttributes(_) => false,
2396 Attributes::NexusOperationTimedOutEventAttributes(_) => false,
2397 Attributes::NexusOperationCancelRequestedEventAttributes(_) => false,
2398 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => true,
2400 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => false,
2401 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => false,
2402 Attributes::WorkflowExecutionPausedEventAttributes(_) => true,
2404 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => true,
2406 }
2407 } else {
2408 false
2409 }
2410 }
2411 }
2412
2413 impl Display for HistoryEvent {
2414 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2415 write!(
2416 f,
2417 "HistoryEvent(id: {}, {:?})",
2418 self.event_id,
2419 EventType::try_from(self.event_type).unwrap_or_default()
2420 )
2421 }
2422 }
2423
2424 impl Attributes {
2425 pub fn event_type(&self) -> EventType {
2426 match self {
2428 Attributes::WorkflowExecutionStartedEventAttributes(_) => { EventType::WorkflowExecutionStarted }
2429 Attributes::WorkflowExecutionCompletedEventAttributes(_) => { EventType::WorkflowExecutionCompleted }
2430 Attributes::WorkflowExecutionFailedEventAttributes(_) => { EventType::WorkflowExecutionFailed }
2431 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => { EventType::WorkflowExecutionTimedOut }
2432 Attributes::WorkflowTaskScheduledEventAttributes(_) => { EventType::WorkflowTaskScheduled }
2433 Attributes::WorkflowTaskStartedEventAttributes(_) => { EventType::WorkflowTaskStarted }
2434 Attributes::WorkflowTaskCompletedEventAttributes(_) => { EventType::WorkflowTaskCompleted }
2435 Attributes::WorkflowTaskTimedOutEventAttributes(_) => { EventType::WorkflowTaskTimedOut }
2436 Attributes::WorkflowTaskFailedEventAttributes(_) => { EventType::WorkflowTaskFailed }
2437 Attributes::ActivityTaskScheduledEventAttributes(_) => { EventType::ActivityTaskScheduled }
2438 Attributes::ActivityTaskStartedEventAttributes(_) => { EventType::ActivityTaskStarted }
2439 Attributes::ActivityTaskCompletedEventAttributes(_) => { EventType::ActivityTaskCompleted }
2440 Attributes::ActivityTaskFailedEventAttributes(_) => { EventType::ActivityTaskFailed }
2441 Attributes::ActivityTaskTimedOutEventAttributes(_) => { EventType::ActivityTaskTimedOut }
2442 Attributes::TimerStartedEventAttributes(_) => { EventType::TimerStarted }
2443 Attributes::TimerFiredEventAttributes(_) => { EventType::TimerFired }
2444 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => { EventType::ActivityTaskCancelRequested }
2445 Attributes::ActivityTaskCanceledEventAttributes(_) => { EventType::ActivityTaskCanceled }
2446 Attributes::TimerCanceledEventAttributes(_) => { EventType::TimerCanceled }
2447 Attributes::MarkerRecordedEventAttributes(_) => { EventType::MarkerRecorded }
2448 Attributes::WorkflowExecutionSignaledEventAttributes(_) => { EventType::WorkflowExecutionSignaled }
2449 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => { EventType::WorkflowExecutionTerminated }
2450 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::WorkflowExecutionCancelRequested }
2451 Attributes::WorkflowExecutionCanceledEventAttributes(_) => { EventType::WorkflowExecutionCanceled }
2452 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionInitiated }
2453 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionFailed }
2454 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::ExternalWorkflowExecutionCancelRequested }
2455 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => { EventType::WorkflowExecutionContinuedAsNew }
2456 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => { EventType::StartChildWorkflowExecutionInitiated }
2457 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => { EventType::StartChildWorkflowExecutionFailed }
2458 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => { EventType::ChildWorkflowExecutionStarted }
2459 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => { EventType::ChildWorkflowExecutionCompleted }
2460 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => { EventType::ChildWorkflowExecutionFailed }
2461 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => { EventType::ChildWorkflowExecutionCanceled }
2462 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => { EventType::ChildWorkflowExecutionTimedOut }
2463 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => { EventType::ChildWorkflowExecutionTerminated }
2464 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionInitiated }
2465 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionFailed }
2466 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => { EventType::ExternalWorkflowExecutionSignaled }
2467 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => { EventType::UpsertWorkflowSearchAttributes }
2468 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAdmitted }
2469 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => { EventType::WorkflowExecutionUpdateRejected }
2470 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAccepted }
2471 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => { EventType::WorkflowExecutionUpdateCompleted }
2472 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => { EventType::WorkflowPropertiesModifiedExternally }
2473 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => { EventType::ActivityPropertiesModifiedExternally }
2474 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => { EventType::WorkflowPropertiesModified }
2475 Attributes::NexusOperationScheduledEventAttributes(_) => { EventType::NexusOperationScheduled }
2476 Attributes::NexusOperationStartedEventAttributes(_) => { EventType::NexusOperationStarted }
2477 Attributes::NexusOperationCompletedEventAttributes(_) => { EventType::NexusOperationCompleted }
2478 Attributes::NexusOperationFailedEventAttributes(_) => { EventType::NexusOperationFailed }
2479 Attributes::NexusOperationCanceledEventAttributes(_) => { EventType::NexusOperationCanceled }
2480 Attributes::NexusOperationTimedOutEventAttributes(_) => { EventType::NexusOperationTimedOut }
2481 Attributes::NexusOperationCancelRequestedEventAttributes(_) => { EventType::NexusOperationCancelRequested }
2482 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => { EventType::WorkflowExecutionOptionsUpdated }
2483 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => { EventType::NexusOperationCancelRequestCompleted }
2484 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => { EventType::NexusOperationCancelRequestFailed }
2485 Attributes::WorkflowExecutionPausedEventAttributes(_) => { EventType::WorkflowExecutionPaused }
2486 Attributes::WorkflowExecutionUnpausedEventAttributes(_) => { EventType::WorkflowExecutionUnpaused }
2487 }
2488 }
2489 }
2490 }
2491 }
2492 pub mod namespace {
2493 pub mod v1 {
2494 tonic::include_proto!("temporal.api.namespace.v1");
2495 }
2496 }
2497 pub mod operatorservice {
2498 pub mod v1 {
2499 tonic::include_proto!("temporal.api.operatorservice.v1");
2500 }
2501 }
2502 pub mod protocol {
2503 pub mod v1 {
2504 use std::fmt::{Display, Formatter};
2505 tonic::include_proto!("temporal.api.protocol.v1");
2506
2507 impl Display for Message {
2508 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2509 write!(f, "ProtocolMessage({})", self.id)
2510 }
2511 }
2512 }
2513 }
2514 pub mod query {
2515 pub mod v1 {
2516 tonic::include_proto!("temporal.api.query.v1");
2517 }
2518 }
2519 pub mod replication {
2520 pub mod v1 {
2521 tonic::include_proto!("temporal.api.replication.v1");
2522 }
2523 }
2524 pub mod rules {
2525 pub mod v1 {
2526 tonic::include_proto!("temporal.api.rules.v1");
2527 }
2528 }
2529 pub mod schedule {
2530 #[allow(rustdoc::invalid_html_tags)]
2531 pub mod v1 {
2532 tonic::include_proto!("temporal.api.schedule.v1");
2533 }
2534 }
2535 pub mod sdk {
2536 pub mod v1 {
2537 tonic::include_proto!("temporal.api.sdk.v1");
2538 }
2539 }
2540 pub mod taskqueue {
2541 pub mod v1 {
2542 use crate::protos::temporal::api::enums::v1::TaskQueueKind;
2543 tonic::include_proto!("temporal.api.taskqueue.v1");
2544
2545 impl From<String> for TaskQueue {
2546 fn from(name: String) -> Self {
2547 Self {
2548 name,
2549 kind: TaskQueueKind::Normal as i32,
2550 normal_name: "".to_string(),
2551 }
2552 }
2553 }
2554 }
2555 }
2556 pub mod testservice {
2557 pub mod v1 {
2558 tonic::include_proto!("temporal.api.testservice.v1");
2559 }
2560 }
2561 pub mod update {
2562 pub mod v1 {
2563 use crate::protos::temporal::api::update::v1::outcome::Value;
2564 tonic::include_proto!("temporal.api.update.v1");
2565
2566 impl Outcome {
2567 pub fn is_success(&self) -> bool {
2568 match self.value {
2569 Some(Value::Success(_)) => true,
2570 _ => false,
2571 }
2572 }
2573 }
2574 }
2575 }
2576 pub mod version {
2577 pub mod v1 {
2578 tonic::include_proto!("temporal.api.version.v1");
2579 }
2580 }
2581 pub mod worker {
2582 pub mod v1 {
2583 tonic::include_proto!("temporal.api.worker.v1");
2584 }
2585 }
2586 pub mod workflow {
2587 pub mod v1 {
2588 tonic::include_proto!("temporal.api.workflow.v1");
2589 }
2590 }
2591 pub mod nexus {
2592 pub mod v1 {
2593 use crate::protos::{
2594 camel_case_to_screaming_snake,
2595 temporal::api::{
2596 common::{
2597 self,
2598 v1::link::{WorkflowEvent, workflow_event},
2599 },
2600 enums::v1::EventType,
2601 failure,
2602 },
2603 };
2604 use anyhow::{anyhow, bail};
2605 use prost::Name;
2606 use std::{
2607 collections::HashMap,
2608 fmt::{Display, Formatter},
2609 };
2610 use tonic::transport::Uri;
2611
2612 tonic::include_proto!("temporal.api.nexus.v1");
2613
2614 impl Display for Response {
2615 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2616 write!(f, "NexusResponse(",)?;
2617 match &self.variant {
2618 None => {}
2619 Some(v) => {
2620 write!(f, "{v}")?;
2621 }
2622 }
2623 write!(f, ")")
2624 }
2625 }
2626
2627 impl Display for response::Variant {
2628 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2629 match self {
2630 response::Variant::StartOperation(_) => {
2631 write!(f, "StartOperation")
2632 }
2633 response::Variant::CancelOperation(_) => {
2634 write!(f, "CancelOperation")
2635 }
2636 }
2637 }
2638 }
2639
2640 impl Display for HandlerError {
2641 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2642 write!(f, "HandlerError")
2643 }
2644 }
2645
2646 pub enum NexusTaskFailure {
2647 Legacy(HandlerError),
2648 Temporal(failure::v1::Failure),
2649 }
2650
2651 static SCHEME_PREFIX: &str = "temporal://";
2652
2653 pub fn workflow_event_link_from_nexus(
2655 l: &Link,
2656 ) -> Result<common::v1::Link, anyhow::Error> {
2657 if !l.url.starts_with(SCHEME_PREFIX) {
2658 bail!("Invalid scheme for nexus link: {:?}", l.url);
2659 }
2660 let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap();
2663 let uri = Uri::try_from(no_authority_url)?;
2664 let parts = uri.into_parts();
2665 let path = parts.path_and_query.ok_or_else(|| {
2666 anyhow!("Failed to parse nexus link, invalid path: {:?}", l)
2667 })?;
2668 let path_parts = path.path().split('/').collect::<Vec<_>>();
2669 if path_parts.get(1) != Some(&"namespaces") {
2670 bail!("Invalid path for nexus link: {:?}", l);
2671 }
2672 let namespace = path_parts.get(2).ok_or_else(|| {
2673 anyhow!("Failed to parse nexus link, no namespace: {:?}", l)
2674 })?;
2675 if path_parts.get(3) != Some(&"workflows") {
2676 bail!("Invalid path for nexus link, no workflows segment: {:?}", l);
2677 }
2678 let workflow_id = path_parts.get(4).ok_or_else(|| {
2679 anyhow!("Failed to parse nexus link, no workflow id: {:?}", l)
2680 })?;
2681 let run_id = path_parts
2682 .get(5)
2683 .ok_or_else(|| anyhow!("Failed to parse nexus link, no run id: {:?}", l))?;
2684 if path_parts.get(6) != Some(&"history") {
2685 bail!("Invalid path for nexus link, no history segment: {:?}", l);
2686 }
2687 let reference = if let Some(query) = path.query() {
2688 let mut eventref = workflow_event::EventReference::default();
2689 let query_parts = query.split('&').collect::<Vec<_>>();
2690 for qp in query_parts {
2691 let mut kv = qp.split('=');
2692 let key = kv.next().ok_or_else(|| {
2693 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2694 })?;
2695 let val = kv.next().ok_or_else(|| {
2696 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2697 })?;
2698 match key {
2699 "eventID" => {
2700 eventref.event_id = val.parse().map_err(|_| {
2701 anyhow!("Failed to parse nexus link event id: {:?}", l)
2702 })?;
2703 }
2704 "eventType" => {
2705 eventref.event_type = EventType::from_str_name(val)
2706 .unwrap_or_else(|| {
2707 EventType::from_str_name(
2708 &("EVENT_TYPE_".to_string()
2709 + &camel_case_to_screaming_snake(val)),
2710 )
2711 .unwrap_or_default()
2712 })
2713 .into()
2714 }
2715 _ => continue,
2716 }
2717 }
2718 Some(workflow_event::Reference::EventRef(eventref))
2719 } else {
2720 None
2721 };
2722
2723 Ok(common::v1::Link {
2724 variant: Some(common::v1::link::Variant::WorkflowEvent(WorkflowEvent {
2725 namespace: namespace.to_string(),
2726 workflow_id: workflow_id.to_string(),
2727 run_id: run_id.to_string(),
2728 reference,
2729 })),
2730 })
2731 }
2732
2733 impl TryFrom<failure::v1::Failure> for Failure {
2734 type Error = serde_json::Error;
2735
2736 fn try_from(mut f: failure::v1::Failure) -> Result<Self, Self::Error> {
2737 let message = std::mem::take(&mut f.message);
2739
2740 let details = serde_json::to_vec(&f)?;
2742
2743 Ok(Failure {
2745 message,
2746 stack_trace: f.stack_trace,
2747 metadata: HashMap::from([(
2748 "type".to_string(),
2749 failure::v1::Failure::full_name().into(),
2750 )]),
2751 details,
2752 cause: None,
2753 })
2754 }
2755 }
2756 }
2757 }
2758 pub mod workflowservice {
2759 pub mod v1 {
2760 use std::{
2761 convert::TryInto,
2762 fmt::{Display, Formatter},
2763 time::{Duration, SystemTime},
2764 };
2765
2766 tonic::include_proto!("temporal.api.workflowservice.v1");
2767
2768 macro_rules! sched_to_start_impl {
2769 ($sched_field:ident) => {
2770 pub fn sched_to_start(&self) -> Option<Duration> {
2773 if let Some((sch, st)) =
2774 self.$sched_field.clone().zip(self.started_time.clone())
2775 {
2776 if let Some(value) = elapsed_between_prost_times(sch, st) {
2777 return value;
2778 }
2779 }
2780 None
2781 }
2782 };
2783 }
2784
2785 fn elapsed_between_prost_times(
2786 from: prost_types::Timestamp,
2787 to: prost_types::Timestamp,
2788 ) -> Option<Option<Duration>> {
2789 let from: Result<SystemTime, _> = from.try_into();
2790 let to: Result<SystemTime, _> = to.try_into();
2791 if let (Ok(from), Ok(to)) = (from, to) {
2792 return Some(to.duration_since(from).ok());
2793 }
2794 None
2795 }
2796
2797 impl PollWorkflowTaskQueueResponse {
2798 sched_to_start_impl!(scheduled_time);
2799 }
2800
2801 impl Display for PollWorkflowTaskQueueResponse {
2802 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2803 let last_event = self
2804 .history
2805 .as_ref()
2806 .and_then(|h| h.events.last().map(|he| he.event_id))
2807 .unwrap_or(0);
2808 write!(
2809 f,
2810 "PollWFTQResp(run_id: {}, attempt: {}, last_event: {})",
2811 self.workflow_execution
2812 .as_ref()
2813 .map_or("", |we| we.run_id.as_str()),
2814 self.attempt,
2815 last_event
2816 )
2817 }
2818 }
2819
2820 pub struct CompactHist<'a>(pub &'a PollWorkflowTaskQueueResponse);
2822 impl Display for CompactHist<'_> {
2823 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2824 writeln!(
2825 f,
2826 "PollWorkflowTaskQueueResponse (prev_started: {}, started: {})",
2827 self.0.previous_started_event_id, self.0.started_event_id
2828 )?;
2829 if let Some(h) = self.0.history.as_ref() {
2830 for event in &h.events {
2831 writeln!(f, "{}", event)?;
2832 }
2833 }
2834 writeln!(f, "query: {:#?}", self.0.query)?;
2835 writeln!(f, "queries: {:#?}", self.0.queries)
2836 }
2837 }
2838
2839 impl PollActivityTaskQueueResponse {
2840 sched_to_start_impl!(current_attempt_scheduled_time);
2841 }
2842
2843 impl PollNexusTaskQueueResponse {
2844 pub fn sched_to_start(&self) -> Option<Duration> {
2845 if let Some((sch, st)) = self
2846 .request
2847 .as_ref()
2848 .and_then(|r| r.scheduled_time)
2849 .clone()
2850 .zip(SystemTime::now().try_into().ok())
2851 {
2852 if let Some(value) = elapsed_between_prost_times(sch, st) {
2853 return value;
2854 }
2855 }
2856 None
2857 }
2858 }
2859
2860 impl QueryWorkflowResponse {
2861 pub fn unwrap(self) -> Vec<crate::protos::temporal::api::common::v1::Payload> {
2863 self.query_result.unwrap().payloads
2864 }
2865 }
2866 }
2867 }
2868 }
2869}
2870
2871#[allow(
2872 clippy::all,
2873 missing_docs,
2874 rustdoc::broken_intra_doc_links,
2875 rustdoc::bare_urls
2876)]
2877pub mod google {
2878 pub mod rpc {
2879 tonic::include_proto!("google.rpc");
2880 }
2881}
2882
2883#[allow(
2884 clippy::all,
2885 missing_docs,
2886 rustdoc::broken_intra_doc_links,
2887 rustdoc::bare_urls
2888)]
2889pub mod grpc {
2890 pub mod health {
2891 pub mod v1 {
2892 tonic::include_proto!("grpc.health.v1");
2893 }
2894 }
2895}
2896
2897pub fn camel_case_to_screaming_snake(val: &str) -> String {
2899 let mut out = String::new();
2900 let mut last_was_upper = true;
2901 for c in val.chars() {
2902 if c.is_uppercase() {
2903 if !last_was_upper {
2904 out.push('_');
2905 }
2906 out.push(c.to_ascii_uppercase());
2907 last_was_upper = true;
2908 } else {
2909 out.push(c.to_ascii_uppercase());
2910 last_was_upper = false;
2911 }
2912 }
2913 out
2914}
2915
2916pub fn proto_ts_to_system_time(ts: &prost_types::Timestamp) -> Option<std::time::SystemTime> {
2918 std::time::SystemTime::UNIX_EPOCH
2919 .checked_add(Duration::from_secs(ts.seconds as u64) + Duration::from_nanos(ts.nanos as u64))
2920}
2921
2922#[cfg(test)]
2923mod tests {
2924 use crate::protos::temporal::api::failure::v1::Failure;
2925 use anyhow::anyhow;
2926
2927 #[test]
2928 fn anyhow_to_failure_conversion() {
2929 let no_causes: Failure = anyhow!("no causes").into();
2930 assert_eq!(no_causes.cause, None);
2931 assert_eq!(no_causes.message, "no causes");
2932 let orig = anyhow!("fail 1");
2933 let mid = orig.context("fail 2");
2934 let top = mid.context("fail 3");
2935 let as_fail: Failure = top.into();
2936 assert_eq!(as_fail.message, "fail 3");
2937 assert_eq!(as_fail.cause.as_ref().unwrap().message, "fail 2");
2938 assert_eq!(as_fail.cause.unwrap().cause.unwrap().message, "fail 1");
2939 }
2940}