1pub mod constants;
6pub mod utilities;
7
8#[cfg(feature = "test-utilities")]
9pub mod canned_histories;
10#[cfg(feature = "history_builders")]
11mod history_builder;
12#[cfg(feature = "history_builders")]
13mod history_info;
14mod task_token;
15#[cfg(feature = "test-utilities")]
16pub mod test_utils;
17
18#[cfg(feature = "history_builders")]
19pub use history_builder::{
20 DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE, TestHistoryBuilder, default_act_sched,
21 default_wes_attribs,
22};
23#[cfg(feature = "history_builders")]
24pub use history_info::HistoryInfo;
25pub use task_token::TaskToken;
26
/// The string `"encoding"`; imported by the `coresdk` module below.
/// NOTE(review): presumably the payload-metadata key that names a payload's encoding — the use
/// sites are not visible here, confirm.
pub static ENCODING_PAYLOAD_KEY: &str = "encoding";
/// The string `"json/plain"`; imported by the `coresdk` module alongside `ENCODING_PAYLOAD_KEY`.
/// NOTE(review): presumably the encoding value that marks JSON payloads — confirm at use sites.
pub static JSON_ENCODING_VAL: &str = "json/plain";
/// Key under which the JSON-encoded patch marker data is stored in a marker's details map
/// (written by `coresdk::common::build_has_change_marker_details` and read back by
/// `coresdk::common::decode_change_marker_details`).
pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data";
/// The string `"TemporalChangeVersion"`.
/// NOTE(review): presumably the name of the search attribute that records patch/version
/// information — not referenced in the visible part of this file, confirm.
pub static VERSION_SEARCH_ATTR_KEY: &str = "TemporalChangeVersion";
32
33#[allow(
34 clippy::large_enum_variant,
35 clippy::derive_partial_eq_without_eq,
36 clippy::reserve_after_initialization
37)]
38#[allow(missing_docs)]
40pub mod coresdk {
41 tonic::include_proto!("coresdk");
44
45 use crate::{
46 ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL,
47 temporal::api::{
48 common::v1::{Payload, Payloads, RetryPolicy, WorkflowExecution},
49 enums::v1::{
50 ApplicationErrorCategory, TimeoutType, VersioningBehavior, WorkflowTaskFailedCause,
51 },
52 failure::v1::{
53 ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo,
54 failure::FailureInfo,
55 },
56 workflowservice::v1::PollActivityTaskQueueResponse,
57 },
58 };
59 use activity_task::ActivityTask;
60 use serde::{Deserialize, Serialize};
61 use std::{
62 collections::HashMap,
63 convert::TryFrom,
64 fmt::{Display, Formatter},
65 iter::FromIterator,
66 };
67 use workflow_activation::{WorkflowActivationJob, workflow_activation_job};
68 use workflow_commands::{WorkflowCommand, workflow_command, workflow_command::Variant};
69 use workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion};
70
71 #[allow(clippy::module_inception)]
72 pub mod activity_task {
73 use crate::{coresdk::ActivityTaskCompletion, task_token::fmt_tt};
74 use std::fmt::{Display, Formatter};
75 tonic::include_proto!("coresdk.activity_task");
76
77 impl ActivityTask {
78 pub fn cancel_from_ids(
79 task_token: Vec<u8>,
80 reason: ActivityCancelReason,
81 details: ActivityCancellationDetails,
82 ) -> Self {
83 Self {
84 task_token,
85 variant: Some(activity_task::Variant::Cancel(Cancel {
86 reason: reason as i32,
87 details: Some(details),
88 })),
89 }
90 }
91
92 pub fn is_timeout(&self) -> bool {
94 match &self.variant {
95 Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
96 *reason == ActivityCancelReason::TimedOut as i32
97 || details.as_ref().is_some_and(|d| d.is_timed_out)
98 }
99 _ => false,
100 }
101 }
102
103 pub fn primary_reason_to_cancellation_details(
104 reason: ActivityCancelReason,
105 ) -> ActivityCancellationDetails {
106 ActivityCancellationDetails {
107 is_not_found: reason == ActivityCancelReason::NotFound,
108 is_cancelled: reason == ActivityCancelReason::Cancelled,
109 is_paused: reason == ActivityCancelReason::Paused,
110 is_timed_out: reason == ActivityCancelReason::TimedOut,
111 is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
112 is_reset: reason == ActivityCancelReason::Reset,
113 }
114 }
115 }
116
117 impl Display for ActivityTaskCompletion {
118 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119 write!(
120 f,
121 "ActivityTaskCompletion(token: {}",
122 fmt_tt(&self.task_token),
123 )?;
124 if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
125 write!(f, ", {r}")?;
126 } else {
127 write!(f, ", missing result")?;
128 }
129 write!(f, ")")
130 }
131 }
132 }
133 #[allow(clippy::module_inception)]
134 pub mod activity_result {
135 tonic::include_proto!("coresdk.activity_result");
136 use super::super::temporal::api::{
137 common::v1::Payload,
138 failure::v1::{CanceledFailureInfo, Failure as APIFailure, failure},
139 };
140 use crate::{
141 coresdk::activity_result::activity_resolution::Status,
142 temporal::api::enums::v1::TimeoutType,
143 };
144 use activity_execution_result as aer;
145 use anyhow::anyhow;
146 use std::fmt::{Display, Formatter};
147
148 impl ActivityExecutionResult {
149 pub const fn ok(result: Payload) -> Self {
150 Self {
151 status: Some(aer::Status::Completed(Success {
152 result: Some(result),
153 })),
154 }
155 }
156
157 pub fn fail(fail: APIFailure) -> Self {
158 Self {
159 status: Some(aer::Status::Failed(Failure {
160 failure: Some(fail),
161 })),
162 }
163 }
164
165 pub fn cancel_from_details(payload: Option<Payload>) -> Self {
166 Self {
167 status: Some(aer::Status::Cancelled(Cancellation::from_details(payload))),
168 }
169 }
170
171 pub const fn will_complete_async() -> Self {
172 Self {
173 status: Some(aer::Status::WillCompleteAsync(WillCompleteAsync {})),
174 }
175 }
176
177 pub fn is_cancelled(&self) -> bool {
178 matches!(self.status, Some(aer::Status::Cancelled(_)))
179 }
180 }
181
182 impl Display for aer::Status {
183 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
184 write!(f, "ActivityExecutionResult(")?;
185 match self {
186 aer::Status::Completed(v) => {
187 write!(f, "{v})")
188 }
189 aer::Status::Failed(v) => {
190 write!(f, "{v})")
191 }
192 aer::Status::Cancelled(v) => {
193 write!(f, "{v})")
194 }
195 aer::Status::WillCompleteAsync(_) => {
196 write!(f, "Will complete async)")
197 }
198 }
199 }
200 }
201
202 impl Display for Success {
203 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
204 write!(f, "Success(")?;
205 if let Some(ref v) = self.result {
206 write!(f, "{v}")?;
207 }
208 write!(f, ")")
209 }
210 }
211
212 impl Display for Failure {
213 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
214 write!(f, "Failure(")?;
215 if let Some(ref v) = self.failure {
216 write!(f, "{v}")?;
217 }
218 write!(f, ")")
219 }
220 }
221
222 impl Display for Cancellation {
223 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224 write!(f, "Cancellation(")?;
225 if let Some(ref v) = self.failure {
226 write!(f, "{v}")?;
227 }
228 write!(f, ")")
229 }
230 }
231
232 impl From<Result<Payload, APIFailure>> for ActivityExecutionResult {
233 fn from(r: Result<Payload, APIFailure>) -> Self {
234 Self {
235 status: match r {
236 Ok(p) => Some(aer::Status::Completed(Success { result: Some(p) })),
237 Err(f) => Some(aer::Status::Failed(Failure { failure: Some(f) })),
238 },
239 }
240 }
241 }
242
243 impl ActivityResolution {
244 pub fn success_payload_or_error(self) -> Result<Option<Payload>, anyhow::Error> {
247 let Some(status) = self.status else {
248 return Err(anyhow!("Activity completed without a status"));
249 };
250
251 match status {
252 activity_resolution::Status::Completed(success) => Ok(success.result),
253 e => Err(anyhow!("Activity was not successful: {e:?}")),
254 }
255 }
256
257 pub fn unwrap_ok_payload(self) -> Payload {
258 self.success_payload_or_error().unwrap().unwrap()
259 }
260
261 pub fn completed_ok(&self) -> bool {
262 matches!(self.status, Some(activity_resolution::Status::Completed(_)))
263 }
264
265 pub fn failed(&self) -> bool {
266 matches!(self.status, Some(activity_resolution::Status::Failed(_)))
267 }
268
269 pub fn timed_out(&self) -> Option<TimeoutType> {
270 match self.status {
271 Some(activity_resolution::Status::Failed(Failure {
272 failure: Some(ref f),
273 })) => f
274 .is_timeout()
275 .or_else(|| f.cause.as_ref().and_then(|c| c.is_timeout())),
276 _ => None,
277 }
278 }
279
280 pub fn cancelled(&self) -> bool {
281 matches!(self.status, Some(activity_resolution::Status::Cancelled(_)))
282 }
283
284 pub fn unwrap_failure(self) -> APIFailure {
287 match self.status.unwrap() {
288 Status::Failed(f) => f.failure.unwrap(),
289 Status::Cancelled(c) => c.failure.unwrap(),
290 _ => panic!("Actvity did not fail"),
291 }
292 }
293 }
294
295 impl Cancellation {
296 pub fn from_details(details: Option<Payload>) -> Self {
299 Cancellation {
300 failure: Some(APIFailure {
301 message: "Activity cancelled".to_string(),
302 failure_info: Some(failure::FailureInfo::CanceledFailureInfo(
303 CanceledFailureInfo {
304 details: details.map(Into::into),
305 },
306 )),
307 ..Default::default()
308 }),
309 }
310 }
311 }
312 }
313
314 pub mod common {
315 tonic::include_proto!("coresdk.common");
316 use super::external_data::LocalActivityMarkerData;
317 use crate::{
318 PATCHED_MARKER_DETAILS_KEY,
319 coresdk::{
320 AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
321 external_data::PatchedMarkerData,
322 },
323 temporal::api::common::v1::{Payload, Payloads},
324 };
325 use std::collections::HashMap;
326
327 pub fn build_has_change_marker_details(
328 patch_id: impl Into<String>,
329 deprecated: bool,
330 ) -> anyhow::Result<HashMap<String, Payloads>> {
331 let mut hm = HashMap::new();
332 let encoded = PatchedMarkerData {
333 id: patch_id.into(),
334 deprecated,
335 }
336 .as_json_payload()?;
337 hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into());
338 Ok(hm)
339 }
340
341 pub fn decode_change_marker_details(
342 details: &HashMap<String, Payloads>,
343 ) -> Option<(String, bool)> {
344 if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) {
347 let decoded = PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?;
348 return Some((decoded.id, decoded.deprecated));
349 }
350
351 let id_entry = details.get("patch_id")?.payloads.first()?;
352 let deprecated_entry = details.get("deprecated")?.payloads.first()?;
353 let name = std::str::from_utf8(&id_entry.data).ok()?;
354 let deprecated = *deprecated_entry.data.first()? != 0;
355 Some((name.to_string(), deprecated))
356 }
357
358 pub fn build_local_activity_marker_details(
359 metadata: LocalActivityMarkerData,
360 result: Option<Payload>,
361 ) -> HashMap<String, Payloads> {
362 let mut hm = HashMap::new();
363 if let Some(jsonified) = metadata.as_json_payload().into_payloads() {
366 hm.insert("data".to_string(), jsonified);
367 }
368 if let Some(res) = result {
369 hm.insert("result".to_string(), res.into());
370 }
371 hm
372 }
373
374 pub fn extract_local_activity_marker_data(
377 details: &HashMap<String, Payloads>,
378 ) -> Option<LocalActivityMarkerData> {
379 details
380 .get("data")
381 .and_then(|p| p.payloads.first())
382 .and_then(|p| std::str::from_utf8(&p.data).ok())
383 .and_then(|s| serde_json::from_str(s).ok())
384 }
385
386 pub fn extract_local_activity_marker_details(
390 details: &mut HashMap<String, Payloads>,
391 ) -> (Option<LocalActivityMarkerData>, Option<Payload>) {
392 let data = extract_local_activity_marker_data(details);
393 let result = details.remove("result").and_then(|mut p| p.payloads.pop());
394 (data, result)
395 }
396 }
397
398 pub mod external_data {
399 use prost_wkt_types::{Duration, Timestamp};
400 use serde::{Deserialize, Deserializer, Serialize, Serializer};
401 tonic::include_proto!("coresdk.external_data");
402
/// Local mirror of the external `Timestamp` type, declared with serde's `remote` attribute so
/// that `Timestamp` values can be (de)serialized via `#[serde(with = "TimestampDef")]`.
#[derive(Serialize, Deserialize)]
#[serde(remote = "Timestamp")]
struct TimestampDef {
    seconds: i64,
    nanos: i32,
}
412 mod opt_timestamp {
413 use super::*;
414
415 pub(super) fn serialize<S>(
416 value: &Option<Timestamp>,
417 serializer: S,
418 ) -> Result<S::Ok, S::Error>
419 where
420 S: Serializer,
421 {
422 #[derive(Serialize)]
423 struct Helper<'a>(#[serde(with = "TimestampDef")] &'a Timestamp);
424
425 value.as_ref().map(Helper).serialize(serializer)
426 }
427
428 pub(super) fn deserialize<'de, D>(
429 deserializer: D,
430 ) -> Result<Option<Timestamp>, D::Error>
431 where
432 D: Deserializer<'de>,
433 {
434 #[derive(Deserialize)]
435 struct Helper(#[serde(with = "TimestampDef")] Timestamp);
436
437 let helper = Option::deserialize(deserializer)?;
438 Ok(helper.map(|Helper(external)| external))
439 }
440 }
441
/// Local mirror of the external `Duration` type, declared with serde's `remote` attribute so
/// that `Duration` values can be (de)serialized via `#[serde(with = "DurationDef")]`.
#[derive(Serialize, Deserialize)]
#[serde(remote = "Duration")]
struct DurationDef {
    seconds: i64,
    nanos: i32,
}
449 mod opt_duration {
450 use super::*;
451
452 pub(super) fn serialize<S>(
453 value: &Option<Duration>,
454 serializer: S,
455 ) -> Result<S::Ok, S::Error>
456 where
457 S: Serializer,
458 {
459 #[derive(Serialize)]
460 struct Helper<'a>(#[serde(with = "DurationDef")] &'a Duration);
461
462 value.as_ref().map(Helper).serialize(serializer)
463 }
464
465 pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
466 where
467 D: Deserializer<'de>,
468 {
469 #[derive(Deserialize)]
470 struct Helper(#[serde(with = "DurationDef")] Duration);
471
472 let helper = Option::deserialize(deserializer)?;
473 Ok(helper.map(|Helper(external)| external))
474 }
475 }
476 }
477
478 pub mod workflow_activation {
479 use crate::{
480 coresdk::{
481 FromPayloadsExt,
482 activity_result::{ActivityResolution, activity_resolution},
483 common::NamespacedWorkflowExecution,
484 fix_retry_policy,
485 workflow_activation::remove_from_cache::EvictionReason,
486 },
487 temporal::api::{
488 enums::v1::WorkflowTaskFailedCause,
489 history::v1::{
490 WorkflowExecutionCancelRequestedEventAttributes,
491 WorkflowExecutionSignaledEventAttributes,
492 WorkflowExecutionStartedEventAttributes,
493 },
494 query::v1::WorkflowQuery,
495 },
496 };
497 use prost_wkt_types::Timestamp;
498 use std::fmt::{Display, Formatter};
499
500 tonic::include_proto!("coresdk.workflow_activation");
501
502 pub fn create_evict_activation(
503 run_id: String,
504 message: String,
505 reason: EvictionReason,
506 ) -> WorkflowActivation {
507 WorkflowActivation {
508 timestamp: None,
509 run_id,
510 is_replaying: false,
511 history_length: 0,
512 jobs: vec![WorkflowActivationJob::from(
513 workflow_activation_job::Variant::RemoveFromCache(RemoveFromCache {
514 message,
515 reason: reason as i32,
516 }),
517 )],
518 available_internal_flags: vec![],
519 history_size_bytes: 0,
520 continue_as_new_suggested: false,
521 deployment_version_for_current_task: None,
522 }
523 }
524
525 pub fn query_to_job(id: String, q: WorkflowQuery) -> QueryWorkflow {
526 QueryWorkflow {
527 query_id: id,
528 query_type: q.query_type,
529 arguments: Vec::from_payloads(q.query_args),
530 headers: q.header.map(|h| h.into()).unwrap_or_default(),
531 }
532 }
533
534 impl WorkflowActivation {
535 pub fn is_only_eviction(&self) -> bool {
537 matches!(
538 self.jobs.as_slice(),
539 [WorkflowActivationJob {
540 variant: Some(workflow_activation_job::Variant::RemoveFromCache(_))
541 }]
542 )
543 }
544
545 pub fn eviction_reason(&self) -> Option<EvictionReason> {
547 self.jobs.iter().find_map(|j| {
548 if let Some(workflow_activation_job::Variant::RemoveFromCache(ref rj)) =
549 j.variant
550 {
551 EvictionReason::try_from(rj.reason).ok()
552 } else {
553 None
554 }
555 })
556 }
557 }
558
559 impl workflow_activation_job::Variant {
560 pub fn is_local_activity_resolution(&self) -> bool {
561 matches!(self, workflow_activation_job::Variant::ResolveActivity(ra) if ra.is_local)
562 }
563 }
564
565 impl Display for EvictionReason {
566 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
567 write!(f, "{self:?}")
568 }
569 }
570
571 impl From<EvictionReason> for WorkflowTaskFailedCause {
572 fn from(value: EvictionReason) -> Self {
573 match value {
574 EvictionReason::Nondeterminism => {
575 WorkflowTaskFailedCause::NonDeterministicError
576 }
577 _ => WorkflowTaskFailedCause::Unspecified,
578 }
579 }
580 }
581
582 impl Display for WorkflowActivation {
583 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
584 write!(f, "WorkflowActivation(")?;
585 write!(f, "run_id: {}, ", self.run_id)?;
586 write!(f, "is_replaying: {}, ", self.is_replaying)?;
587 write!(
588 f,
589 "jobs: {})",
590 self.jobs
591 .iter()
592 .map(ToString::to_string)
593 .collect::<Vec<_>>()
594 .as_slice()
595 .join(", ")
596 )
597 }
598 }
599
600 impl Display for WorkflowActivationJob {
601 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
602 match &self.variant {
603 None => write!(f, "empty"),
604 Some(v) => write!(f, "{v}"),
605 }
606 }
607 }
608
609 impl Display for workflow_activation_job::Variant {
610 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
611 match self {
612 workflow_activation_job::Variant::InitializeWorkflow(_) => {
613 write!(f, "InitializeWorkflow")
614 }
615 workflow_activation_job::Variant::FireTimer(t) => {
616 write!(f, "FireTimer({})", t.seq)
617 }
618 workflow_activation_job::Variant::UpdateRandomSeed(_) => {
619 write!(f, "UpdateRandomSeed")
620 }
621 workflow_activation_job::Variant::QueryWorkflow(_) => {
622 write!(f, "QueryWorkflow")
623 }
624 workflow_activation_job::Variant::CancelWorkflow(_) => {
625 write!(f, "CancelWorkflow")
626 }
627 workflow_activation_job::Variant::SignalWorkflow(_) => {
628 write!(f, "SignalWorkflow")
629 }
630 workflow_activation_job::Variant::ResolveActivity(r) => {
631 write!(
632 f,
633 "ResolveActivity({}, {})",
634 r.seq,
635 r.result
636 .as_ref()
637 .unwrap_or(&ActivityResolution { status: None })
638 )
639 }
640 workflow_activation_job::Variant::NotifyHasPatch(_) => {
641 write!(f, "NotifyHasPatch")
642 }
643 workflow_activation_job::Variant::ResolveChildWorkflowExecutionStart(_) => {
644 write!(f, "ResolveChildWorkflowExecutionStart")
645 }
646 workflow_activation_job::Variant::ResolveChildWorkflowExecution(_) => {
647 write!(f, "ResolveChildWorkflowExecution")
648 }
649 workflow_activation_job::Variant::ResolveSignalExternalWorkflow(_) => {
650 write!(f, "ResolveSignalExternalWorkflow")
651 }
652 workflow_activation_job::Variant::RemoveFromCache(_) => {
653 write!(f, "RemoveFromCache")
654 }
655 workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(_) => {
656 write!(f, "ResolveRequestCancelExternalWorkflow")
657 }
658 workflow_activation_job::Variant::DoUpdate(u) => {
659 write!(f, "DoUpdate({})", u.id)
660 }
661 workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
662 write!(f, "ResolveNexusOperationStart")
663 }
664 workflow_activation_job::Variant::ResolveNexusOperation(_) => {
665 write!(f, "ResolveNexusOperation")
666 }
667 }
668 }
669 }
670
671 impl Display for ActivityResolution {
672 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
673 match self.status {
674 None => {
675 write!(f, "None")
676 }
677 Some(activity_resolution::Status::Failed(_)) => {
678 write!(f, "Failed")
679 }
680 Some(activity_resolution::Status::Completed(_)) => {
681 write!(f, "Completed")
682 }
683 Some(activity_resolution::Status::Cancelled(_)) => {
684 write!(f, "Cancelled")
685 }
686 Some(activity_resolution::Status::Backoff(_)) => {
687 write!(f, "Backoff")
688 }
689 }
690 }
691 }
692
693 impl Display for QueryWorkflow {
694 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
695 write!(
696 f,
697 "QueryWorkflow(id: {}, type: {})",
698 self.query_id, self.query_type
699 )
700 }
701 }
702
703 impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
704 fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
705 Self {
706 signal_name: a.signal_name,
707 input: Vec::from_payloads(a.input),
708 identity: a.identity,
709 headers: a.header.map(Into::into).unwrap_or_default(),
710 }
711 }
712 }
713
714 impl From<WorkflowExecutionCancelRequestedEventAttributes> for CancelWorkflow {
715 fn from(a: WorkflowExecutionCancelRequestedEventAttributes) -> Self {
716 Self { reason: a.cause }
717 }
718 }
719
720 pub fn start_workflow_from_attribs(
722 attrs: WorkflowExecutionStartedEventAttributes,
723 workflow_id: String,
724 randomness_seed: u64,
725 start_time: Timestamp,
726 ) -> InitializeWorkflow {
727 InitializeWorkflow {
728 workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(),
729 workflow_id,
730 arguments: Vec::from_payloads(attrs.input),
731 randomness_seed,
732 headers: attrs.header.unwrap_or_default().fields,
733 identity: attrs.identity,
734 parent_workflow_info: attrs.parent_workflow_execution.map(|pe| {
735 NamespacedWorkflowExecution {
736 namespace: attrs.parent_workflow_namespace,
737 run_id: pe.run_id,
738 workflow_id: pe.workflow_id,
739 }
740 }),
741 workflow_execution_timeout: attrs.workflow_execution_timeout,
742 workflow_run_timeout: attrs.workflow_run_timeout,
743 workflow_task_timeout: attrs.workflow_task_timeout,
744 continued_from_execution_run_id: attrs.continued_execution_run_id,
745 continued_initiator: attrs.initiator,
746 continued_failure: attrs.continued_failure,
747 last_completion_result: attrs.last_completion_result,
748 first_execution_run_id: attrs.first_execution_run_id,
749 retry_policy: attrs.retry_policy.map(fix_retry_policy),
750 attempt: attrs.attempt,
751 cron_schedule: attrs.cron_schedule,
752 workflow_execution_expiration_time: attrs.workflow_execution_expiration_time,
753 cron_schedule_to_schedule_interval: attrs.first_workflow_task_backoff,
754 memo: attrs.memo,
755 search_attributes: attrs.search_attributes,
756 start_time: Some(start_time),
757 root_workflow: attrs.root_workflow_execution,
758 priority: attrs.priority,
759 }
760 }
761 }
762
763 pub mod workflow_completion {
764 use crate::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure};
765 tonic::include_proto!("coresdk.workflow_completion");
766
767 impl workflow_activation_completion::Status {
768 pub const fn is_success(&self) -> bool {
769 match &self {
770 Self::Successful(_) => true,
771 Self::Failed(_) => false,
772 }
773 }
774 }
775
776 impl From<failure::v1::Failure> for Failure {
777 fn from(f: failure::v1::Failure) -> Self {
778 Failure {
779 failure: Some(f),
780 force_cause: WorkflowTaskFailedCause::Unspecified as i32,
781 }
782 }
783 }
784 }
785
/// Types generated from the `coresdk.child_workflow` protobuf package.
pub mod child_workflow {
    tonic::include_proto!("coresdk.child_workflow");
}
789
790 pub mod nexus {
791 use crate::temporal::api::workflowservice::v1::PollNexusTaskQueueResponse;
792 use std::fmt::{Display, Formatter};
793
794 tonic::include_proto!("coresdk.nexus");
795
796 impl NexusTask {
797 pub fn unwrap_task(self) -> PollNexusTaskQueueResponse {
799 if let Some(nexus_task::Variant::Task(t)) = self.variant {
800 return t;
801 }
802 panic!("Nexus task did not contain a server task");
803 }
804
805 pub fn task_token(&self) -> &[u8] {
807 match &self.variant {
808 Some(nexus_task::Variant::Task(t)) => t.task_token.as_slice(),
809 Some(nexus_task::Variant::CancelTask(c)) => c.task_token.as_slice(),
810 None => panic!("Nexus task did not contain a task token"),
811 }
812 }
813 }
814
815 impl Display for nexus_task_completion::Status {
816 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
817 write!(f, "NexusTaskCompletion(")?;
818 match self {
819 nexus_task_completion::Status::Completed(c) => {
820 write!(f, "{c}")
821 }
822 nexus_task_completion::Status::Error(e) => {
823 write!(f, "{e}")
824 }
825 nexus_task_completion::Status::AckCancel(_) => {
826 write!(f, "AckCancel")
827 }
828 }?;
829 write!(f, ")")
830 }
831 }
832 }
833
834 pub mod workflow_commands {
835 tonic::include_proto!("coresdk.workflow_commands");
836
837 use crate::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType};
838 use std::fmt::{Display, Formatter};
839
840 impl Display for WorkflowCommand {
841 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
842 match &self.variant {
843 None => write!(f, "Empty"),
844 Some(v) => write!(f, "{v}"),
845 }
846 }
847 }
848
849 impl Display for StartTimer {
850 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
851 write!(f, "StartTimer({})", self.seq)
852 }
853 }
854
855 impl Display for ScheduleActivity {
856 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
857 write!(f, "ScheduleActivity({}, {})", self.seq, self.activity_type)
858 }
859 }
860
861 impl Display for ScheduleLocalActivity {
862 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
863 write!(
864 f,
865 "ScheduleLocalActivity({}, {})",
866 self.seq, self.activity_type
867 )
868 }
869 }
870
871 impl Display for QueryResult {
872 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
873 write!(f, "RespondToQuery({})", self.query_id)
874 }
875 }
876
877 impl Display for RequestCancelActivity {
878 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
879 write!(f, "RequestCancelActivity({})", self.seq)
880 }
881 }
882
883 impl Display for RequestCancelLocalActivity {
884 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
885 write!(f, "RequestCancelLocalActivity({})", self.seq)
886 }
887 }
888
889 impl Display for CancelTimer {
890 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
891 write!(f, "CancelTimer({})", self.seq)
892 }
893 }
894
895 impl Display for CompleteWorkflowExecution {
896 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
897 write!(f, "CompleteWorkflowExecution")
898 }
899 }
900
901 impl Display for FailWorkflowExecution {
902 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
903 write!(f, "FailWorkflowExecution")
904 }
905 }
906
907 impl Display for ContinueAsNewWorkflowExecution {
908 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
909 write!(f, "ContinueAsNewWorkflowExecution")
910 }
911 }
912
913 impl Display for CancelWorkflowExecution {
914 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
915 write!(f, "CancelWorkflowExecution")
916 }
917 }
918
919 impl Display for SetPatchMarker {
920 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
921 write!(f, "SetPatchMarker({})", self.patch_id)
922 }
923 }
924
925 impl Display for StartChildWorkflowExecution {
926 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
927 write!(
928 f,
929 "StartChildWorkflowExecution({}, {})",
930 self.seq, self.workflow_type
931 )
932 }
933 }
934
935 impl Display for RequestCancelExternalWorkflowExecution {
936 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
937 write!(f, "RequestCancelExternalWorkflowExecution({})", self.seq)
938 }
939 }
940
941 impl Display for UpsertWorkflowSearchAttributes {
942 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
943 write!(
944 f,
945 "UpsertWorkflowSearchAttributes({:?})",
946 self.search_attributes.keys()
947 )
948 }
949 }
950
951 impl Display for SignalExternalWorkflowExecution {
952 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
953 write!(f, "SignalExternalWorkflowExecution({})", self.seq)
954 }
955 }
956
957 impl Display for CancelSignalWorkflow {
958 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
959 write!(f, "CancelSignalWorkflow({})", self.seq)
960 }
961 }
962
963 impl Display for CancelChildWorkflowExecution {
964 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
965 write!(
966 f,
967 "CancelChildWorkflowExecution({})",
968 self.child_workflow_seq
969 )
970 }
971 }
972
973 impl Display for ModifyWorkflowProperties {
974 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
975 write!(
976 f,
977 "ModifyWorkflowProperties(upserted memo keys: {:?})",
978 self.upserted_memo.as_ref().map(|m| m.fields.keys())
979 )
980 }
981 }
982
983 impl Display for UpdateResponse {
984 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
985 write!(
986 f,
987 "UpdateResponse(protocol_instance_id: {}, response: {:?})",
988 self.protocol_instance_id, self.response
989 )
990 }
991 }
992
993 impl Display for ScheduleNexusOperation {
994 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
995 write!(f, "ScheduleNexusOperation({})", self.seq)
996 }
997 }
998
999 impl Display for RequestCancelNexusOperation {
1000 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1001 write!(f, "RequestCancelNexusOperation({})", self.seq)
1002 }
1003 }
1004
1005 impl QueryResult {
1006 pub fn into_components(self) -> (String, QueryResultType, Option<Payloads>, String) {
1008 match self {
1009 QueryResult {
1010 variant: Some(query_result::Variant::Succeeded(qs)),
1011 query_id,
1012 } => (
1013 query_id,
1014 QueryResultType::Answered,
1015 qs.response.map(Into::into),
1016 "".to_string(),
1017 ),
1018 QueryResult {
1019 variant: Some(query_result::Variant::Failed(err)),
1020 query_id,
1021 } => (query_id, QueryResultType::Failed, None, err.message),
1022 QueryResult {
1023 variant: None,
1024 query_id,
1025 } => (
1026 query_id,
1027 QueryResultType::Failed,
1028 None,
1029 "Query response was empty".to_string(),
1030 ),
1031 }
1032 }
1033 }
1034 }
1035
/// Alias for `i64`.
/// NOTE(review): presumably the id of an event in workflow history — no use is visible in this
/// part of the file, confirm.
pub type HistoryEventId = i64;
1037
1038 impl From<workflow_activation_job::Variant> for WorkflowActivationJob {
1039 fn from(a: workflow_activation_job::Variant) -> Self {
1040 Self { variant: Some(a) }
1041 }
1042 }
1043
1044 impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
1045 fn from(v: Vec<WorkflowCommand>) -> Self {
1046 Self {
1047 commands: v,
1048 used_internal_flags: vec![],
1049 versioning_behavior: VersioningBehavior::Unspecified.into(),
1050 }
1051 }
1052 }
1053
1054 impl From<workflow_command::Variant> for WorkflowCommand {
1055 fn from(v: workflow_command::Variant) -> Self {
1056 Self {
1057 variant: Some(v),
1058 user_metadata: None,
1059 }
1060 }
1061 }
1062
1063 impl workflow_completion::Success {
1064 pub fn from_variants(cmds: Vec<Variant>) -> Self {
1065 let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect();
1066 cmds.into()
1067 }
1068 }
1069
1070 impl WorkflowActivationCompletion {
1071 pub fn empty(run_id: impl Into<String>) -> Self {
1073 let success = workflow_completion::Success::from_variants(vec![]);
1074 Self {
1075 run_id: run_id.into(),
1076 status: Some(workflow_activation_completion::Status::Successful(success)),
1077 }
1078 }
1079
1080 pub fn from_cmds(run_id: impl Into<String>, cmds: Vec<workflow_command::Variant>) -> Self {
1082 let success = workflow_completion::Success::from_variants(cmds);
1083 Self {
1084 run_id: run_id.into(),
1085 status: Some(workflow_activation_completion::Status::Successful(success)),
1086 }
1087 }
1088
1089 pub fn from_cmd(run_id: impl Into<String>, cmd: workflow_command::Variant) -> Self {
1091 let success = workflow_completion::Success::from_variants(vec![cmd]);
1092 Self {
1093 run_id: run_id.into(),
1094 status: Some(workflow_activation_completion::Status::Successful(success)),
1095 }
1096 }
1097
1098 pub fn fail(
1099 run_id: impl Into<String>,
1100 failure: Failure,
1101 cause: Option<WorkflowTaskFailedCause>,
1102 ) -> Self {
1103 Self {
1104 run_id: run_id.into(),
1105 status: Some(workflow_activation_completion::Status::Failed(
1106 workflow_completion::Failure {
1107 failure: Some(failure),
1108 force_cause: cause.unwrap_or(WorkflowTaskFailedCause::Unspecified) as i32,
1109 },
1110 )),
1111 }
1112 }
1113
1114 pub fn has_execution_ending(&self) -> bool {
1117 self.has_complete_workflow_execution()
1118 || self.has_fail_execution()
1119 || self.has_continue_as_new()
1120 || self.has_cancel_workflow_execution()
1121 }
1122
1123 pub fn has_fail_execution(&self) -> bool {
1125 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1126 return s.commands.iter().any(|wfc| {
1127 matches!(
1128 wfc,
1129 WorkflowCommand {
1130 variant: Some(workflow_command::Variant::FailWorkflowExecution(_)),
1131 ..
1132 }
1133 )
1134 });
1135 }
1136 false
1137 }
1138
1139 pub fn has_cancel_workflow_execution(&self) -> bool {
1141 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1142 return s.commands.iter().any(|wfc| {
1143 matches!(
1144 wfc,
1145 WorkflowCommand {
1146 variant: Some(workflow_command::Variant::CancelWorkflowExecution(_)),
1147 ..
1148 }
1149 )
1150 });
1151 }
1152 false
1153 }
1154
1155 pub fn has_continue_as_new(&self) -> bool {
1157 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1158 return s.commands.iter().any(|wfc| {
1159 matches!(
1160 wfc,
1161 WorkflowCommand {
1162 variant: Some(
1163 workflow_command::Variant::ContinueAsNewWorkflowExecution(_)
1164 ),
1165 ..
1166 }
1167 )
1168 });
1169 }
1170 false
1171 }
1172
1173 pub fn has_complete_workflow_execution(&self) -> bool {
1175 self.complete_workflow_execution_value().is_some()
1176 }
1177
1178 pub fn complete_workflow_execution_value(&self) -> Option<&Payload> {
1180 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1181 s.commands.iter().find_map(|wfc| match wfc {
1182 WorkflowCommand {
1183 variant: Some(workflow_command::Variant::CompleteWorkflowExecution(v)),
1184 ..
1185 } => v.result.as_ref(),
1186 _ => None,
1187 })
1188 } else {
1189 None
1190 }
1191 }
1192
1193 pub fn is_empty(&self) -> bool {
1195 if let Some(workflow_activation_completion::Status::Successful(s)) = &self.status {
1196 return s.commands.is_empty();
1197 }
1198 false
1199 }
1200
1201 pub fn add_internal_flags(&mut self, patch: u32) {
1202 if let Some(workflow_activation_completion::Status::Successful(s)) = &mut self.status {
1203 s.used_internal_flags.push(patch);
1204 }
1205 }
1206 }
1207
    /// Conversion of a value into a [`WorkflowActivationCompletion`] for a particular run.
    pub trait IntoCompletion {
        /// Consumes `self` and produces the completion to report for `run_id`.
        fn into_completion(self, run_id: String) -> WorkflowActivationCompletion;
    }
