1use crate::{
2 NamespacedClient, WorkflowCancelOptions, WorkflowDescribeOptions, WorkflowExecuteUpdateOptions,
3 WorkflowFetchHistoryOptions, WorkflowGetResultOptions, WorkflowQueryOptions,
4 WorkflowSignalOptions, WorkflowStartUpdateOptions, WorkflowTerminateOptions,
5 WorkflowUpdateWaitStage,
6 errors::{
7 WorkflowGetResultError, WorkflowInteractionError, WorkflowQueryError, WorkflowUpdateError,
8 },
9 grpc::WorkflowService,
10};
11use std::{fmt::Debug, marker::PhantomData};
12use temporalio_common::{
13 QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
14 data_converters::{RawValue, SerializationContextData},
15 protos::{
16 coresdk::FromPayloadsExt,
17 temporal::api::{
18 common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
19 enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
20 failure::v1::Failure,
21 history::{
22 self,
23 v1::{HistoryEvent, history_event::Attributes},
24 },
25 query::v1::WorkflowQuery,
26 update::{self, v1::WaitPolicy},
27 workflowservice::v1::{
28 DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse,
29 GetWorkflowExecutionHistoryRequest, PollWorkflowExecutionUpdateRequest,
30 QueryWorkflowRequest, RequestCancelWorkflowExecutionRequest,
31 SignalWorkflowExecutionRequest, TerminateWorkflowExecutionRequest,
32 UpdateWorkflowExecutionRequest,
33 },
34 },
35 },
36};
37use tonic::IntoRequest;
38use uuid::Uuid;
39
40#[derive(Debug)]
42#[allow(clippy::large_enum_variant)]
43pub enum WorkflowExecutionResult<T> {
44 Succeeded(T),
46 Failed(Failure),
48 Cancelled {
50 details: Vec<Payload>,
52 },
53 Terminated {
55 details: Vec<Payload>,
57 },
58 TimedOut,
60 ContinuedAsNew,
62}
63
64#[derive(Debug, Clone)]
66pub struct WorkflowExecutionDescription {
67 pub raw_description: DescribeWorkflowExecutionResponse,
69}
70
71impl WorkflowExecutionDescription {
72 fn new(raw_description: DescribeWorkflowExecutionResponse) -> Self {
73 Self { raw_description }
74 }
75}
76
77#[derive(Debug, Clone)]
80pub struct WorkflowHistory {
81 events: Vec<HistoryEvent>,
82}
83impl From<WorkflowHistory> for history::v1::History {
84 fn from(h: WorkflowHistory) -> Self {
85 Self { events: h.events }
86 }
87}
88
89impl WorkflowHistory {
90 fn new(events: Vec<HistoryEvent>) -> Self {
91 Self { events }
92 }
93
94 pub fn events(&self) -> &[HistoryEvent] {
96 &self.events
97 }
98
99 pub fn into_events(self) -> Vec<HistoryEvent> {
101 self.events
102 }
103}
104
105#[derive(Clone)]
108pub struct WorkflowHandle<ClientT, W> {
109 client: ClientT,
110 info: WorkflowExecutionInfo,
111
112 _wf_type: PhantomData<W>,
113}
114
115impl<CT, W> WorkflowHandle<CT, W> {
116 pub fn run_id(&self) -> Option<&str> {
118 self.info.run_id.as_deref()
119 }
120}
121
122#[derive(Debug, Clone)]
124pub struct WorkflowExecutionInfo {
125 pub namespace: String,
127 pub workflow_id: String,
129 pub run_id: Option<String>,
131 pub first_execution_run_id: Option<String>,
135}
136
137impl WorkflowExecutionInfo {
138 pub fn bind_untyped<CT>(self, client: CT) -> UntypedWorkflowHandle<CT>
140 where
141 CT: WorkflowService + Clone,
142 {
143 UntypedWorkflowHandle::new(client, self)
144 }
145}
146
147pub type UntypedWorkflowHandle<CT> = WorkflowHandle<CT, UntypedWorkflow>;
150
151pub struct UntypedWorkflow {
153 name: String,
154}
155impl UntypedWorkflow {
156 pub fn new(name: impl Into<String>) -> Self {
158 Self { name: name.into() }
159 }
160}
161impl WorkflowDefinition for UntypedWorkflow {
162 type Input = RawValue;
163 type Output = RawValue;
164 fn name(&self) -> &str {
165 &self.name
166 }
167}
168
169pub struct UntypedSignal<W> {
173 name: String,
174 _wf: PhantomData<W>,
175}
176
177impl<W> UntypedSignal<W> {
178 pub fn new(name: impl Into<String>) -> Self {
180 Self {
181 name: name.into(),
182 _wf: PhantomData,
183 }
184 }
185}
186
187impl<W: WorkflowDefinition> SignalDefinition for UntypedSignal<W> {
188 type Workflow = W;
189 type Input = RawValue;
190
191 fn name(&self) -> &str {
192 &self.name
193 }
194}
195
196pub struct UntypedQuery<W> {
200 name: String,
201 _wf: PhantomData<W>,
202}
203
204impl<W> UntypedQuery<W> {
205 pub fn new(name: impl Into<String>) -> Self {
207 Self {
208 name: name.into(),
209 _wf: PhantomData,
210 }
211 }
212}
213
214impl<W: WorkflowDefinition> QueryDefinition for UntypedQuery<W> {
215 type Workflow = W;
216 type Input = RawValue;
217 type Output = RawValue;
218
219 fn name(&self) -> &str {
220 &self.name
221 }
222}
223
224pub struct UntypedUpdate<W> {
228 name: String,
229 _wf: PhantomData<W>,
230}
231
232impl<W> UntypedUpdate<W> {
233 pub fn new(name: impl Into<String>) -> Self {
235 Self {
236 name: name.into(),
237 _wf: PhantomData,
238 }
239 }
240}
241
242impl<W: WorkflowDefinition> UpdateDefinition for UntypedUpdate<W> {
243 type Workflow = W;
244 type Input = RawValue;
245 type Output = RawValue;
246
247 fn name(&self) -> &str {
248 &self.name
249 }
250}
251
252impl<CT, W> WorkflowHandle<CT, W>
253where
254 CT: WorkflowService + Clone,
255 W: WorkflowDefinition,
256{
257 pub fn new(client: CT, info: WorkflowExecutionInfo) -> Self {
259 Self {
260 client,
261 info,
262 _wf_type: PhantomData::<W>,
263 }
264 }
265
266 pub fn info(&self) -> &WorkflowExecutionInfo {
268 &self.info
269 }
270
271 pub fn client(&self) -> &CT {
273 &self.client
274 }
275
276 pub async fn get_result(
278 &self,
279 opts: WorkflowGetResultOptions,
280 ) -> Result<W::Output, WorkflowGetResultError>
281 where
282 CT: WorkflowService + NamespacedClient + Clone,
283 {
284 let raw = self.get_result_raw(opts).await?;
285 match raw {
286 WorkflowExecutionResult::Succeeded(v) => Ok(v),
287 WorkflowExecutionResult::Failed(f) => Err(WorkflowGetResultError::Failed(Box::new(f))),
288 WorkflowExecutionResult::Cancelled { details } => {
289 Err(WorkflowGetResultError::Cancelled { details })
290 }
291 WorkflowExecutionResult::Terminated { details } => {
292 Err(WorkflowGetResultError::Terminated { details })
293 }
294 WorkflowExecutionResult::TimedOut => Err(WorkflowGetResultError::TimedOut),
295 WorkflowExecutionResult::ContinuedAsNew => Err(WorkflowGetResultError::ContinuedAsNew),
296 }
297 }
298
299 async fn get_result_raw(
303 &self,
304 opts: WorkflowGetResultOptions,
305 ) -> Result<WorkflowExecutionResult<W::Output>, WorkflowInteractionError>
306 where
307 CT: WorkflowService + NamespacedClient + Clone,
308 {
309 let mut run_id = self.info.run_id.clone().unwrap_or_default();
310 let fetch_opts = WorkflowFetchHistoryOptions::builder()
311 .skip_archival(true)
312 .wait_new_event(true)
313 .event_filter_type(HistoryEventFilterType::CloseEvent)
314 .build();
315
316 loop {
317 let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?;
318 let mut events = history.into_events();
319
320 if events.is_empty() {
321 continue;
322 }
323
324 let event_attrs = events.pop().and_then(|ev| ev.attributes);
325
326 macro_rules! follow {
327 ($attrs:ident) => {
328 if opts.follow_runs && $attrs.new_execution_run_id != "" {
329 run_id = $attrs.new_execution_run_id;
330 continue;
331 }
332 };
333 }
334
335 let dc = self.client.data_converter();
336
337 break match event_attrs {
338 Some(Attributes::WorkflowExecutionCompletedEventAttributes(attrs)) => {
339 follow!(attrs);
340 let payload = attrs
341 .result
342 .and_then(|p| p.payloads.into_iter().next())
343 .unwrap_or_default();
344 let result: W::Output = dc
345 .from_payload(&SerializationContextData::Workflow, payload)
346 .await?;
347 Ok(WorkflowExecutionResult::Succeeded(result))
348 }
349 Some(Attributes::WorkflowExecutionFailedEventAttributes(attrs)) => {
350 follow!(attrs);
351 Ok(WorkflowExecutionResult::Failed(
352 attrs.failure.unwrap_or_default(),
353 ))
354 }
355 Some(Attributes::WorkflowExecutionCanceledEventAttributes(attrs)) => {
356 Ok(WorkflowExecutionResult::Cancelled {
357 details: Vec::from_payloads(attrs.details),
358 })
359 }
360 Some(Attributes::WorkflowExecutionTimedOutEventAttributes(attrs)) => {
361 follow!(attrs);
362 Ok(WorkflowExecutionResult::TimedOut)
363 }
364 Some(Attributes::WorkflowExecutionTerminatedEventAttributes(attrs)) => {
365 Ok(WorkflowExecutionResult::Terminated {
366 details: Vec::from_payloads(attrs.details),
367 })
368 }
369 Some(Attributes::WorkflowExecutionContinuedAsNewEventAttributes(attrs)) => {
370 if opts.follow_runs {
371 if !attrs.new_execution_run_id.is_empty() {
372 run_id = attrs.new_execution_run_id;
373 continue;
374 } else {
375 return Err(WorkflowInteractionError::Other(
376 "New execution run id was empty in continue as new event!".into(),
377 ));
378 }
379 } else {
380 Ok(WorkflowExecutionResult::ContinuedAsNew)
381 }
382 }
383 o => Err(WorkflowInteractionError::Other(
384 format!(
385 "Server returned an event that didn't match the CloseEvent filter. \
386 This is either a server bug or a new event the SDK does not understand. \
387 Event details: {o:?}"
388 )
389 .into(),
390 )),
391 };
392 }
393 }
394
395 pub async fn signal<S>(
397 &self,
398 signal: S,
399 input: S::Input,
400 opts: WorkflowSignalOptions,
401 ) -> Result<(), WorkflowInteractionError>
402 where
403 CT: WorkflowService + NamespacedClient + Clone,
404 S: SignalDefinition<Workflow = W>,
405 S::Input: Send,
406 {
407 let payloads = self
408 .client
409 .data_converter()
410 .to_payloads(&SerializationContextData::Workflow, &input)
411 .await?;
412 WorkflowService::signal_workflow_execution(
413 &mut self.client.clone(),
414 SignalWorkflowExecutionRequest {
415 namespace: self.client.namespace(),
416 workflow_execution: Some(ProtoWorkflowExecution {
417 workflow_id: self.info.workflow_id.clone(),
418 run_id: self.info.run_id.clone().unwrap_or_default(),
419 }),
420 signal_name: signal.name().to_string(),
421 input: Some(Payloads { payloads }),
422 identity: self.client.identity(),
423 request_id: opts
424 .request_id
425 .unwrap_or_else(|| Uuid::new_v4().to_string()),
426 header: opts.header,
427 ..Default::default()
428 }
429 .into_request(),
430 )
431 .await
432 .map_err(WorkflowInteractionError::from_status)?;
433 Ok(())
434 }
435
436 pub async fn query<Q>(
438 &self,
439 query: Q,
440 input: Q::Input,
441 opts: WorkflowQueryOptions,
442 ) -> Result<Q::Output, WorkflowQueryError>
443 where
444 CT: WorkflowService + NamespacedClient + Clone,
445 Q: QueryDefinition<Workflow = W>,
446 Q::Input: Send,
447 {
448 let dc = self.client.data_converter();
449 let payloads = dc
450 .to_payloads(&SerializationContextData::Workflow, &input)
451 .await?;
452 let response = self
453 .client
454 .clone()
455 .query_workflow(
456 QueryWorkflowRequest {
457 namespace: self.client.namespace(),
458 execution: Some(ProtoWorkflowExecution {
459 workflow_id: self.info.workflow_id.clone(),
460 run_id: self.info.run_id.clone().unwrap_or_default(),
461 }),
462 query: Some(WorkflowQuery {
463 query_type: query.name().to_string(),
464 query_args: Some(Payloads { payloads }),
465 header: opts.header,
466 }),
467 query_reject_condition: opts.reject_condition.map(|c| c as i32).unwrap_or(1),
469 }
470 .into_request(),
471 )
472 .await
473 .map_err(WorkflowQueryError::from_status)?
474 .into_inner();
475
476 if let Some(rejected) = response.query_rejected {
477 return Err(WorkflowQueryError::Rejected(rejected));
478 }
479
480 let result_payloads = response
481 .query_result
482 .map(|p| p.payloads)
483 .unwrap_or_default();
484
485 dc.from_payloads(&SerializationContextData::Workflow, result_payloads)
486 .await
487 .map_err(WorkflowQueryError::from)
488 }
489
490 pub async fn execute_update<U>(
492 &self,
493 update: U,
494 input: U::Input,
495 options: WorkflowExecuteUpdateOptions,
496 ) -> Result<U::Output, WorkflowUpdateError>
497 where
498 CT: WorkflowService + NamespacedClient + Clone,
499 U: UpdateDefinition<Workflow = W>,
500 U::Input: Send,
501 U::Output: 'static,
502 {
503 let handle = self
504 .start_update(
505 update,
506 input,
507 WorkflowStartUpdateOptions::builder()
508 .maybe_update_id(options.update_id)
509 .maybe_header(options.header)
510 .wait_for_stage(WorkflowUpdateWaitStage::Completed)
511 .build(),
512 )
513 .await?;
514 handle.get_result().await
515 }
516
517 pub async fn start_update<U>(
520 &self,
521 update: U,
522 input: U::Input,
523 options: WorkflowStartUpdateOptions,
524 ) -> Result<WorkflowUpdateHandle<CT, U::Output>, WorkflowUpdateError>
525 where
526 CT: WorkflowService + NamespacedClient + Clone,
527 U: UpdateDefinition<Workflow = W>,
528 U::Input: Send,
529 {
530 let dc = self.client.data_converter();
531 let payloads = dc
532 .to_payloads(&SerializationContextData::Workflow, &input)
533 .await?;
534
535 let lifecycle_stage = match options.wait_for_stage {
536 WorkflowUpdateWaitStage::Admitted => UpdateWorkflowExecutionLifecycleStage::Admitted,
537 WorkflowUpdateWaitStage::Accepted => UpdateWorkflowExecutionLifecycleStage::Accepted,
538 WorkflowUpdateWaitStage::Completed => UpdateWorkflowExecutionLifecycleStage::Completed,
539 };
540
541 let update_id = options
542 .update_id
543 .unwrap_or_else(|| Uuid::new_v4().to_string());
544
545 let response = WorkflowService::update_workflow_execution(
546 &mut self.client.clone(),
547 UpdateWorkflowExecutionRequest {
548 namespace: self.client.namespace(),
549 workflow_execution: Some(ProtoWorkflowExecution {
550 workflow_id: self.info().workflow_id.clone(),
551 run_id: self.info().run_id.clone().unwrap_or_default(),
552 }),
553 wait_policy: Some(WaitPolicy {
554 lifecycle_stage: lifecycle_stage.into(),
555 }),
556 request: Some(update::v1::Request {
557 meta: Some(update::v1::Meta {
558 update_id: update_id.clone(),
559 identity: self.client.identity(),
560 }),
561 input: Some(update::v1::Input {
562 header: options.header,
563 name: update.name().to_string(),
564 args: Some(Payloads { payloads }),
565 }),
566 }),
567 ..Default::default()
568 }
569 .into_request(),
570 )
571 .await
572 .map_err(WorkflowUpdateError::from_status)?
573 .into_inner();
574
575 let run_id = response
577 .update_ref
578 .as_ref()
579 .and_then(|r| r.workflow_execution.as_ref())
580 .map(|e| e.run_id.clone())
581 .filter(|s| !s.is_empty())
582 .or_else(|| self.info().run_id.clone());
583
584 Ok(WorkflowUpdateHandle {
585 client: self.client.clone(),
586 update_id,
587 workflow_id: self.info().workflow_id.clone(),
588 run_id,
589 known_outcome: response.outcome,
590 _output: PhantomData,
591 })
592 }
593
594 pub async fn cancel(&self, opts: WorkflowCancelOptions) -> Result<(), WorkflowInteractionError>
596 where
597 CT: NamespacedClient,
598 {
599 WorkflowService::request_cancel_workflow_execution(
600 &mut self.client.clone(),
601 RequestCancelWorkflowExecutionRequest {
602 namespace: self.client.namespace(),
603 workflow_execution: Some(ProtoWorkflowExecution {
604 workflow_id: self.info.workflow_id.clone(),
605 run_id: self.info.run_id.clone().unwrap_or_default(),
606 }),
607 identity: self.client.identity(),
608 request_id: opts
609 .request_id
610 .unwrap_or_else(|| Uuid::new_v4().to_string()),
611 first_execution_run_id: self
612 .info
613 .first_execution_run_id
614 .clone()
615 .unwrap_or_default(),
616 reason: opts.reason,
617 links: vec![],
618 }
619 .into_request(),
620 )
621 .await
622 .map_err(WorkflowInteractionError::from_status)?;
623 Ok(())
624 }
625
626 pub async fn terminate(
628 &self,
629 opts: WorkflowTerminateOptions,
630 ) -> Result<(), WorkflowInteractionError>
631 where
632 CT: NamespacedClient,
633 {
634 WorkflowService::terminate_workflow_execution(
635 &mut self.client.clone(),
636 TerminateWorkflowExecutionRequest {
637 namespace: self.client.namespace(),
638 workflow_execution: Some(ProtoWorkflowExecution {
639 workflow_id: self.info.workflow_id.clone(),
640 run_id: self.info.run_id.clone().unwrap_or_default(),
641 }),
642 reason: opts.reason,
643 details: opts.details,
644 identity: self.client.identity(),
645 first_execution_run_id: self
646 .info
647 .first_execution_run_id
648 .clone()
649 .unwrap_or_default(),
650 links: vec![],
651 }
652 .into_request(),
653 )
654 .await
655 .map_err(WorkflowInteractionError::from_status)?;
656 Ok(())
657 }
658
659 pub async fn describe(
661 &self,
662 _opts: WorkflowDescribeOptions,
663 ) -> Result<WorkflowExecutionDescription, WorkflowInteractionError>
664 where
665 CT: NamespacedClient,
666 {
667 let response = WorkflowService::describe_workflow_execution(
668 &mut self.client.clone(),
669 DescribeWorkflowExecutionRequest {
670 namespace: self.client.namespace(),
671 execution: Some(ProtoWorkflowExecution {
672 workflow_id: self.info.workflow_id.clone(),
673 run_id: self.info.run_id.clone().unwrap_or_default(),
674 }),
675 }
676 .into_request(),
677 )
678 .await
679 .map_err(WorkflowInteractionError::from_status)?
680 .into_inner();
681 Ok(WorkflowExecutionDescription::new(response))
682 }
683 pub async fn fetch_history(
685 &self,
686 opts: WorkflowFetchHistoryOptions,
687 ) -> Result<WorkflowHistory, WorkflowInteractionError>
688 where
689 CT: NamespacedClient,
690 {
691 let run_id = self.info.run_id.clone().unwrap_or_default();
692 self.fetch_history_for_run(&run_id, &opts).await
693 }
694
695 async fn fetch_history_for_run(
697 &self,
698 run_id: &str,
699 opts: &WorkflowFetchHistoryOptions,
700 ) -> Result<WorkflowHistory, WorkflowInteractionError>
701 where
702 CT: NamespacedClient,
703 {
704 let mut all_events = Vec::new();
705 let mut next_page_token = vec![];
706
707 loop {
708 let response = WorkflowService::get_workflow_execution_history(
709 &mut self.client.clone(),
710 GetWorkflowExecutionHistoryRequest {
711 namespace: self.client.namespace(),
712 execution: Some(ProtoWorkflowExecution {
713 workflow_id: self.info.workflow_id.clone(),
714 run_id: run_id.to_string(),
715 }),
716 next_page_token: next_page_token.clone(),
717 skip_archival: opts.skip_archival,
718 wait_new_event: opts.wait_new_event,
719 history_event_filter_type: opts.event_filter_type as i32,
720 ..Default::default()
721 }
722 .into_request(),
723 )
724 .await
725 .map_err(WorkflowInteractionError::from_status)?
726 .into_inner();
727
728 if let Some(history) = response.history {
729 all_events.extend(history.events);
730 }
731
732 if response.next_page_token.is_empty() {
733 break;
734 }
735 next_page_token = response.next_page_token;
736 }
737
738 Ok(WorkflowHistory::new(all_events))
739 }
740}
741
742pub struct WorkflowUpdateHandle<CT, T> {
746 client: CT,
747 update_id: String,
748 workflow_id: String,
749 run_id: Option<String>,
750 known_outcome: Option<update::v1::Outcome>,
752 _output: PhantomData<T>,
753}
754
755impl<CT, T> WorkflowUpdateHandle<CT, T> {
756 pub fn id(&self) -> &str {
758 &self.update_id
759 }
760
761 pub fn workflow_id(&self) -> &str {
763 &self.workflow_id
764 }
765
766 pub fn workflow_run_id(&self) -> Option<&str> {
768 self.run_id.as_deref()
769 }
770}
771
772impl<CT, T: 'static> WorkflowUpdateHandle<CT, T>
773where
774 CT: WorkflowService + NamespacedClient + Clone,
775{
776 pub async fn get_result(&self) -> Result<T, WorkflowUpdateError>
778 where
779 T: temporalio_common::data_converters::TemporalDeserializable,
780 {
781 let outcome = if let Some(known) = &self.known_outcome {
782 known.clone()
783 } else {
784 let response = WorkflowService::poll_workflow_execution_update(
785 &mut self.client.clone(),
786 PollWorkflowExecutionUpdateRequest {
787 namespace: self.client.namespace(),
788 update_ref: Some(update::v1::UpdateRef {
789 workflow_execution: Some(ProtoWorkflowExecution {
790 workflow_id: self.workflow_id.clone(),
791 run_id: self.run_id.clone().unwrap_or_default(),
792 }),
793 update_id: self.update_id.clone(),
794 }),
795 identity: self.client.identity(),
796 wait_policy: Some(WaitPolicy {
797 lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed.into(),
798 }),
799 }
800 .into_request(),
801 )
802 .await
803 .map_err(WorkflowUpdateError::from_status)?
804 .into_inner();
805
806 response.outcome.ok_or_else(|| {
807 WorkflowUpdateError::Other("Update poll returned no outcome".into())
808 })?
809 };
810
811 match outcome.value {
812 Some(update::v1::outcome::Value::Success(success)) => self
813 .client
814 .data_converter()
815 .from_payloads(&SerializationContextData::Workflow, success.payloads)
816 .await
817 .map_err(WorkflowUpdateError::from),
818 Some(update::v1::outcome::Value::Failure(failure)) => {
819 Err(WorkflowUpdateError::Failed(Box::new(failure)))
820 }
821 None => Err(WorkflowUpdateError::Other(
822 "Update returned no outcome value".into(),
823 )),
824 }
825 }
826}