1#[allow(clippy::large_enum_variant)]
6#[allow(missing_docs)]
8pub mod coresdk {
9 tonic::include_proto!("coresdk");
12 #[allow(clippy::module_inception)]
13 pub mod activity_task {
14 tonic::include_proto!("coresdk.activity_task");
15 }
16 #[allow(clippy::module_inception)]
17 pub mod activity_result {
18 tonic::include_proto!("coresdk.activity_result");
19 }
20 pub mod common {
21 tonic::include_proto!("coresdk.common");
22
23 impl From<Vec<u8>> for Payload {
24 fn from(data: Vec<u8>) -> Self {
25 Self {
26 metadata: Default::default(),
27 data,
28 }
29 }
30 }
31 }
32 pub mod workflow_activation {
33 tonic::include_proto!("coresdk.workflow_activation");
34 pub fn create_evict_activation(task_token: Vec<u8>, run_id: String) -> WfActivation {
35 WfActivation {
36 task_token,
37 timestamp: None,
38 run_id,
39 jobs: vec![WfActivationJob::from(
40 wf_activation_job::Variant::RemoveFromCache(true),
41 )],
42 }
43 }
44 }
45 pub mod workflow_completion {
46 use crate::protos::coresdk::workflow_completion::wf_activation_completion::Status;
47 tonic::include_proto!("coresdk.workflow_completion");
48
49 impl wf_activation_completion::Status {
50 pub fn is_success(&self) -> bool {
51 match &self {
52 Status::Successful(_) => true,
53 Status::Failed(_) => false,
54 }
55 }
56 }
57 }
58 pub mod workflow_commands {
59 tonic::include_proto!("coresdk.workflow_commands");
60 }
61
62 use crate::protos::{
63 coresdk::{
64 activity_result::ActivityResult,
65 activity_task::ActivityTask,
66 common::{Payload, UserCodeFailure},
67 workflow_activation::SignalWorkflow,
68 workflow_commands::workflow_command::Variant,
69 workflow_completion::Success,
70 },
71 temporal::api::{
72 common::v1::{Payloads, WorkflowExecution},
73 failure::v1::ApplicationFailureInfo,
74 failure::v1::{failure::FailureInfo, Failure},
75 history::v1::WorkflowExecutionSignaledEventAttributes,
76 workflowservice::v1::PollActivityTaskQueueResponse,
77 },
78 };
79 use std::convert::TryFrom;
80 use workflow_activation::{wf_activation_job, WfActivationJob};
81 use workflow_commands::{workflow_command, WorkflowCommand};
82 use workflow_completion::{wf_activation_completion, WfActivationCompletion};
83
84 pub type HistoryEventId = i64;
85
86 impl From<wf_activation_job::Variant> for WfActivationJob {
87 fn from(a: wf_activation_job::Variant) -> Self {
88 WfActivationJob { variant: Some(a) }
89 }
90 }
91
92 impl From<Vec<WorkflowCommand>> for workflow_completion::Success {
93 fn from(v: Vec<WorkflowCommand>) -> Self {
94 Self { commands: v }
95 }
96 }
97
98 impl Success {
99 pub fn from_cmds(cmds: Vec<Variant>) -> Self {
100 let cmds: Vec<_> = cmds
101 .into_iter()
102 .map(|c| WorkflowCommand { variant: Some(c) })
103 .collect();
104 cmds.into()
105 }
106 }
107
108 impl WfActivationCompletion {
109 pub fn ok_from_cmds(cmds: Vec<workflow_command::Variant>, task_token: Vec<u8>) -> Self {
110 let success = Success::from_cmds(cmds);
111 Self {
112 task_token,
113 status: Some(wf_activation_completion::Status::Successful(success)),
114 }
115 }
116
117 pub fn fail(task_token: Vec<u8>, failure: UserCodeFailure) -> Self {
118 Self {
119 task_token,
120 status: Some(wf_activation_completion::Status::Failed(
121 workflow_completion::Failure {
122 failure: Some(failure),
123 },
124 )),
125 }
126 }
127
128 pub fn from_status(task_token: Vec<u8>, status: wf_activation_completion::Status) -> Self {
129 Self {
130 task_token,
131 status: Some(status),
132 }
133 }
134 }
135
136 impl ActivityResult {
137 pub fn ok(result: Payload) -> Self {
138 Self {
139 status: Some(activity_result::activity_result::Status::Completed(
140 activity_result::Success {
141 result: Some(result),
142 },
143 )),
144 }
145 }
146 }
147
148 impl ActivityTask {
149 pub fn start_from_poll_resp(r: PollActivityTaskQueueResponse, task_token: Vec<u8>) -> Self {
150 ActivityTask {
151 task_token,
152 activity_id: r.activity_id,
153 variant: Some(activity_task::activity_task::Variant::Start(
154 activity_task::Start {
155 workflow_namespace: r.workflow_namespace,
156 workflow_type: r
157 .workflow_type
158 .map(|wt| wt.name)
159 .unwrap_or_else(|| "".to_string()),
160 workflow_execution: r.workflow_execution.map(Into::into),
161 activity_type: r
162 .activity_type
163 .map(|at| at.name)
164 .unwrap_or_else(|| "".to_string()),
165 header_fields: r.header.map(Into::into).unwrap_or_default(),
166 input: Vec::from_payloads(r.input),
167 heartbeat_details: Vec::from_payloads(r.heartbeat_details),
168 scheduled_time: r.scheduled_time,
169 current_attempt_scheduled_time: r.current_attempt_scheduled_time,
170 started_time: r.started_time,
171 attempt: r.attempt,
172 schedule_to_close_timeout: r.schedule_to_close_timeout,
173 start_to_close_timeout: r.start_to_close_timeout,
174 heartbeat_timeout: r.heartbeat_timeout,
175 retry_policy: r.retry_policy.map(Into::into),
176 },
177 )),
178 }
179 }
180 }
181
182 impl From<UserCodeFailure> for Failure {
183 fn from(f: UserCodeFailure) -> Self {
184 Self {
185 message: f.message,
186 source: f.source,
187 stack_trace: f.stack_trace,
188 cause: f.cause.map(|b| Box::new((*b).into())),
189 failure_info: Some(FailureInfo::ApplicationFailureInfo(
190 ApplicationFailureInfo {
191 r#type: f.r#type,
192 non_retryable: f.non_retryable,
193 details: None,
194 },
195 )),
196 }
197 }
198 }
199
200 impl From<Failure> for UserCodeFailure {
201 fn from(f: Failure) -> Self {
202 let mut r#type = "".to_string();
203 let mut non_retryable = false;
204 if let Some(FailureInfo::ApplicationFailureInfo(fi)) = f.failure_info {
205 r#type = fi.r#type;
206 non_retryable = fi.non_retryable;
207 }
208 Self {
209 message: f.message,
210 r#type,
211 source: f.source,
212 stack_trace: f.stack_trace,
213 non_retryable,
214 cause: f.cause.map(|b| Box::new((*b).into())),
215 }
216 }
217 }
218
219 impl From<common::Payload> for super::temporal::api::common::v1::Payload {
220 fn from(p: Payload) -> Self {
221 Self {
222 metadata: p.metadata,
223 data: p.data,
224 }
225 }
226 }
227
228 impl From<super::temporal::api::common::v1::Payload> for common::Payload {
229 fn from(p: super::temporal::api::common::v1::Payload) -> Self {
230 Self {
231 metadata: p.metadata,
232 data: p.data,
233 }
234 }
235 }
236
237 pub trait PayloadsExt {
238 fn into_payloads(self) -> Option<Payloads>;
239 fn from_payloads(p: Option<Payloads>) -> Self;
240 }
241
242 impl PayloadsExt for Vec<common::Payload> {
243 fn into_payloads(self) -> Option<Payloads> {
244 if self.is_empty() {
245 None
246 } else {
247 Some(Payloads {
248 payloads: self.into_iter().map(Into::into).collect(),
249 })
250 }
251 }
252
253 fn from_payloads(p: Option<Payloads>) -> Self {
254 match p {
255 None => vec![],
256 Some(p) => p.payloads.into_iter().map(Into::into).collect(),
257 }
258 }
259 }
260
261 impl From<common::Payload> for Payloads {
262 fn from(p: Payload) -> Self {
263 Payloads {
264 payloads: vec![p.into()],
265 }
266 }
267 }
268
269 #[derive(derive_more::Display, Debug)]
271 pub enum PayloadsToPayloadError {
272 MoreThanOnePayload,
273 NoPayload,
274 }
275 impl TryFrom<Payloads> for common::Payload {
276 type Error = PayloadsToPayloadError;
277
278 fn try_from(mut v: Payloads) -> Result<Self, Self::Error> {
279 match v.payloads.pop() {
280 None => Err(PayloadsToPayloadError::NoPayload),
281 Some(p) => {
282 if !v.payloads.is_empty() {
283 Err(PayloadsToPayloadError::MoreThanOnePayload)
284 } else {
285 Ok(p.into())
286 }
287 }
288 }
289 }
290 }
291
292 impl From<WorkflowExecutionSignaledEventAttributes> for SignalWorkflow {
293 fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self {
294 Self {
295 signal_name: a.signal_name,
296 input: Vec::from_payloads(a.input),
297 identity: a.identity,
298 }
299 }
300 }
301
302 impl From<WorkflowExecution> for common::WorkflowExecution {
303 fn from(w: WorkflowExecution) -> Self {
304 Self {
305 workflow_id: w.workflow_id,
306 run_id: w.run_id,
307 }
308 }
309 }
310}
311
312#[allow(clippy::all)]
314#[allow(missing_docs)]
315pub mod temporal {
317 pub mod api {
318 pub mod command {
319 pub mod v1 {
320 tonic::include_proto!("temporal.api.command.v1");
321
322 use crate::protos::{
323 coresdk::{workflow_commands, PayloadsExt},
324 temporal::api::common::v1::ActivityType,
325 temporal::api::enums::v1::CommandType,
326 };
327 use command::Attributes;
328 use std::fmt::{Display, Formatter};
329
330 impl From<command::Attributes> for Command {
331 fn from(c: command::Attributes) -> Self {
332 match c {
333 a @ Attributes::StartTimerCommandAttributes(_) => Self {
334 command_type: CommandType::StartTimer as i32,
335 attributes: Some(a),
336 },
337 a @ Attributes::CancelTimerCommandAttributes(_) => Self {
338 command_type: CommandType::CancelTimer as i32,
339 attributes: Some(a),
340 },
341 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
342 command_type: CommandType::CompleteWorkflowExecution as i32,
343 attributes: Some(a),
344 },
345 a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
346 command_type: CommandType::FailWorkflowExecution as i32,
347 attributes: Some(a),
348 },
349 a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self {
350 command_type: CommandType::ScheduleActivityTask as i32,
351 attributes: Some(a),
352 },
353 a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self {
354 command_type: CommandType::RequestCancelActivityTask as i32,
355 attributes: Some(a),
356 },
357 _ => unimplemented!(),
358 }
359 }
360 }
361
362 impl Display for Command {
363 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
364 let ct = CommandType::from_i32(self.command_type)
365 .unwrap_or(CommandType::Unspecified);
366 write!(f, "{:?}", ct)
367 }
368 }
369
370 impl From<workflow_commands::StartTimer> for command::Attributes {
371 fn from(s: workflow_commands::StartTimer) -> Self {
372 Self::StartTimerCommandAttributes(StartTimerCommandAttributes {
373 timer_id: s.timer_id,
374 start_to_fire_timeout: s.start_to_fire_timeout,
375 })
376 }
377 }
378
379 impl From<workflow_commands::CancelTimer> for command::Attributes {
380 fn from(s: workflow_commands::CancelTimer) -> Self {
381 Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
382 timer_id: s.timer_id,
383 })
384 }
385 }
386
387 impl From<workflow_commands::RequestCancelActivity> for command::Attributes {
388 fn from(c: workflow_commands::RequestCancelActivity) -> Self {
389 Self::RequestCancelActivityTaskCommandAttributes(
390 RequestCancelActivityTaskCommandAttributes {
391 scheduled_event_id: c.scheduled_event_id,
392 },
393 )
394 }
395 }
396
397 impl From<workflow_commands::ScheduleActivity> for command::Attributes {
398 fn from(s: workflow_commands::ScheduleActivity) -> Self {
399 Self::ScheduleActivityTaskCommandAttributes(
400 ScheduleActivityTaskCommandAttributes {
401 activity_id: s.activity_id,
402 activity_type: Some(ActivityType {
403 name: s.activity_type,
404 }),
405 namespace: s.namespace,
406 task_queue: Some(s.task_queue.into()),
407 header: Some(s.header_fields.into()),
408 input: s.arguments.into_payloads(),
409 schedule_to_close_timeout: s.schedule_to_close_timeout,
410 schedule_to_start_timeout: s.schedule_to_start_timeout,
411 start_to_close_timeout: s.start_to_close_timeout,
412 heartbeat_timeout: s.heartbeat_timeout,
413 retry_policy: s.retry_policy.map(Into::into),
414 },
415 )
416 }
417 }
418
419 impl From<workflow_commands::CompleteWorkflowExecution> for command::Attributes {
420 fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self {
421 Self::CompleteWorkflowExecutionCommandAttributes(
422 CompleteWorkflowExecutionCommandAttributes {
423 result: c.result.map(Into::into),
424 },
425 )
426 }
427 }
428
429 impl From<workflow_commands::FailWorkflowExecution> for command::Attributes {
430 fn from(c: workflow_commands::FailWorkflowExecution) -> Self {
431 Self::FailWorkflowExecutionCommandAttributes(
432 FailWorkflowExecutionCommandAttributes {
433 failure: c.failure.map(Into::into),
434 },
435 )
436 }
437 }
438 }
439 }
440 pub mod enums {
441 pub mod v1 {
442 tonic::include_proto!("temporal.api.enums.v1");
443 }
444 }
445 pub mod failure {
446 pub mod v1 {
447 tonic::include_proto!("temporal.api.failure.v1");
448 }
449 }
450 pub mod filter {
451 pub mod v1 {
452 tonic::include_proto!("temporal.api.filter.v1");
453 }
454 }
455 pub mod common {
456 pub mod v1 {
457 use crate::protos::coresdk::common;
458 use std::collections::HashMap;
459 tonic::include_proto!("temporal.api.common.v1");
460
461 impl From<HashMap<String, common::Payload>> for Header {
462 fn from(h: HashMap<String, common::Payload>) -> Self {
463 Self {
464 fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(),
465 }
466 }
467 }
468
469 impl From<Header> for HashMap<String, common::Payload> {
470 fn from(h: Header) -> Self {
471 h.fields.into_iter().map(|(k, v)| (k, v.into())).collect()
472 }
473 }
474
475 impl From<common::RetryPolicy> for RetryPolicy {
476 fn from(r: common::RetryPolicy) -> Self {
477 Self {
478 initial_interval: r.initial_interval,
479 backoff_coefficient: r.backoff_coefficient,
480 maximum_interval: r.maximum_interval,
481 maximum_attempts: r.maximum_attempts,
482 non_retryable_error_types: r.non_retryable_error_types,
483 }
484 }
485 }
486
487 impl From<RetryPolicy> for common::RetryPolicy {
488 fn from(r: RetryPolicy) -> Self {
489 Self {
490 initial_interval: r.initial_interval,
491 backoff_coefficient: r.backoff_coefficient,
492 maximum_interval: r.maximum_interval,
493 maximum_attempts: r.maximum_attempts,
494 non_retryable_error_types: r.non_retryable_error_types,
495 }
496 }
497 }
498 }
499 }
500 pub mod history {
501 pub mod v1 {
502 use crate::protos::temporal::api::{
503 enums::v1::EventType, history::v1::history_event::Attributes,
504 };
505 use crate::protosext::HistoryInfoError;
506 use prost::alloc::fmt::Formatter;
507 use std::fmt::Display;
508
509 tonic::include_proto!("temporal.api.history.v1");
510
511 impl History {
512 pub(crate) fn get_workflow_task_count(
519 &self,
520 up_to_event_id: Option<i64>,
521 ) -> Result<usize, HistoryInfoError> {
522 let mut last_wf_started_id = 0;
523 let mut count = 0;
524 let mut history = self.events.iter().peekable();
525 while let Some(event) = history.next() {
526 let next_event = history.peek();
527
528 if event.is_final_wf_execution_event() {
529 return Ok(count);
531 }
532
533 if let Some(upto) = up_to_event_id {
534 if event.event_id > upto {
535 return Ok(count);
536 }
537 }
538
539 let next_is_completed = next_event.map_or(false, |ne| {
540 ne.event_type == EventType::WorkflowTaskCompleted as i32
541 });
542
543 if event.event_type == EventType::WorkflowTaskStarted as i32
544 && (next_event.is_none() || next_is_completed)
545 {
546 last_wf_started_id = event.event_id;
547 count += 1;
548 }
549
550 if next_event.is_none() {
551 if last_wf_started_id != event.event_id {
552 return Err(HistoryInfoError::HistoryEndsUnexpectedly);
553 }
554 return Ok(count);
555 }
556 }
557 Ok(count)
558 }
559 }
560
561 impl HistoryEvent {
562 pub fn is_command_event(&self) -> bool {
564 if let Some(et) = EventType::from_i32(self.event_type) {
565 match et {
566 EventType::ActivityTaskScheduled
567 | EventType::ActivityTaskCancelRequested
568 | EventType::MarkerRecorded
569 | EventType::RequestCancelExternalWorkflowExecutionInitiated
570 | EventType::SignalExternalWorkflowExecutionInitiated
571 | EventType::StartChildWorkflowExecutionInitiated
572 | EventType::TimerCanceled
573 | EventType::TimerStarted
574 | EventType::UpsertWorkflowSearchAttributes
575 | EventType::WorkflowExecutionCanceled
576 | EventType::WorkflowExecutionCompleted
577 | EventType::WorkflowExecutionContinuedAsNew
578 | EventType::WorkflowExecutionFailed => true,
579 _ => false,
580 }
581 } else {
582 debug!(
583 "Could not determine type of event with enum index {}",
584 self.event_type
585 );
586 false
587 }
588 }
589
590 pub fn get_initial_command_event_id(&self) -> Option<i64> {
594 self.attributes.as_ref().and_then(|a| {
595 match a {
598 Attributes::ActivityTaskStartedEventAttributes(a) =>
599 Some(a.scheduled_event_id),
600 Attributes::ActivityTaskCompletedEventAttributes(a) =>
601 Some(a.scheduled_event_id),
602 Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
603 Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
604 Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id),
605 Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id),
606 Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id),
607 Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id),
608 Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
609 Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id),
610 Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
611 Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id),
612 Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id),
613 Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
614 Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id),
615 Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id),
616 Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id),
617 Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id),
618 Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id),
619 Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id),
620 Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id),
621 Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id),
622 Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id),
623 _ => None
624 }
625 })
626 }
627
628 pub fn is_final_wf_execution_event(&self) -> bool {
630 match EventType::from_i32(self.event_type) {
631 Some(EventType::WorkflowExecutionCompleted) => true,
632 Some(EventType::WorkflowExecutionCanceled) => true,
633 Some(EventType::WorkflowExecutionFailed) => true,
634 Some(EventType::WorkflowExecutionTimedOut) => true,
635 Some(EventType::WorkflowExecutionContinuedAsNew) => true,
636 Some(EventType::WorkflowExecutionTerminated) => true,
637 _ => false,
638 }
639 }
640 }
641
642 impl Display for HistoryEvent {
643 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
644 write!(
645 f,
646 "HistoryEvent(id: {}, {:?})",
647 self.event_id,
648 EventType::from_i32(self.event_type)
649 )
650 }
651 }
652 }
653 }
654
655 pub mod namespace {
656 pub mod v1 {
657 tonic::include_proto!("temporal.api.namespace.v1");
658 }
659 }
660
661 pub mod query {
662 pub mod v1 {
663 tonic::include_proto!("temporal.api.query.v1");
664 }
665 }
666
667 pub mod replication {
668 pub mod v1 {
669 tonic::include_proto!("temporal.api.replication.v1");
670 }
671 }
672
673 pub mod taskqueue {
674 pub mod v1 {
675 use crate::protos::temporal::api::enums::v1::TaskQueueKind;
676 tonic::include_proto!("temporal.api.taskqueue.v1");
677
678 impl From<String> for TaskQueue {
679 fn from(name: String) -> Self {
680 Self {
681 name,
682 kind: TaskQueueKind::Normal as i32,
683 }
684 }
685 }
686 }
687 }
688
689 pub mod version {
690 pub mod v1 {
691 tonic::include_proto!("temporal.api.version.v1");
692 }
693 }
694
695 pub mod workflow {
696 pub mod v1 {
697 tonic::include_proto!("temporal.api.workflow.v1");
698 }
699 }
700
701 pub mod workflowservice {
702 pub mod v1 {
703 tonic::include_proto!("temporal.api.workflowservice.v1");
704 }
705 }
706 }
707}