1213
    /// A single command variant converts into a completion by delegating to
    /// [`WorkflowActivationCompletion::from_cmd`].
    impl IntoCompletion for workflow_command::Variant {
        fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
            WorkflowActivationCompletion::from_cmd(run_id, self)
        }
    }
1219
1220 impl<I, V> IntoCompletion for I
1221 where
1222 I: IntoIterator<Item = V>,
1223 V: Into<WorkflowCommand>,
1224 {
1225 fn into_completion(self, run_id: String) -> WorkflowActivationCompletion {
1226 let success = self.into_iter().map(Into::into).collect::<Vec<_>>().into();
1227 WorkflowActivationCompletion {
1228 run_id,
1229 status: Some(workflow_activation_completion::Status::Successful(success)),
1230 }
1231 }
1232 }
1233
1234 impl Display for WorkflowActivationCompletion {
1235 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1236 write!(
1237 f,
1238 "WorkflowActivationCompletion(run_id: {}, status: ",
1239 &self.run_id
1240 )?;
1241 match &self.status {
1242 None => write!(f, "empty")?,
1243 Some(s) => write!(f, "{s}")?,
1244 };
1245 write!(f, ")")
1246 }
1247 }
1248
1249 impl Display for workflow_activation_completion::Status {
1250 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1251 match self {
1252 workflow_activation_completion::Status::Successful(
1253 workflow_completion::Success { commands, .. },
1254 ) => {
1255 write!(f, "Success(")?;
1256 let mut written = 0;
1257 for c in commands {
1258 write!(f, "{c} ")?;
1259 written += 1;
1260 if written >= 10 && written < commands.len() {
1261 write!(f, "... {} more", commands.len() - written)?;
1262 break;
1263 }
1264 }
1265 write!(f, ")")
1266 }
1267 workflow_activation_completion::Status::Failed(_) => {
1268 write!(f, "Failed")
1269 }
1270 }
1271 }
1272 }
1273
1274 impl ActivityTask {
1275 pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse) -> Self {
1276 let (workflow_id, run_id) = r
1277 .workflow_execution
1278 .map(|we| (we.workflow_id, we.run_id))
1279 .unwrap_or_default();
1280 Self {
1281 task_token: r.task_token,
1282 variant: Some(activity_task::activity_task::Variant::Start(
1283 activity_task::Start {
1284 workflow_namespace: r.workflow_namespace,
1285 workflow_type: r.workflow_type.map_or_else(|| "".to_string(), |wt| wt.name),
1286 workflow_execution: Some(WorkflowExecution {
1287 workflow_id,
1288 run_id,
1289 }),
1290 activity_id: r.activity_id,
1291 activity_type: r.activity_type.map_or_else(|| "".to_string(), |at| at.name),
1292 header_fields: r.header.map(Into::into).unwrap_or_default(),
1293 input: Vec::from_payloads(r.input),
1294 heartbeat_details: Vec::from_payloads(r.heartbeat_details),
1295 scheduled_time: r.scheduled_time,
1296 current_attempt_scheduled_time: r.current_attempt_scheduled_time,
1297 started_time: r.started_time,
1298 attempt: r.attempt as u32,
1299 schedule_to_close_timeout: r.schedule_to_close_timeout,
1300 start_to_close_timeout: r.start_to_close_timeout,
1301 heartbeat_timeout: r.heartbeat_timeout,
1302 retry_policy: r.retry_policy.map(fix_retry_policy),
1303 priority: r.priority,
1304 is_local: false,
1305 },
1306 )),
1307 }
1308 }
1309 }
1310
1311 impl Failure {
1312 pub fn is_timeout(&self) -> Option<crate::temporal::api::enums::v1::TimeoutType> {
1313 match &self.failure_info {
1314 Some(FailureInfo::TimeoutFailureInfo(ti)) => Some(ti.timeout_type()),
1315 _ => None,
1316 }
1317 }
1318
1319 pub fn application_failure(message: String, non_retryable: bool) -> Self {
1320 Self {
1321 message,
1322 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1323 ApplicationFailureInfo {
1324 non_retryable,
1325 ..Default::default()
1326 },
1327 )),
1328 ..Default::default()
1329 }
1330 }
1331
1332 pub fn application_failure_from_error(ae: anyhow::Error, non_retryable: bool) -> Self {
1333 Self {
1334 failure_info: Some(FailureInfo::ApplicationFailureInfo(
1335 ApplicationFailureInfo {
1336 non_retryable,
1337 ..Default::default()
1338 },
1339 )),
1340 ..ae.chain()
1341 .rfold(None, |cause, e| {
1342 Some(Self {
1343 message: e.to_string(),
1344 cause: cause.map(Box::new),
1345 ..Default::default()
1346 })
1347 })
1348 .unwrap_or_default()
1349 }
1350 }
1351
1352 pub fn timeout(timeout_type: TimeoutType) -> Self {
1353 Self {
1354 message: "Activity timed out".to_string(),
1355 cause: Some(Box::new(Failure {
1356 message: "Activity timed out".to_string(),
1357 failure_info: Some(FailureInfo::TimeoutFailureInfo(TimeoutFailureInfo {
1358 timeout_type: timeout_type.into(),
1359 ..Default::default()
1360 })),
1361 ..Default::default()
1362 })),
1363 failure_info: Some(FailureInfo::ActivityFailureInfo(
1364 ActivityFailureInfo::default(),
1365 )),
1366 ..Default::default()
1367 }
1368 }
1369
1370 pub fn maybe_application_failure(&self) -> Option<&ApplicationFailureInfo> {
1372 if let Failure {
1373 failure_info: Some(FailureInfo::ApplicationFailureInfo(f)),
1374 ..
1375 } = self
1376 {
1377 Some(f)
1378 } else {
1379 None
1380 }
1381 }
1382
1383 pub fn is_benign_application_failure(&self) -> bool {
1385 self.maybe_application_failure()
1386 .is_some_and(|app_info| app_info.category() == ApplicationErrorCategory::Benign)
1387 }
1388 }
1389
1390 impl Display for Failure {
1391 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1392 write!(f, "Failure({}, ", self.message)?;
1393 match self.failure_info.as_ref() {
1394 None => write!(f, "missing info")?,
1395 Some(FailureInfo::TimeoutFailureInfo(v)) => {
1396 write!(f, "Timeout: {:?}", v.timeout_type())?;
1397 }
1398 Some(FailureInfo::ApplicationFailureInfo(v)) => {
1399 write!(f, "Application Failure: {}", v.r#type)?;
1400 }
1401 Some(FailureInfo::CanceledFailureInfo(_)) => {
1402 write!(f, "Cancelled")?;
1403 }
1404 Some(FailureInfo::TerminatedFailureInfo(_)) => {
1405 write!(f, "Terminated")?;
1406 }
1407 Some(FailureInfo::ServerFailureInfo(_)) => {
1408 write!(f, "Server Failure")?;
1409 }
1410 Some(FailureInfo::ResetWorkflowFailureInfo(_)) => {
1411 write!(f, "Reset Workflow")?;
1412 }
1413 Some(FailureInfo::ActivityFailureInfo(v)) => {
1414 write!(
1415 f,
1416 "Activity Failure: scheduled_event_id: {}",
1417 v.scheduled_event_id
1418 )?;
1419 }
1420 Some(FailureInfo::ChildWorkflowExecutionFailureInfo(v)) => {
1421 write!(
1422 f,
1423 "Child Workflow: started_event_id: {}",
1424 v.started_event_id
1425 )?;
1426 }
1427 Some(FailureInfo::NexusOperationExecutionFailureInfo(v)) => {
1428 write!(
1429 f,
1430 "Nexus Operation Failure: scheduled_event_id: {}",
1431 v.scheduled_event_id
1432 )?;
1433 }
1434 Some(FailureInfo::NexusHandlerFailureInfo(v)) => {
1435 write!(f, "Nexus Handler Failure: {}", v.r#type)?;
1436 }
1437 }
1438 write!(f, ")")
1439 }
1440 }
1441
1442 impl From<&str> for Failure {
1443 fn from(v: &str) -> Self {
1444 Failure::application_failure(v.to_string(), false)
1445 }
1446 }
1447
    /// An owned string converts into a retryable application failure with that message.
    impl From<String> for Failure {
        fn from(v: String) -> Self {
            Failure::application_failure(v, false)
        }
    }
1453
    /// An `anyhow` error converts into a retryable application failure via
    /// [`Failure::application_failure_from_error`].
    impl From<anyhow::Error> for Failure {
        fn from(ae: anyhow::Error) -> Self {
            Failure::application_failure_from_error(ae, false)
        }
    }
1459
1460 pub trait FromPayloadsExt {
1461 fn from_payloads(p: Option<Payloads>) -> Self;
1462 }
1463 impl<T> FromPayloadsExt for T
1464 where
1465 T: FromIterator<Payload>,
1466 {
1467 fn from_payloads(p: Option<Payloads>) -> Self {
1468 match p {
1469 None => std::iter::empty().collect(),
1470 Some(p) => p.payloads.into_iter().collect(),
1471 }
1472 }
1473 }
1474
1475 pub trait IntoPayloadsExt {
1476 fn into_payloads(self) -> Option<Payloads>;
1477 }
1478 impl<T> IntoPayloadsExt for T
1479 where
1480 T: IntoIterator<Item = Payload>,
1481 {
1482 fn into_payloads(self) -> Option<Payloads> {
1483 let mut iterd = self.into_iter().peekable();
1484 if iterd.peek().is_none() {
1485 None
1486 } else {
1487 Some(Payloads {
1488 payloads: iterd.collect(),
1489 })
1490 }
1491 }
1492 }
1493
    /// Wraps a single payload in a one-element [`Payloads`].
    impl From<Payload> for Payloads {
        fn from(p: Payload) -> Self {
            Self { payloads: vec![p] }
        }
    }
1499
1500 impl<T> From<T> for Payloads
1501 where
1502 T: AsRef<[u8]>,
1503 {
1504 fn from(v: T) -> Self {
1505 Self {
1506 payloads: vec![v.into()],
1507 }
1508 }
1509 }
1510
    /// Errors produced when turning a [`Payload`] back into a value.
    #[derive(thiserror::Error, Debug)]
    pub enum PayloadDeserializeErr {
        /// The payload is not in a format this deserializer handles (for the JSON deserializer in
        /// this module: the payload is not marked as JSON-encoded).
        #[error("This deserializer does not understand this payload")]
        DeserializerDoesNotHandle,
        /// The payload was in the expected format but decoding it failed.
        #[error("Error during deserialization: {0}")]
        DeserializeErr(#[from] anyhow::Error),
    }
1520
1521 pub trait AsJsonPayloadExt {
1524 fn as_json_payload(&self) -> anyhow::Result<Payload>;
1525 }
1526 impl<T> AsJsonPayloadExt for T
1527 where
1528 T: Serialize,
1529 {
1530 fn as_json_payload(&self) -> anyhow::Result<Payload> {
1531 let as_json = serde_json::to_string(self)?;
1532 let mut metadata = HashMap::new();
1533 metadata.insert(
1534 ENCODING_PAYLOAD_KEY.to_string(),
1535 JSON_ENCODING_VAL.as_bytes().to_vec(),
1536 );
1537 Ok(Payload {
1538 metadata,
1539 data: as_json.into_bytes(),
1540 })
1541 }
1542 }
1543
1544 pub trait FromJsonPayloadExt: Sized {
1545 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr>;
1546 }
1547 impl<T> FromJsonPayloadExt for T
1548 where
1549 T: for<'de> Deserialize<'de>,
1550 {
1551 fn from_json_payload(payload: &Payload) -> Result<Self, PayloadDeserializeErr> {
1552 if !payload.is_json_payload() {
1553 return Err(PayloadDeserializeErr::DeserializerDoesNotHandle);
1554 }
1555 let payload_str = std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?;
1556 Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?)
1557 }
1558 }
1559
1560 #[derive(derive_more::Display, Debug)]
1562 pub enum PayloadsToPayloadError {
1563 MoreThanOnePayload,
1564 NoPayload,
1565 }
1566 impl TryFrom<Payloads> for Payload {
1567 type Error = PayloadsToPayloadError;
1568
1569 fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
1570 match v.payloads.pop() {
1571 None => Err(PayloadsToPayloadError::NoPayload),
1572 Some(p) => {
1573 if v.payloads.is_empty() {
1574 Ok(p)
1575 } else {
1576 Err(PayloadsToPayloadError::MoreThanOnePayload)
1577 }
1578 }
1579 }
1580 }
1581 }
1582
1583 fn fix_retry_policy(mut retry_policy: RetryPolicy) -> RetryPolicy {
1586 if retry_policy.initial_interval.is_none() {
1587 retry_policy.initial_interval = Default::default();
1588 }
1589 retry_policy
1590 }
1591}
1592
1593#[allow(
1595 clippy::all,
1596 missing_docs,
1597 rustdoc::broken_intra_doc_links,
1598 rustdoc::bare_urls
1599)]
1600pub mod temporal {
1602 pub mod api {
        /// Bindings included for the `temporal.api.activity.v1` protobuf package.
        pub mod activity {
            pub mod v1 {
                tonic::include_proto!("temporal.api.activity.v1");
            }
        }
        /// Bindings included for the `temporal.api.batch.v1` protobuf package.
        pub mod batch {
            pub mod v1 {
                tonic::include_proto!("temporal.api.batch.v1");
            }
        }
1613 pub mod command {
1614 pub mod v1 {
1615 tonic::include_proto!("temporal.api.command.v1");
1616
1617 use crate::{
1618 coresdk::{IntoPayloadsExt, workflow_commands},
1619 temporal::api::{
1620 common::v1::{ActivityType, WorkflowType},
1621 enums::v1::CommandType,
1622 },
1623 };
1624 use command::Attributes;
1625 use std::fmt::{Display, Formatter};
1626
1627 impl From<command::Attributes> for Command {
1628 fn from(c: command::Attributes) -> Self {
1629 match c {
1630 a @ Attributes::StartTimerCommandAttributes(_) => Self {
1631 command_type: CommandType::StartTimer as i32,
1632 attributes: Some(a),
1633 user_metadata: Default::default(),
1634 },
1635 a @ Attributes::CancelTimerCommandAttributes(_) => Self {
1636 command_type: CommandType::CancelTimer as i32,
1637 attributes: Some(a),
1638 user_metadata: Default::default(),
1639 },
1640 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
1641 command_type: CommandType::CompleteWorkflowExecution as i32,
1642 attributes: Some(a),
1643 user_metadata: Default::default(),
1644 },
1645 a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
1646 command_type: CommandType::FailWorkflowExecution as i32,
1647 attributes: Some(a),
1648 user_metadata: Default::default(),
1649 },
1650 a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
1651 command_type: CommandType::ScheduleActivityTask as i32,
1652 attributes: Some(a),
1653 user_metadata: Default::default(),
1654 },
1655 a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self {
1656 command_type: CommandType::RequestCancelActivityTask as i32,
1657 attributes: Some(a),
1658 user_metadata: Default::default(),
1659 },
1660 a @ Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
1661 Self {
1662 command_type: CommandType::ContinueAsNewWorkflowExecution
1663 as i32,
1664 attributes: Some(a),
1665 user_metadata: Default::default(),
1666 }
1667 }
1668 a @ Attributes::CancelWorkflowExecutionCommandAttributes(_) => Self {
1669 command_type: CommandType::CancelWorkflowExecution as i32,
1670 attributes: Some(a),
1671 user_metadata: Default::default(),
1672 },
1673 a @ Attributes::RecordMarkerCommandAttributes(_) => Self {
1674 command_type: CommandType::RecordMarker as i32,
1675 attributes: Some(a),
1676 user_metadata: Default::default(),
1677 },
1678 a @ Attributes::ProtocolMessageCommandAttributes(_) => Self {
1679 command_type: CommandType::ProtocolMessage as i32,
1680 attributes: Some(a),
1681 user_metadata: Default::default(),
1682 },
1683 a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
1684 Self {
1685 command_type: CommandType::RequestCancelNexusOperation as i32,
1686 attributes: Some(a),
1687 user_metadata: Default::default(),
1688 }
1689 }
1690 _ => unimplemented!(),
1691 }
1692 }
1693 }
1694
1695 impl Display for Command {
1696 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1697 let ct = CommandType::try_from(self.command_type)
1698 .unwrap_or(CommandType::Unspecified);
1699 write!(f, "{:?}", ct)
1700 }
1701 }
1702
                /// Extension for looking up the [`CommandType`] that corresponds to a set of
                /// command attributes.
                pub trait CommandAttributesExt {
                    /// Returns the command type matching this attribute kind.
                    fn as_type(&self) -> CommandType;
                }

                impl CommandAttributesExt for command::Attributes {
                    fn as_type(&self) -> CommandType {
                        // Exhaustive on purpose: there is no wildcard arm, so a newly added
                        // attribute variant fails to compile until it is mapped here.
                        match self {
                            Attributes::ScheduleActivityTaskCommandAttributes(_) => {
                                CommandType::ScheduleActivityTask
                            }
                            Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer,
                            Attributes::CompleteWorkflowExecutionCommandAttributes(_) => {
                                CommandType::CompleteWorkflowExecution
                            }
                            Attributes::FailWorkflowExecutionCommandAttributes(_) => {
                                CommandType::FailWorkflowExecution
                            }
                            Attributes::RequestCancelActivityTaskCommandAttributes(_) => {
                                CommandType::RequestCancelActivityTask
                            }
                            Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer,
                            Attributes::CancelWorkflowExecutionCommandAttributes(_) => {
                                CommandType::CancelWorkflowExecution
                            }
                            Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes(
                                _,
                            ) => CommandType::RequestCancelExternalWorkflowExecution,
                            Attributes::RecordMarkerCommandAttributes(_) => {
                                CommandType::RecordMarker
                            }
                            Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => {
                                CommandType::ContinueAsNewWorkflowExecution
                            }
                            Attributes::StartChildWorkflowExecutionCommandAttributes(_) => {
                                CommandType::StartChildWorkflowExecution
                            }
                            Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => {
                                CommandType::SignalExternalWorkflowExecution
                            }
                            Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => {
                                CommandType::UpsertWorkflowSearchAttributes
                            }
                            Attributes::ProtocolMessageCommandAttributes(_) => {
                                CommandType::ProtocolMessage
                            }
                            Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => {
                                CommandType::ModifyWorkflowProperties
                            }
                            Attributes::ScheduleNexusOperationCommandAttributes(_) => {
                                CommandType::ScheduleNexusOperation
                            }
                            Attributes::RequestCancelNexusOperationCommandAttributes(_) => {
                                CommandType::RequestCancelNexusOperation
                            }
                        }
                    }
                }
1760
1761 impl Display for command::Attributes {
1762 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1763 write!(f, "{:?}", self.as_type())
1764 }
1765 }
1766
1767 impl From<workflow_commands::StartTimer> for command::Attributes {
1768 fn from(s: workflow_commands::StartTimer) -> Self {
1769 Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
1770 timer_id: s.seq.to_string(),
1771 start_to_fire_timeout: s.start_to_fire_timeout,
1772 })
1773 }
1774 }
1775
1776 impl From<workflow_commands::UpsertWorkflowSearchAttributes> for command::Attributes {
1777 fn from(s: workflow_commands::UpsertWorkflowSearchAttributes) -> Self {
1778 Self::UpsertWorkflowSearchAttributesCommandAttributes(
1779 UpsertWorkflowSearchAttributesCommandAttributes {
1780 search_attributes: Some(s.search_attributes.into()),
1781 },
1782 )
1783 }
1784 }
1785
1786 impl From<workflow_commands::ModifyWorkflowProperties> for command::Attributes {
1787 fn from(s: workflow_commands::ModifyWorkflowProperties) -> Self {
1788 Self::ModifyWorkflowPropertiesCommandAttributes(
1789 ModifyWorkflowPropertiesCommandAttributes {
1790 upserted_memo: s.upserted_memo.map(Into::into),
1791 },
1792 )
1793 }
1794 }
1795
1796 impl From<workflow_commands::CancelTimer> for command::Attributes {
1797 fn from(s: workflow_commands::CancelTimer) -> Self {
1798 Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
1799 timer_id: s.seq.to_string(),
1800 })
1801 }
1802 }
1803
1804 pub fn schedule_activity_cmd_to_api(
1805 s: workflow_commands::ScheduleActivity,
1806 use_workflow_build_id: bool,
1807 ) -> command::Attributes {
1808 command::Attributes::ScheduleActivityTaskCommandAttributes(
1809 ScheduleActivityTaskCommandAttributes {
1810 activity_id: s.activity_id,
1811 activity_type: Some(ActivityType {
1812 name: s.activity_type,
1813 }),
1814 task_queue: Some(s.task_queue.into()),
1815 header: Some(s.headers.into()),
1816 input: s.arguments.into_payloads(),
1817 schedule_to_close_timeout: s.schedule_to_close_timeout,
1818 schedule_to_start_timeout: s.schedule_to_start_timeout,
1819 start_to_close_timeout: s.start_to_close_timeout,
1820 heartbeat_timeout: s.heartbeat_timeout,
1821 retry_policy: s.retry_policy.map(Into::into),
1822 request_eager_execution: !s.do_not_eagerly_execute,
1823 use_workflow_build_id,
1824 priority: s.priority,
1825 },
1826 )
1827 }
1828
1829 #[allow(deprecated)]
1830 pub fn start_child_workflow_cmd_to_api(
1831 s: workflow_commands::StartChildWorkflowExecution,
1832 inherit_build_id: bool,
1833 ) -> command::Attributes {
1834 command::Attributes::StartChildWorkflowExecutionCommandAttributes(
1835 StartChildWorkflowExecutionCommandAttributes {
1836 workflow_id: s.workflow_id,
1837 workflow_type: Some(WorkflowType {
1838 name: s.workflow_type,
1839 }),
1840 control: "".into(),
1841 namespace: s.namespace,
1842 task_queue: Some(s.task_queue.into()),
1843 header: Some(s.headers.into()),
1844 memo: Some(s.memo.into()),
1845 search_attributes: Some(s.search_attributes.into()),
1846 input: s.input.into_payloads(),
1847 workflow_id_reuse_policy: s.workflow_id_reuse_policy,
1848 workflow_execution_timeout: s.workflow_execution_timeout,
1849 workflow_run_timeout: s.workflow_run_timeout,
1850 workflow_task_timeout: s.workflow_task_timeout,
1851 retry_policy: s.retry_policy.map(Into::into),
1852 cron_schedule: s.cron_schedule.clone(),
1853 parent_close_policy: s.parent_close_policy,
1854 inherit_build_id,
1855 priority: s.priority,
1856 },
1857 )
1858 }
1859
1860 impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
1861 fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
1862 Self::CompleteWorkflowExecutionCommandAttributes(
1863 CompleteWorkflowExecutionCommandAttributes {
1864 result: c.result.map(Into::into),
1865 },
1866 )
1867 }
1868 }
1869
1870 impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
1871 fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
1872 Self::FailWorkflowExecutionCommandAttributes(
1873 FailWorkflowExecutionCommandAttributes {
1874 failure: c.failure.map(Into::into),
1875 },
1876 )
1877 }
1878 }
1879
1880 #[allow(deprecated)]
1881 pub fn continue_as_new_cmd_to_api(
1882 c: workflow_commands::ContinueAsNewWorkflowExecution,
1883 inherit_build_id: bool,
1884 ) -> command::Attributes {
1885 command::Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(
1886 ContinueAsNewWorkflowExecutionCommandAttributes {
1887 workflow_type: Some(c.workflow_type.into()),
1888 task_queue: Some(c.task_queue.into()),
1889 input: c.arguments.into_payloads(),
1890 workflow_run_timeout: c.workflow_run_timeout,
1891 workflow_task_timeout: c.workflow_task_timeout,
1892 memo: if c.memo.is_empty() {
1893 None
1894 } else {
1895 Some(c.memo.into())
1896 },
1897 header: if c.headers.is_empty() {
1898 None
1899 } else {
1900 Some(c.headers.into())
1901 },
1902 retry_policy: c.retry_policy,
1903 search_attributes: if c.search_attributes.is_empty() {
1904 None
1905 } else {
1906 Some(c.search_attributes.into())
1907 },
1908 inherit_build_id,
1909 ..Default::default()
1910 },
1911 )
1912 }
1913
1914 impl From<workflow_commands::CancelWorkflowExecution> for command::Attributes {
1915 fn from(_c: workflow_commands::CancelWorkflowExecution) -> Self {
1916 Self::CancelWorkflowExecutionCommandAttributes(
1917 CancelWorkflowExecutionCommandAttributes { details: None },
1918 )
1919 }
1920 }
1921
1922 impl From<workflow_commands::ScheduleNexusOperation> for command::Attributes {
1923 fn from(c: workflow_commands::ScheduleNexusOperation) -> Self {
1924 Self::ScheduleNexusOperationCommandAttributes(
1925 ScheduleNexusOperationCommandAttributes {
1926 endpoint: c.endpoint,
1927 service: c.service,
1928 operation: c.operation,
1929 input: c.input,
1930 schedule_to_close_timeout: c.schedule_to_close_timeout,
1931 nexus_header: c.nexus_header,
1932 },
1933 )
1934 }
1935 }
1936 }
1937 }
        /// Bindings included for the `temporal.api.cloud.*` protobuf packages, with one
        /// submodule (each holding a `v1` module) per package.
        pub mod cloud {
            pub mod account {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.account.v1");
                }
            }
            pub mod cloudservice {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.cloudservice.v1");
                }
            }
            pub mod connectivityrule {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.connectivityrule.v1");
                }
            }
            pub mod identity {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.identity.v1");
                }
            }
            pub mod namespace {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.namespace.v1");
                }
            }
            pub mod nexus {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.nexus.v1");
                }
            }
            pub mod operation {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.operation.v1");
                }
            }
            pub mod region {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.region.v1");
                }
            }
            pub mod resource {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.resource.v1");
                }
            }
            pub mod sink {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.sink.v1");
                }
            }
            pub mod usage {
                pub mod v1 {
                    tonic::include_proto!("temporal.api.cloud.usage.v1");
                }
            }
        }
1995 pub mod common {
1996 pub mod v1 {
1997 use crate::{ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL};
1998 use base64::{Engine, prelude::BASE64_STANDARD};
1999 use std::{
2000 collections::HashMap,
2001 fmt::{Display, Formatter},
2002 };
2003 tonic::include_proto!("temporal.api.common.v1");
2004
2005 impl<T> From<T> for Payload
2006 where
2007 T: AsRef<[u8]>,
2008 {
2009 fn from(v: T) -> Self {
2010 let mut metadata = HashMap::new();
2013 metadata.insert(ENCODING_PAYLOAD_KEY.to_string(), b"binary/plain".to_vec());
2014 Self {
2015 metadata,
2016 data: v.as_ref().to_vec(),
2017 }
2018 }
2019 }
2020
2021 impl Payload {
2022 pub fn as_slice(&self) -> &[u8] {
2024 self.data.as_slice()
2025 }
2026
2027 pub fn is_json_payload(&self) -> bool {
2028 self.metadata
2029 .get(ENCODING_PAYLOAD_KEY)
2030 .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes())
2031 .unwrap_or_default()
2032 }
2033 }
2034
2035 impl std::fmt::Debug for Payload {
2036 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2037 if std::env::var("TEMPORAL_PRINT_FULL_PAYLOADS").is_err()
2038 && self.data.len() > 64
2039 {
2040 let mut windows = self.data.as_slice().windows(32);
2041 write!(
2042 f,
2043 "[{}..{}]",
2044 BASE64_STANDARD.encode(windows.next().unwrap_or_default()),
2045 BASE64_STANDARD.encode(windows.next_back().unwrap_or_default())
2046 )
2047 } else {
2048 write!(f, "[{}]", BASE64_STANDARD.encode(&self.data))
2049 }
2050 }
2051 }
2052
2053 impl Display for Payload {
2054 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2055 write!(f, "{:?}", self)
2056 }
2057 }
2058
2059 impl Display for Header {
2060 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2061 write!(f, "Header(")?;
2062 for kv in &self.fields {
2063 write!(f, "{}: ", kv.0)?;
2064 write!(f, "{}, ", kv.1)?;
2065 }
2066 write!(f, ")")
2067 }
2068 }
2069
2070 impl From<Header> for HashMap<String, Payload> {
2071 fn from(h: Header) -> Self {
2072 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2073 }
2074 }
2075
2076 impl From<Memo> for HashMap<String, Payload> {
2077 fn from(h: Memo) -> Self {
2078 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
2079 }
2080 }
2081
2082 impl From<SearchAttributes> for HashMap<String, Payload> {
2083 fn from(h: SearchAttributes) -> Self {
2084 h.indexed_fields
2085 .into_iter()
2086 .map(|(k, v)| (k, v.into()))
2087 .collect()
2088 }
2089 }
2090
2091 impl From<HashMap<String, Payload>> for SearchAttributes {
2092 fn from(h: HashMap<String, Payload>) -> Self {
2093 Self {
2094 indexed_fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
2095 }
2096 }
2097 }
2098
                /// Builds an activity type whose name is the given string.
                impl From<String> for ActivityType {
                    fn from(name: String) -> Self {
                        Self { name }
                    }
                }
2104
2105 impl From<&str> for ActivityType {
2106 fn from(name: &str) -> Self {
2107 Self {
2108 name: name.to_string(),
2109 }
2110 }
2111 }
2112
                /// Extracts the name out of an activity type.
                impl From<ActivityType> for String {
                    fn from(at: ActivityType) -> Self {
                        at.name
                    }
                }
2118
2119 impl From<&str> for WorkflowType {
2120 fn from(v: &str) -> Self {
2121 Self {
2122 name: v.to_string(),
2123 }
2124 }
2125 }
2126 }
2127 }
        /// Bindings included for the `temporal.api.deployment.v1` protobuf package.
        pub mod deployment {
            pub mod v1 {
                tonic::include_proto!("temporal.api.deployment.v1");
            }
        }
        /// Bindings included for the `temporal.api.enums.v1` protobuf package.
        pub mod enums {
            pub mod v1 {
                tonic::include_proto!("temporal.api.enums.v1");
            }
        }
        /// Bindings included for the `temporal.api.failure.v1` protobuf package.
        pub mod failure {
            pub mod v1 {
                tonic::include_proto!("temporal.api.failure.v1");
            }
        }
        /// Bindings included for the `temporal.api.filter.v1` protobuf package.
        pub mod filter {
            pub mod v1 {
                tonic::include_proto!("temporal.api.filter.v1");
            }
        }
2148 pub mod history {
2149 pub mod v1 {
2150 use crate::temporal::api::{
2151 enums::v1::EventType, history::v1::history_event::Attributes,
2152 };
2153 use anyhow::bail;
2154 use prost::alloc::fmt::Formatter;
2155 use std::fmt::Display;
2156
2157 tonic::include_proto!("temporal.api.history.v1");
2158
2159 impl History {
2160 pub fn extract_run_id_from_start(&self) -> Result<&str, anyhow::Error> {
2161 extract_original_run_id_from_events(&self.events)
2162 }
2163
2164 pub fn last_event_id(&self) -> i64 {
2167 self.events.last().map(|e| e.event_id).unwrap_or_default()
2168 }
2169 }
2170
2171 pub fn extract_original_run_id_from_events(
2172 events: &[HistoryEvent],
2173 ) -> Result<&str, anyhow::Error> {
2174 if let Some(Attributes::WorkflowExecutionStartedEventAttributes(wes)) =
2175 events.get(0).and_then(|x| x.attributes.as_ref())
2176 {
2177 Ok(&wes.original_execution_run_id)
2178 } else {
2179 bail!("First event is not WorkflowExecutionStarted?!?")
2180 }
2181 }
2182
2183 impl HistoryEvent {
2184 pub fn is_command_event(&self) -> bool {
2186 EventType::try_from(self.event_type).map_or(false, |et| match et {
2187 EventType::ActivityTaskScheduled
2188 | EventType::ActivityTaskCancelRequested
2189 | EventType::MarkerRecorded
2190 | EventType::RequestCancelExternalWorkflowExecutionInitiated
2191 | EventType::SignalExternalWorkflowExecutionInitiated
2192 | EventType::StartChildWorkflowExecutionInitiated
2193 | EventType::TimerCanceled
2194 | EventType::TimerStarted
2195 | EventType::UpsertWorkflowSearchAttributes
2196 | EventType::WorkflowPropertiesModified
2197 | EventType::NexusOperationScheduled
2198 | EventType::NexusOperationCancelRequested
2199 | EventType::WorkflowExecutionCanceled
2200 | EventType::WorkflowExecutionCompleted
2201 | EventType::WorkflowExecutionContinuedAsNew
2202 | EventType::WorkflowExecutionFailed
2203 | EventType::WorkflowExecutionUpdateAccepted
2204 | EventType::WorkflowExecutionUpdateRejected
2205 | EventType::WorkflowExecutionUpdateCompleted => true,
2206 _ => false,
2207 })
2208 }
2209
                    /// Returns the id of the earlier event that this event refers back to, for
                    /// the attribute kinds listed below: the `scheduled_event_id`,
                    /// `started_event_id` or `initiated_event_id` field, depending on the kind.
                    ///
                    /// Returns `None` when the event has no attributes or its attribute kind is
                    /// not listed.
                    pub fn get_initial_command_event_id(&self) -> Option<i64> {
                        self.attributes.as_ref().and_then(|a| {
                            match a {
                                // Activity task events refer to the scheduling event.
                                Attributes::ActivityTaskStartedEventAttributes(a) =>
                                    Some(a.scheduled_event_id),
                                Attributes::ActivityTaskCompletedEventAttributes(a) =>
                                    Some(a.scheduled_event_id),
                                Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
                                // Timer events refer to the timer-started event.
                                Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
                                Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
                                // External-workflow and child-workflow events refer to the
                                // initiating event.
                                Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
                                Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
                                // Workflow task events refer to the scheduling event.
                                Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
                                // Nexus operation events refer to the scheduling event.
                                Attributes::NexusOperationStartedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::NexusOperationCancelRequestCompletedEventAttributes(a) => Some(a.scheduled_event_id),
                                Attributes::NexusOperationCancelRequestFailedEventAttributes(a) => Some(a.scheduled_event_id),
                                _ => None
                            }
                        })
                    }
2255
2256 pub fn get_protocol_instance_id(&self) -> Option<&str> {
2258 self.attributes.as_ref().and_then(|a| match a {
2259 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(a) => {
2260 Some(a.protocol_instance_id.as_str())
2261 }
2262 _ => None,
2263 })
2264 }
2265
2266 pub fn is_final_wf_execution_event(&self) -> bool {
2268 match self.event_type() {
2269 EventType::WorkflowExecutionCompleted => true,
2270 EventType::WorkflowExecutionCanceled => true,
2271 EventType::WorkflowExecutionFailed => true,
2272 EventType::WorkflowExecutionTimedOut => true,
2273 EventType::WorkflowExecutionContinuedAsNew => true,
2274 EventType::WorkflowExecutionTerminated => true,
2275 _ => false,
2276 }
2277 }
2278
2279 pub fn is_wft_closed_event(&self) -> bool {
2280 match self.event_type() {
2281 EventType::WorkflowTaskCompleted => true,
2282 EventType::WorkflowTaskFailed => true,
2283 EventType::WorkflowTaskTimedOut => true,
2284 _ => false,
2285 }
2286 }
2287
2288 pub fn is_ignorable(&self) -> bool {
2289 if !self.worker_may_ignore {
2290 return false;
2291 }
2292 if let Some(a) = self.attributes.as_ref() {
2295 match a {
2296 Attributes::WorkflowExecutionStartedEventAttributes(_) => false,
2297 Attributes::WorkflowExecutionCompletedEventAttributes(_) => false,
2298 Attributes::WorkflowExecutionFailedEventAttributes(_) => false,
2299 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => false,
2300 Attributes::WorkflowTaskScheduledEventAttributes(_) => false,
2301 Attributes::WorkflowTaskStartedEventAttributes(_) => false,
2302 Attributes::WorkflowTaskCompletedEventAttributes(_) => false,
2303 Attributes::WorkflowTaskTimedOutEventAttributes(_) => false,
2304 Attributes::WorkflowTaskFailedEventAttributes(_) => false,
2305 Attributes::ActivityTaskScheduledEventAttributes(_) => false,
2306 Attributes::ActivityTaskStartedEventAttributes(_) => false,
2307 Attributes::ActivityTaskCompletedEventAttributes(_) => false,
2308 Attributes::ActivityTaskFailedEventAttributes(_) => false,
2309 Attributes::ActivityTaskTimedOutEventAttributes(_) => false,
2310 Attributes::TimerStartedEventAttributes(_) => false,
2311 Attributes::TimerFiredEventAttributes(_) => false,
2312 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => false,
2313 Attributes::ActivityTaskCanceledEventAttributes(_) => false,
2314 Attributes::TimerCanceledEventAttributes(_) => false,
2315 Attributes::MarkerRecordedEventAttributes(_) => false,
2316 Attributes::WorkflowExecutionSignaledEventAttributes(_) => false,
2317 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => false,
2318 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => false,
2319 Attributes::WorkflowExecutionCanceledEventAttributes(_) => false,
2320 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2321 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => false,
2322 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => false,
2323 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => false,
2324 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => false,
2325 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => false,
2326 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => false,
2327 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => false,
2328 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => false,
2329 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => false,
2330 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => false,
2331 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => false,
2332 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => false,
2333 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => false,
2334 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => false,
2335 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => false,
2336 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => false,
2337 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => false,
2338 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => false,
2339 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => false,
2340 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => false,
2341 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => false,
2342 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => false,
2343 Attributes::NexusOperationScheduledEventAttributes(_) => false,
2344 Attributes::NexusOperationStartedEventAttributes(_) => false,
2345 Attributes::NexusOperationCompletedEventAttributes(_) => false,
2346 Attributes::NexusOperationFailedEventAttributes(_) => false,
2347 Attributes::NexusOperationCanceledEventAttributes(_) => false,
2348 Attributes::NexusOperationTimedOutEventAttributes(_) => false,
2349 Attributes::NexusOperationCancelRequestedEventAttributes(_) => false,
2350 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => true,
2352 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => false,
2353 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => false,
2354 }
2355 } else {
2356 false
2357 }
2358 }
2359 }
2360
2361 impl Display for HistoryEvent {
2362 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2363 write!(
2364 f,
2365 "HistoryEvent(id: {}, {:?})",
2366 self.event_id,
2367 EventType::try_from(self.event_type).unwrap_or_default()
2368 )
2369 }
2370 }
2371
2372 impl Attributes {
2373 pub fn event_type(&self) -> EventType {
2374 match self {
2376 Attributes::WorkflowExecutionStartedEventAttributes(_) => { EventType::WorkflowExecutionStarted }
2377 Attributes::WorkflowExecutionCompletedEventAttributes(_) => { EventType::WorkflowExecutionCompleted }
2378 Attributes::WorkflowExecutionFailedEventAttributes(_) => { EventType::WorkflowExecutionFailed }
2379 Attributes::WorkflowExecutionTimedOutEventAttributes(_) => { EventType::WorkflowExecutionTimedOut }
2380 Attributes::WorkflowTaskScheduledEventAttributes(_) => { EventType::WorkflowTaskScheduled }
2381 Attributes::WorkflowTaskStartedEventAttributes(_) => { EventType::WorkflowTaskStarted }
2382 Attributes::WorkflowTaskCompletedEventAttributes(_) => { EventType::WorkflowTaskCompleted }
2383 Attributes::WorkflowTaskTimedOutEventAttributes(_) => { EventType::WorkflowTaskTimedOut }
2384 Attributes::WorkflowTaskFailedEventAttributes(_) => { EventType::WorkflowTaskFailed }
2385 Attributes::ActivityTaskScheduledEventAttributes(_) => { EventType::ActivityTaskScheduled }
2386 Attributes::ActivityTaskStartedEventAttributes(_) => { EventType::ActivityTaskStarted }
2387 Attributes::ActivityTaskCompletedEventAttributes(_) => { EventType::ActivityTaskCompleted }
2388 Attributes::ActivityTaskFailedEventAttributes(_) => { EventType::ActivityTaskFailed }
2389 Attributes::ActivityTaskTimedOutEventAttributes(_) => { EventType::ActivityTaskTimedOut }
2390 Attributes::TimerStartedEventAttributes(_) => { EventType::TimerStarted }
2391 Attributes::TimerFiredEventAttributes(_) => { EventType::TimerFired }
2392 Attributes::ActivityTaskCancelRequestedEventAttributes(_) => { EventType::ActivityTaskCancelRequested }
2393 Attributes::ActivityTaskCanceledEventAttributes(_) => { EventType::ActivityTaskCanceled }
2394 Attributes::TimerCanceledEventAttributes(_) => { EventType::TimerCanceled }
2395 Attributes::MarkerRecordedEventAttributes(_) => { EventType::MarkerRecorded }
2396 Attributes::WorkflowExecutionSignaledEventAttributes(_) => { EventType::WorkflowExecutionSignaled }
2397 Attributes::WorkflowExecutionTerminatedEventAttributes(_) => { EventType::WorkflowExecutionTerminated }
2398 Attributes::WorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::WorkflowExecutionCancelRequested }
2399 Attributes::WorkflowExecutionCanceledEventAttributes(_) => { EventType::WorkflowExecutionCanceled }
2400 Attributes::RequestCancelExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionInitiated }
2401 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::RequestCancelExternalWorkflowExecutionFailed }
2402 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(_) => { EventType::ExternalWorkflowExecutionCancelRequested }
2403 Attributes::WorkflowExecutionContinuedAsNewEventAttributes(_) => { EventType::WorkflowExecutionContinuedAsNew }
2404 Attributes::StartChildWorkflowExecutionInitiatedEventAttributes(_) => { EventType::StartChildWorkflowExecutionInitiated }
2405 Attributes::StartChildWorkflowExecutionFailedEventAttributes(_) => { EventType::StartChildWorkflowExecutionFailed }
2406 Attributes::ChildWorkflowExecutionStartedEventAttributes(_) => { EventType::ChildWorkflowExecutionStarted }
2407 Attributes::ChildWorkflowExecutionCompletedEventAttributes(_) => { EventType::ChildWorkflowExecutionCompleted }
2408 Attributes::ChildWorkflowExecutionFailedEventAttributes(_) => { EventType::ChildWorkflowExecutionFailed }
2409 Attributes::ChildWorkflowExecutionCanceledEventAttributes(_) => { EventType::ChildWorkflowExecutionCanceled }
2410 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(_) => { EventType::ChildWorkflowExecutionTimedOut }
2411 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(_) => { EventType::ChildWorkflowExecutionTerminated }
2412 Attributes::SignalExternalWorkflowExecutionInitiatedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionInitiated }
2413 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(_) => { EventType::SignalExternalWorkflowExecutionFailed }
2414 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(_) => { EventType::ExternalWorkflowExecutionSignaled }
2415 Attributes::UpsertWorkflowSearchAttributesEventAttributes(_) => { EventType::UpsertWorkflowSearchAttributes }
2416 Attributes::WorkflowExecutionUpdateAdmittedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAdmitted }
2417 Attributes::WorkflowExecutionUpdateRejectedEventAttributes(_) => { EventType::WorkflowExecutionUpdateRejected }
2418 Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(_) => { EventType::WorkflowExecutionUpdateAccepted }
2419 Attributes::WorkflowExecutionUpdateCompletedEventAttributes(_) => { EventType::WorkflowExecutionUpdateCompleted }
2420 Attributes::WorkflowPropertiesModifiedExternallyEventAttributes(_) => { EventType::WorkflowPropertiesModifiedExternally }
2421 Attributes::ActivityPropertiesModifiedExternallyEventAttributes(_) => { EventType::ActivityPropertiesModifiedExternally }
2422 Attributes::WorkflowPropertiesModifiedEventAttributes(_) => { EventType::WorkflowPropertiesModified }
2423 Attributes::NexusOperationScheduledEventAttributes(_) => { EventType::NexusOperationScheduled }
2424 Attributes::NexusOperationStartedEventAttributes(_) => { EventType::NexusOperationStarted }
2425 Attributes::NexusOperationCompletedEventAttributes(_) => { EventType::NexusOperationCompleted }
2426 Attributes::NexusOperationFailedEventAttributes(_) => { EventType::NexusOperationFailed }
2427 Attributes::NexusOperationCanceledEventAttributes(_) => { EventType::NexusOperationCanceled }
2428 Attributes::NexusOperationTimedOutEventAttributes(_) => { EventType::NexusOperationTimedOut }
2429 Attributes::NexusOperationCancelRequestedEventAttributes(_) => { EventType::NexusOperationCancelRequested }
2430 Attributes::WorkflowExecutionOptionsUpdatedEventAttributes(_) => { EventType::WorkflowExecutionOptionsUpdated }
2431 Attributes::NexusOperationCancelRequestCompletedEventAttributes(_) => { EventType::NexusOperationCancelRequestCompleted }
2432 Attributes::NexusOperationCancelRequestFailedEventAttributes(_) => { EventType::NexusOperationCancelRequestFailed }
2433 }
2434 }
2435 }
2436 }
2437 }
2438 pub mod namespace {
2439 pub mod v1 {
2440 tonic::include_proto!("temporal.api.namespace.v1");
2441 }
2442 }
2443 pub mod operatorservice {
2444 pub mod v1 {
2445 tonic::include_proto!("temporal.api.operatorservice.v1");
2446 }
2447 }
2448 pub mod protocol {
2449 pub mod v1 {
2450 use std::fmt::{Display, Formatter};
2451 tonic::include_proto!("temporal.api.protocol.v1");
2452
2453 impl Display for Message {
2454 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2455 write!(f, "ProtocolMessage({})", self.id)
2456 }
2457 }
2458 }
2459 }
2460 pub mod query {
2461 pub mod v1 {
2462 tonic::include_proto!("temporal.api.query.v1");
2463 }
2464 }
2465 pub mod replication {
2466 pub mod v1 {
2467 tonic::include_proto!("temporal.api.replication.v1");
2468 }
2469 }
2470 pub mod rules {
2471 pub mod v1 {
2472 tonic::include_proto!("temporal.api.rules.v1");
2473 }
2474 }
2475 pub mod schedule {
2476 #[allow(rustdoc::invalid_html_tags)]
2477 pub mod v1 {
2478 tonic::include_proto!("temporal.api.schedule.v1");
2479 }
2480 }
2481 pub mod sdk {
2482 pub mod v1 {
2483 tonic::include_proto!("temporal.api.sdk.v1");
2484 }
2485 }
2486 pub mod taskqueue {
2487 pub mod v1 {
2488 use crate::temporal::api::enums::v1::TaskQueueKind;
2489 tonic::include_proto!("temporal.api.taskqueue.v1");
2490
2491 impl From<String> for TaskQueue {
2492 fn from(name: String) -> Self {
2493 Self {
2494 name,
2495 kind: TaskQueueKind::Normal as i32,
2496 normal_name: "".to_string(),
2497 }
2498 }
2499 }
2500 }
2501 }
2502 pub mod testservice {
2503 pub mod v1 {
2504 tonic::include_proto!("temporal.api.testservice.v1");
2505 }
2506 }
2507 pub mod update {
2508 pub mod v1 {
2509 use crate::temporal::api::update::v1::outcome::Value;
2510 tonic::include_proto!("temporal.api.update.v1");
2511
2512 impl Outcome {
2513 pub fn is_success(&self) -> bool {
2514 match self.value {
2515 Some(Value::Success(_)) => true,
2516 _ => false,
2517 }
2518 }
2519 }
2520 }
2521 }
2522 pub mod version {
2523 pub mod v1 {
2524 tonic::include_proto!("temporal.api.version.v1");
2525 }
2526 }
2527 pub mod worker {
2528 pub mod v1 {
2529 tonic::include_proto!("temporal.api.worker.v1");
2530 }
2531 }
2532 pub mod workflow {
2533 pub mod v1 {
2534 tonic::include_proto!("temporal.api.workflow.v1");
2535 }
2536 }
2537 pub mod nexus {
2538 pub mod v1 {
2539 use crate::{
2540 camel_case_to_screaming_snake,
2541 temporal::api::{
2542 common,
2543 common::v1::link::{WorkflowEvent, workflow_event},
2544 enums::v1::EventType,
2545 },
2546 };
2547 use anyhow::{anyhow, bail};
2548 use std::fmt::{Display, Formatter};
2549 use tonic::transport::Uri;
2550
2551 tonic::include_proto!("temporal.api.nexus.v1");
2552
2553 impl Display for Response {
2554 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2555 write!(f, "NexusResponse(",)?;
2556 match &self.variant {
2557 None => {}
2558 Some(v) => {
2559 write!(f, "{v}")?;
2560 }
2561 }
2562 write!(f, ")")
2563 }
2564 }
2565
2566 impl Display for response::Variant {
2567 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2568 match self {
2569 response::Variant::StartOperation(_) => {
2570 write!(f, "StartOperation")
2571 }
2572 response::Variant::CancelOperation(_) => {
2573 write!(f, "CancelOperation")
2574 }
2575 }
2576 }
2577 }
2578
2579 impl Display for HandlerError {
2580 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2581 write!(f, "HandlerError")
2582 }
2583 }
2584
2585 static SCHEME_PREFIX: &str = "temporal://";
2586
2587 pub fn workflow_event_link_from_nexus(
2589 l: &Link,
2590 ) -> Result<common::v1::Link, anyhow::Error> {
2591 if !l.url.starts_with(SCHEME_PREFIX) {
2592 bail!("Invalid scheme for nexus link: {:?}", l.url);
2593 }
2594 let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap();
2597 let uri = Uri::try_from(no_authority_url)?;
2598 let parts = uri.into_parts();
2599 let path = parts.path_and_query.ok_or_else(|| {
2600 anyhow!("Failed to parse nexus link, invalid path: {:?}", l)
2601 })?;
2602 let path_parts = path.path().split('/').collect::<Vec<_>>();
2603 if path_parts.get(1) != Some(&"namespaces") {
2604 bail!("Invalid path for nexus link: {:?}", l);
2605 }
2606 let namespace = path_parts.get(2).ok_or_else(|| {
2607 anyhow!("Failed to parse nexus link, no namespace: {:?}", l)
2608 })?;
2609 if path_parts.get(3) != Some(&"workflows") {
2610 bail!("Invalid path for nexus link, no workflows segment: {:?}", l);
2611 }
2612 let workflow_id = path_parts.get(4).ok_or_else(|| {
2613 anyhow!("Failed to parse nexus link, no workflow id: {:?}", l)
2614 })?;
2615 let run_id = path_parts
2616 .get(5)
2617 .ok_or_else(|| anyhow!("Failed to parse nexus link, no run id: {:?}", l))?;
2618 if path_parts.get(6) != Some(&"history") {
2619 bail!("Invalid path for nexus link, no history segment: {:?}", l);
2620 }
2621 let reference = if let Some(query) = path.query() {
2622 let mut eventref = workflow_event::EventReference::default();
2623 let query_parts = query.split('&').collect::<Vec<_>>();
2624 for qp in query_parts {
2625 let mut kv = qp.split('=');
2626 let key = kv.next().ok_or_else(|| {
2627 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2628 })?;
2629 let val = kv.next().ok_or_else(|| {
2630 anyhow!("Failed to parse nexus link query parameter: {:?}", l)
2631 })?;
2632 match key {
2633 "eventID" => {
2634 eventref.event_id = val.parse().map_err(|_| {
2635 anyhow!("Failed to parse nexus link event id: {:?}", l)
2636 })?;
2637 }
2638 "eventType" => {
2639 eventref.event_type = EventType::from_str_name(val)
2640 .unwrap_or_else(|| {
2641 EventType::from_str_name(
2642 &("EVENT_TYPE_".to_string()
2643 + &camel_case_to_screaming_snake(val)),
2644 )
2645 .unwrap_or_default()
2646 })
2647 .into()
2648 }
2649 _ => continue,
2650 }
2651 }
2652 Some(workflow_event::Reference::EventRef(eventref))
2653 } else {
2654 None
2655 };
2656
2657 Ok(common::v1::Link {
2658 variant: Some(common::v1::link::Variant::WorkflowEvent(WorkflowEvent {
2659 namespace: namespace.to_string(),
2660 workflow_id: workflow_id.to_string(),
2661 run_id: run_id.to_string(),
2662 reference,
2663 })),
2664 })
2665 }
2666 }
2667 }
2668 pub mod workflowservice {
2669 pub mod v1 {
2670 use std::{
2671 convert::TryInto,
2672 fmt::{Display, Formatter},
2673 time::{Duration, SystemTime},
2674 };
2675
2676 tonic::include_proto!("temporal.api.workflowservice.v1");
2677
// Generates a `sched_to_start` method inside an `impl` block. `$sched_field` names the optional
// timestamp field holding the scheduled time; the type must also have a `started_time` field.
macro_rules! sched_to_start_impl {
    ($sched_field:ident) => {
        /// Returns the time elapsed from the scheduled timestamp to `started_time`.
        ///
        /// Yields `None` when either timestamp is unset, either cannot be converted to a
        /// `SystemTime`, or the start is earlier than the scheduled time.
        pub fn sched_to_start(&self) -> Option<Duration> {
            let scheduled = self.$sched_field.clone()?;
            let started = self.started_time.clone()?;
            elapsed_between_prost_times(scheduled, started).flatten()
        }
    };
}
2694
2695 fn elapsed_between_prost_times(
2696 from: prost_wkt_types::Timestamp,
2697 to: prost_wkt_types::Timestamp,
2698 ) -> Option<Option<Duration>> {
2699 let from: Result<SystemTime, _> = from.try_into();
2700 let to: Result<SystemTime, _> = to.try_into();
2701 if let (Ok(from), Ok(to)) = (from, to) {
2702 return Some(to.duration_since(from).ok());
2703 }
2704 None
2705 }
2706
2707 impl PollWorkflowTaskQueueResponse {
2708 sched_to_start_impl!(scheduled_time);
2709 }
2710
2711 impl Display for PollWorkflowTaskQueueResponse {
2712 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2713 let last_event = self
2714 .history
2715 .as_ref()
2716 .and_then(|h| h.events.last().map(|he| he.event_id))
2717 .unwrap_or(0);
2718 write!(
2719 f,
2720 "PollWFTQResp(run_id: {}, attempt: {}, last_event: {})",
2721 self.workflow_execution
2722 .as_ref()
2723 .map_or("", |we| we.run_id.as_str()),
2724 self.attempt,
2725 last_event
2726 )
2727 }
2728 }
2729
2730 pub struct CompactHist<'a>(pub &'a PollWorkflowTaskQueueResponse);
2732 impl Display for CompactHist<'_> {
2733 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2734 writeln!(
2735 f,
2736 "PollWorkflowTaskQueueResponse (prev_started: {}, started: {})",
2737 self.0.previous_started_event_id, self.0.started_event_id
2738 )?;
2739 if let Some(h) = self.0.history.as_ref() {
2740 for event in &h.events {
2741 writeln!(f, "{}", event)?;
2742 }
2743 }
2744 writeln!(f, "query: {:#?}", self.0.query)?;
2745 writeln!(f, "queries: {:#?}", self.0.queries)
2746 }
2747 }
2748
2749 impl PollActivityTaskQueueResponse {
2750 sched_to_start_impl!(current_attempt_scheduled_time);
2751 }
2752
2753 impl PollNexusTaskQueueResponse {
2754 pub fn sched_to_start(&self) -> Option<Duration> {
2755 if let Some((sch, st)) = self
2756 .request
2757 .as_ref()
2758 .and_then(|r| r.scheduled_time)
2759 .clone()
2760 .zip(SystemTime::now().try_into().ok())
2761 {
2762 if let Some(value) = elapsed_between_prost_times(sch, st) {
2763 return value;
2764 }
2765 }
2766 None
2767 }
2768 }
2769
2770 impl QueryWorkflowResponse {
2771 pub fn unwrap(self) -> Vec<crate::temporal::api::common::v1::Payload> {
2773 self.query_result.unwrap().payloads
2774 }
2775 }
2776 }
2777 }
2778 }
2779}
2780
2781#[allow(
2782 clippy::all,
2783 missing_docs,
2784 rustdoc::broken_intra_doc_links,
2785 rustdoc::bare_urls
2786)]
2787pub mod grpc {
2788 pub mod health {
2789 pub mod v1 {
2790 tonic::include_proto!("grpc.health.v1");
2791 }
2792 }
2793}
2794
/// Converts a camel-case identifier to screaming snake case, e.g. `WorkflowTaskStarted`
/// becomes `WORKFLOW_TASK_STARTED`.
///
/// An underscore is inserted before an uppercase character only when the previous character
/// was not uppercase, so runs of capitals stay joined (`HTTPServer` becomes `HTTPSERVER`).
/// Uppercasing is ASCII-only; other characters are copied through unchanged.
pub fn camel_case_to_screaming_snake(val: &str) -> String {
    let mut out = String::new();
    // Starts as true so that a leading capital does not produce a leading underscore.
    let mut prev_upper = true;
    for ch in val.chars() {
        let is_upper = ch.is_uppercase();
        if is_upper && !prev_upper {
            out.push('_');
        }
        out.push(ch.to_ascii_uppercase());
        prev_upper = is_upper;
    }
    out
}
2813
#[cfg(test)]
mod tests {
    use crate::temporal::api::failure::v1::Failure;
    use anyhow::anyhow;

    /// Converting an `anyhow` error into a `Failure` keeps the outermost message at the top
    /// level and nests each earlier context layer as a `cause`.
    #[test]
    fn anyhow_to_failure_conversion() {
        let flat: Failure = anyhow!("no causes").into();
        assert_eq!(flat.cause, None);
        assert_eq!(flat.message, "no causes");

        let chained: Failure = anyhow!("fail 1")
            .context("fail 2")
            .context("fail 3")
            .into();
        assert_eq!(chained.message, "fail 3");
        let middle = chained.cause.as_deref().unwrap();
        assert_eq!(middle.message, "fail 2");
        let root = middle.cause.as_deref().unwrap();
        assert_eq!(root.message, "fail 1");
    }
}