1use crate::{
2 NamespacedClient, WorkflowCancelOptions, WorkflowDescribeOptions, WorkflowExecuteUpdateOptions,
3 WorkflowFetchHistoryOptions, WorkflowGetResultOptions, WorkflowQueryOptions,
4 WorkflowSignalOptions, WorkflowStartUpdateOptions, WorkflowTerminateOptions,
5 WorkflowUpdateWaitStage,
6 errors::{
7 WorkflowGetResultError, WorkflowInteractionError, WorkflowQueryError, WorkflowUpdateError,
8 },
9 grpc::WorkflowService,
10};
11use std::{fmt::Debug, marker::PhantomData};
12pub use temporalio_common::UntypedWorkflow;
13use temporalio_common::{
14 HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
15 data_converters::{
16 DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue,
17 SerializationContext, SerializationContextData,
18 },
19 payload_visitor::decode_payloads,
20 protos::{
21 coresdk::FromPayloadsExt,
22 proto_ts_to_system_time,
23 temporal::api::{
24 common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
25 enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
26 failure::v1::Failure,
27 history::{
28 self,
29 v1::{HistoryEvent, history_event::Attributes},
30 },
31 query::v1::WorkflowQuery,
32 sdk::v1::UserMetadata,
33 update::{self, v1::WaitPolicy},
34 workflow::v1 as workflow,
35 workflowservice::v1::{
36 DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse,
37 GetWorkflowExecutionHistoryRequest, PollWorkflowExecutionUpdateRequest,
38 QueryWorkflowRequest, RequestCancelWorkflowExecutionRequest,
39 SignalWorkflowExecutionRequest, TerminateWorkflowExecutionRequest,
40 UpdateWorkflowExecutionRequest,
41 },
42 },
43 },
44};
45use tonic::IntoRequest;
46use uuid::Uuid;
47
/// String form of a workflow's user metadata, as produced by `decode_user_metadata`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct DecodedUserMetadata {
    /// Converted `summary` payload; `None` when the metadata carried no summary.
    summary: Option<String>,
    /// Converted `details` payload; `None` when the metadata carried no details.
    details: Option<String>,
}
53
54fn decode_user_metadata(
55 context: &SerializationContextData,
56 user_metadata: Option<UserMetadata>,
57) -> Result<DecodedUserMetadata, PayloadConversionError> {
58 let payload_converter = PayloadConverter::default();
59 let context = SerializationContext {
60 data: context,
61 converter: &payload_converter,
62 };
63 let (summary, details) = user_metadata
64 .map(|metadata| (metadata.summary, metadata.details))
65 .unwrap_or_default();
66 Ok(DecodedUserMetadata {
67 summary: match summary {
68 Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
69 None => None,
70 },
71 details: match details {
72 Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
73 None => None,
74 },
75 })
76}
77
/// Terminal outcome of a workflow run.
///
/// `T` is the type of the value carried by a successful run.
#[derive(Debug)]
// NOTE(review): presumably silenced because `Failure` (or `T`) is much larger than the other
// variants and boxing it would change the public shape of the enum -- confirm.
#[allow(clippy::large_enum_variant)]
pub enum WorkflowExecutionResult<T> {
    /// The run completed; carries its result value.
    Succeeded(T),
    /// The run failed; carries the reported failure.
    Failed(Failure),
    /// The run was cancelled.
    Cancelled {
        /// Payloads attached to the cancellation.
        details: Vec<Payload>,
    },
    /// The run was terminated.
    Terminated {
        /// Payloads attached to the termination.
        details: Vec<Payload>,
    },
    /// The run timed out.
    TimedOut,
    /// The run continued as a new run.
    ContinuedAsNew,
}
101
102#[derive(Debug, Clone)]
106pub struct WorkflowExecutionDescription {
107 pub raw_description: DescribeWorkflowExecutionResponse,
109 history_length: usize,
110 static_summary: Option<String>,
111 static_details: Option<String>,
112}
113
114impl WorkflowExecutionDescription {
115 async fn new(
116 mut raw_description: DescribeWorkflowExecutionResponse,
117 data_converter: &DataConverter,
118 ) -> Result<Self, PayloadConversionError> {
119 let raw_user_metadata = raw_description
120 .execution_config
121 .as_ref()
122 .and_then(|cfg| cfg.user_metadata.clone());
123 decode_payloads(
124 &mut raw_description,
125 data_converter.codec(),
126 &SerializationContextData::Workflow,
127 )
128 .await;
129 let decoded_metadata =
130 decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?;
131 let history_length_raw = raw_description
132 .workflow_execution_info
133 .as_ref()
134 .map(|info| info.history_length)
135 .unwrap_or(0);
136 let history_length = history_length_raw.try_into().map_err(|_| {
137 PayloadConversionError::EncodingError(
138 format!("workflow history_length must be non-negative, got {history_length_raw}")
139 .into(),
140 )
141 })?;
142 Ok(Self {
143 raw_description,
144 history_length,
145 static_summary: decoded_metadata.summary,
146 static_details: decoded_metadata.details,
147 })
148 }
149
150 pub fn id(&self) -> &str {
152 self.execution().workflow_id.as_str()
153 }
154
155 pub fn run_id(&self) -> &str {
157 self.execution().run_id.as_str()
158 }
159
160 pub fn workflow_type(&self) -> &str {
162 self.workflow_type_info().name.as_str()
163 }
164
165 pub fn status(
167 &self,
168 ) -> temporalio_common::protos::temporal::api::enums::v1::WorkflowExecutionStatus {
169 self.workflow_info().status()
170 }
171
172 pub fn start_time(&self) -> Option<std::time::SystemTime> {
174 self.workflow_info()
175 .start_time
176 .as_ref()
177 .and_then(proto_ts_to_system_time)
178 }
179
180 pub fn execution_time(&self) -> Option<std::time::SystemTime> {
182 self.workflow_info()
183 .execution_time
184 .as_ref()
185 .and_then(proto_ts_to_system_time)
186 }
187
188 pub fn close_time(&self) -> Option<std::time::SystemTime> {
190 self.workflow_info()
191 .close_time
192 .as_ref()
193 .and_then(proto_ts_to_system_time)
194 }
195
196 pub fn task_queue(&self) -> &str {
198 self.workflow_info().task_queue.as_str()
199 }
200
201 pub fn history_length(&self) -> usize {
203 self.history_length
204 }
205
206 pub fn memo(&self) -> Option<&temporalio_common::protos::temporal::api::common::v1::Memo> {
208 self.workflow_info().memo.as_ref()
209 }
210
211 pub fn parent_id(&self) -> Option<&str> {
213 self.workflow_info()
214 .parent_execution
215 .as_ref()
216 .map(|e| e.workflow_id.as_str())
217 }
218
219 pub fn parent_run_id(&self) -> Option<&str> {
221 self.workflow_info()
222 .parent_execution
223 .as_ref()
224 .map(|e| e.run_id.as_str())
225 }
226
227 pub fn search_attributes(
229 &self,
230 ) -> Option<&temporalio_common::protos::temporal::api::common::v1::SearchAttributes> {
231 self.workflow_info().search_attributes.as_ref()
232 }
233
234 pub fn static_summary(&self) -> Option<&str> {
236 self.static_summary.as_deref()
237 }
238
239 pub fn static_details(&self) -> Option<&str> {
241 self.static_details.as_deref()
242 }
243
244 pub fn raw(&self) -> &DescribeWorkflowExecutionResponse {
246 &self.raw_description
247 }
248
249 pub fn into_raw(self) -> DescribeWorkflowExecutionResponse {
251 self.raw_description
252 }
253
254 fn workflow_info(&self) -> &workflow::WorkflowExecutionInfo {
255 self.raw_description
256 .workflow_execution_info
257 .as_ref()
258 .expect("describe response missing workflow_execution_info")
259 }
260
261 fn execution(&self) -> &ProtoWorkflowExecution {
262 self.workflow_info()
263 .execution
264 .as_ref()
265 .expect("describe response missing workflow_execution_info.execution")
266 }
267
268 fn workflow_type_info(
269 &self,
270 ) -> &temporalio_common::protos::temporal::api::common::v1::WorkflowType {
271 self.workflow_info()
272 .r#type
273 .as_ref()
274 .expect("describe response missing workflow_execution_info.type")
275 }
276}
277
278#[derive(Debug, Clone)]
281pub struct WorkflowHistory {
282 events: Vec<HistoryEvent>,
283}
284impl From<WorkflowHistory> for history::v1::History {
285 fn from(h: WorkflowHistory) -> Self {
286 Self { events: h.events }
287 }
288}
289
290impl WorkflowHistory {
291 fn new(events: Vec<HistoryEvent>) -> Self {
292 Self { events }
293 }
294
295 pub fn events(&self) -> &[HistoryEvent] {
297 &self.events
298 }
299
300 pub fn into_events(self) -> Vec<HistoryEvent> {
302 self.events
303 }
304}
305
306#[derive(Clone)]
309pub struct WorkflowHandle<ClientT, W> {
310 client: ClientT,
311 info: WorkflowExecutionInfo,
312
313 _wf_type: PhantomData<W>,
314}
315
316impl<CT, W> WorkflowHandle<CT, W> {
317 pub fn run_id(&self) -> Option<&str> {
319 self.info.run_id.as_deref()
320 }
321}
322
323#[derive(Debug, Clone)]
325pub struct WorkflowExecutionInfo {
326 pub namespace: String,
328 pub workflow_id: String,
330 pub run_id: Option<String>,
332 pub first_execution_run_id: Option<String>,
336}
337
338impl WorkflowExecutionInfo {
339 pub fn bind_untyped<CT>(self, client: CT) -> UntypedWorkflowHandle<CT>
341 where
342 CT: WorkflowService + Clone,
343 {
344 UntypedWorkflowHandle::new(client, self)
345 }
346}
347
/// A [`WorkflowHandle`] whose workflow marker type is [`UntypedWorkflow`].
pub type UntypedWorkflowHandle<CT> = WorkflowHandle<CT, UntypedWorkflow>;
351
352pub struct UntypedSignal<W> {
356 name: String,
357 _wf: PhantomData<W>,
358}
359
360impl<W> UntypedSignal<W> {
361 pub fn new(name: impl Into<String>) -> Self {
363 Self {
364 name: name.into(),
365 _wf: PhantomData,
366 }
367 }
368}
369
370impl<W: WorkflowDefinition> SignalDefinition for UntypedSignal<W> {
371 type Workflow = W;
372 type Input = RawValue;
373
374 fn name(&self) -> &str {
375 &self.name
376 }
377}
378
379pub struct UntypedQuery<W> {
383 name: String,
384 _wf: PhantomData<W>,
385}
386
387impl<W> UntypedQuery<W> {
388 pub fn new(name: impl Into<String>) -> Self {
390 Self {
391 name: name.into(),
392 _wf: PhantomData,
393 }
394 }
395}
396
397impl<W: WorkflowDefinition> QueryDefinition for UntypedQuery<W> {
398 type Workflow = W;
399 type Input = RawValue;
400 type Output = RawValue;
401
402 fn name(&self) -> &str {
403 &self.name
404 }
405}
406
407pub struct UntypedUpdate<W> {
411 name: String,
412 _wf: PhantomData<W>,
413}
414
415impl<W> UntypedUpdate<W> {
416 pub fn new(name: impl Into<String>) -> Self {
418 Self {
419 name: name.into(),
420 _wf: PhantomData,
421 }
422 }
423}
424
425impl<W: WorkflowDefinition> UpdateDefinition for UntypedUpdate<W> {
426 type Workflow = W;
427 type Input = RawValue;
428 type Output = RawValue;
429
430 fn name(&self) -> &str {
431 &self.name
432 }
433}
434
435impl<CT, W> WorkflowHandle<CT, W>
436where
437 CT: WorkflowService + Clone,
438 W: HasWorkflowDefinition,
439{
440 pub fn new(client: CT, info: WorkflowExecutionInfo) -> Self {
442 Self {
443 client,
444 info,
445 _wf_type: PhantomData::<W>,
446 }
447 }
448
449 pub fn info(&self) -> &WorkflowExecutionInfo {
451 &self.info
452 }
453
454 pub fn client(&self) -> &CT {
456 &self.client
457 }
458
459 pub async fn get_result(
461 &self,
462 opts: WorkflowGetResultOptions,
463 ) -> Result<W::Output, WorkflowGetResultError>
464 where
465 CT: WorkflowService + NamespacedClient + Clone,
466 {
467 let raw = self.get_result_raw(opts).await?;
468 match raw {
469 WorkflowExecutionResult::Succeeded(v) => Ok(v),
470 WorkflowExecutionResult::Failed(f) => Err(WorkflowGetResultError::Failed(Box::new(f))),
471 WorkflowExecutionResult::Cancelled { details } => {
472 Err(WorkflowGetResultError::Cancelled { details })
473 }
474 WorkflowExecutionResult::Terminated { details } => {
475 Err(WorkflowGetResultError::Terminated { details })
476 }
477 WorkflowExecutionResult::TimedOut => Err(WorkflowGetResultError::TimedOut),
478 WorkflowExecutionResult::ContinuedAsNew => Err(WorkflowGetResultError::ContinuedAsNew),
479 }
480 }
481
482 async fn get_result_raw(
486 &self,
487 opts: WorkflowGetResultOptions,
488 ) -> Result<WorkflowExecutionResult<W::Output>, WorkflowInteractionError>
489 where
490 CT: WorkflowService + NamespacedClient + Clone,
491 {
492 let mut run_id = self.info.run_id.clone().unwrap_or_default();
493 let fetch_opts = WorkflowFetchHistoryOptions::builder()
494 .skip_archival(true)
495 .wait_new_event(true)
496 .event_filter_type(HistoryEventFilterType::CloseEvent)
497 .build();
498
499 loop {
500 let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?;
501 let mut events = history.into_events();
502
503 if events.is_empty() {
504 continue;
505 }
506
507 let event_attrs = events.pop().and_then(|ev| ev.attributes);
508
509 macro_rules! follow {
510 ($attrs:ident) => {
511 if opts.follow_runs && $attrs.new_execution_run_id != "" {
512 run_id = $attrs.new_execution_run_id;
513 continue;
514 }
515 };
516 }
517
518 let dc = self.client.data_converter();
519
520 break match event_attrs {
521 Some(Attributes::WorkflowExecutionCompletedEventAttributes(attrs)) => {
522 follow!(attrs);
523 let payload = attrs
524 .result
525 .and_then(|p| p.payloads.into_iter().next())
526 .unwrap_or_default();
527 let result: W::Output = dc
528 .from_payload(&SerializationContextData::Workflow, payload)
529 .await?;
530 Ok(WorkflowExecutionResult::Succeeded(result))
531 }
532 Some(Attributes::WorkflowExecutionFailedEventAttributes(attrs)) => {
533 follow!(attrs);
534 Ok(WorkflowExecutionResult::Failed(
535 attrs.failure.unwrap_or_default(),
536 ))
537 }
538 Some(Attributes::WorkflowExecutionCanceledEventAttributes(attrs)) => {
539 Ok(WorkflowExecutionResult::Cancelled {
540 details: Vec::from_payloads(attrs.details),
541 })
542 }
543 Some(Attributes::WorkflowExecutionTimedOutEventAttributes(attrs)) => {
544 follow!(attrs);
545 Ok(WorkflowExecutionResult::TimedOut)
546 }
547 Some(Attributes::WorkflowExecutionTerminatedEventAttributes(attrs)) => {
548 Ok(WorkflowExecutionResult::Terminated {
549 details: Vec::from_payloads(attrs.details),
550 })
551 }
552 Some(Attributes::WorkflowExecutionContinuedAsNewEventAttributes(attrs)) => {
553 if opts.follow_runs {
554 if !attrs.new_execution_run_id.is_empty() {
555 run_id = attrs.new_execution_run_id;
556 continue;
557 } else {
558 return Err(WorkflowInteractionError::Other(
559 "New execution run id was empty in continue as new event!".into(),
560 ));
561 }
562 } else {
563 Ok(WorkflowExecutionResult::ContinuedAsNew)
564 }
565 }
566 o => Err(WorkflowInteractionError::Other(
567 format!(
568 "Server returned an event that didn't match the CloseEvent filter. \
569 This is either a server bug or a new event the SDK does not understand. \
570 Event details: {o:?}"
571 )
572 .into(),
573 )),
574 };
575 }
576 }
577
578 pub async fn signal<S>(
580 &self,
581 signal: S,
582 input: S::Input,
583 opts: WorkflowSignalOptions,
584 ) -> Result<(), WorkflowInteractionError>
585 where
586 CT: WorkflowService + NamespacedClient + Clone,
587 S: SignalDefinition<Workflow = W::Run>,
588 S::Input: Send,
589 {
590 let payloads = self
591 .client
592 .data_converter()
593 .to_payloads(&SerializationContextData::Workflow, &input)
594 .await?;
595 WorkflowService::signal_workflow_execution(
596 &mut self.client.clone(),
597 SignalWorkflowExecutionRequest {
598 namespace: self.client.namespace(),
599 workflow_execution: Some(ProtoWorkflowExecution {
600 workflow_id: self.info.workflow_id.clone(),
601 run_id: self.info.run_id.clone().unwrap_or_default(),
602 }),
603 signal_name: signal.name().to_string(),
604 input: Some(Payloads { payloads }),
605 identity: self.client.identity(),
606 request_id: opts
607 .request_id
608 .unwrap_or_else(|| Uuid::new_v4().to_string()),
609 header: opts.header,
610 ..Default::default()
611 }
612 .into_request(),
613 )
614 .await
615 .map_err(WorkflowInteractionError::from_status)?;
616 Ok(())
617 }
618
619 pub async fn query<Q>(
621 &self,
622 query: Q,
623 input: Q::Input,
624 opts: WorkflowQueryOptions,
625 ) -> Result<Q::Output, WorkflowQueryError>
626 where
627 CT: WorkflowService + NamespacedClient + Clone,
628 Q: QueryDefinition<Workflow = W::Run>,
629 Q::Input: Send,
630 {
631 let dc = self.client.data_converter();
632 let payloads = dc
633 .to_payloads(&SerializationContextData::Workflow, &input)
634 .await?;
635 let response = self
636 .client
637 .clone()
638 .query_workflow(
639 QueryWorkflowRequest {
640 namespace: self.client.namespace(),
641 execution: Some(ProtoWorkflowExecution {
642 workflow_id: self.info.workflow_id.clone(),
643 run_id: self.info.run_id.clone().unwrap_or_default(),
644 }),
645 query: Some(WorkflowQuery {
646 query_type: query.name().to_string(),
647 query_args: Some(Payloads { payloads }),
648 header: opts.header,
649 }),
650 query_reject_condition: opts.reject_condition.map(|c| c as i32).unwrap_or(1),
652 }
653 .into_request(),
654 )
655 .await
656 .map_err(WorkflowQueryError::from_status)?
657 .into_inner();
658
659 if let Some(rejected) = response.query_rejected {
660 return Err(WorkflowQueryError::Rejected(rejected));
661 }
662
663 let result_payloads = response
664 .query_result
665 .map(|p| p.payloads)
666 .unwrap_or_default();
667
668 dc.from_payloads(&SerializationContextData::Workflow, result_payloads)
669 .await
670 .map_err(WorkflowQueryError::from)
671 }
672
673 pub async fn execute_update<U>(
675 &self,
676 update: U,
677 input: U::Input,
678 options: WorkflowExecuteUpdateOptions,
679 ) -> Result<U::Output, WorkflowUpdateError>
680 where
681 CT: WorkflowService + NamespacedClient + Clone,
682 U: UpdateDefinition<Workflow = W::Run>,
683 U::Input: Send,
684 U::Output: 'static,
685 {
686 let handle = self
687 .start_update(
688 update,
689 input,
690 WorkflowStartUpdateOptions::builder()
691 .maybe_update_id(options.update_id)
692 .maybe_header(options.header)
693 .wait_for_stage(WorkflowUpdateWaitStage::Completed)
694 .build(),
695 )
696 .await?;
697 handle.get_result().await
698 }
699
700 pub async fn start_update<U>(
703 &self,
704 update: U,
705 input: U::Input,
706 options: WorkflowStartUpdateOptions,
707 ) -> Result<WorkflowUpdateHandle<CT, U::Output>, WorkflowUpdateError>
708 where
709 CT: WorkflowService + NamespacedClient + Clone,
710 U: UpdateDefinition<Workflow = W::Run>,
711 U::Input: Send,
712 {
713 let dc = self.client.data_converter();
714 let payloads = dc
715 .to_payloads(&SerializationContextData::Workflow, &input)
716 .await?;
717
718 let lifecycle_stage = match options.wait_for_stage {
719 WorkflowUpdateWaitStage::Admitted => UpdateWorkflowExecutionLifecycleStage::Admitted,
720 WorkflowUpdateWaitStage::Accepted => UpdateWorkflowExecutionLifecycleStage::Accepted,
721 WorkflowUpdateWaitStage::Completed => UpdateWorkflowExecutionLifecycleStage::Completed,
722 };
723
724 let update_id = options
725 .update_id
726 .unwrap_or_else(|| Uuid::new_v4().to_string());
727
728 let response = WorkflowService::update_workflow_execution(
729 &mut self.client.clone(),
730 UpdateWorkflowExecutionRequest {
731 namespace: self.client.namespace(),
732 workflow_execution: Some(ProtoWorkflowExecution {
733 workflow_id: self.info().workflow_id.clone(),
734 run_id: self.info().run_id.clone().unwrap_or_default(),
735 }),
736 wait_policy: Some(WaitPolicy {
737 lifecycle_stage: lifecycle_stage.into(),
738 }),
739 request: Some(update::v1::Request {
740 meta: Some(update::v1::Meta {
741 update_id: update_id.clone(),
742 identity: self.client.identity(),
743 }),
744 input: Some(update::v1::Input {
745 header: options.header,
746 name: update.name().to_string(),
747 args: Some(Payloads { payloads }),
748 }),
749 }),
750 ..Default::default()
751 }
752 .into_request(),
753 )
754 .await
755 .map_err(WorkflowUpdateError::from_status)?
756 .into_inner();
757
758 let run_id = response
760 .update_ref
761 .as_ref()
762 .and_then(|r| r.workflow_execution.as_ref())
763 .map(|e| e.run_id.clone())
764 .filter(|s| !s.is_empty())
765 .or_else(|| self.info().run_id.clone());
766
767 Ok(WorkflowUpdateHandle {
768 client: self.client.clone(),
769 update_id,
770 workflow_id: self.info().workflow_id.clone(),
771 run_id,
772 known_outcome: response.outcome,
773 _output: PhantomData,
774 })
775 }
776
777 pub async fn cancel(&self, opts: WorkflowCancelOptions) -> Result<(), WorkflowInteractionError>
779 where
780 CT: NamespacedClient,
781 {
782 WorkflowService::request_cancel_workflow_execution(
783 &mut self.client.clone(),
784 RequestCancelWorkflowExecutionRequest {
785 namespace: self.client.namespace(),
786 workflow_execution: Some(ProtoWorkflowExecution {
787 workflow_id: self.info.workflow_id.clone(),
788 run_id: self.info.run_id.clone().unwrap_or_default(),
789 }),
790 identity: self.client.identity(),
791 request_id: opts
792 .request_id
793 .unwrap_or_else(|| Uuid::new_v4().to_string()),
794 first_execution_run_id: self
795 .info
796 .first_execution_run_id
797 .clone()
798 .unwrap_or_default(),
799 reason: opts.reason,
800 links: vec![],
801 }
802 .into_request(),
803 )
804 .await
805 .map_err(WorkflowInteractionError::from_status)?;
806 Ok(())
807 }
808
809 pub async fn terminate(
811 &self,
812 opts: WorkflowTerminateOptions,
813 ) -> Result<(), WorkflowInteractionError>
814 where
815 CT: NamespacedClient,
816 {
817 WorkflowService::terminate_workflow_execution(
818 &mut self.client.clone(),
819 TerminateWorkflowExecutionRequest {
820 namespace: self.client.namespace(),
821 workflow_execution: Some(ProtoWorkflowExecution {
822 workflow_id: self.info.workflow_id.clone(),
823 run_id: self.info.run_id.clone().unwrap_or_default(),
824 }),
825 reason: opts.reason,
826 details: opts.details,
827 identity: self.client.identity(),
828 first_execution_run_id: self
829 .info
830 .first_execution_run_id
831 .clone()
832 .unwrap_or_default(),
833 links: vec![],
834 }
835 .into_request(),
836 )
837 .await
838 .map_err(WorkflowInteractionError::from_status)?;
839 Ok(())
840 }
841
842 pub async fn describe(
844 &self,
845 _opts: WorkflowDescribeOptions,
846 ) -> Result<WorkflowExecutionDescription, WorkflowInteractionError>
847 where
848 CT: NamespacedClient,
849 {
850 let response = WorkflowService::describe_workflow_execution(
851 &mut self.client.clone(),
852 DescribeWorkflowExecutionRequest {
853 namespace: self.client.namespace(),
854 execution: Some(ProtoWorkflowExecution {
855 workflow_id: self.info.workflow_id.clone(),
856 run_id: self.info.run_id.clone().unwrap_or_default(),
857 }),
858 }
859 .into_request(),
860 )
861 .await
862 .map_err(WorkflowInteractionError::from_status)?
863 .into_inner();
864 WorkflowExecutionDescription::new(response, self.client.data_converter())
865 .await
866 .map_err(WorkflowInteractionError::from)
867 }
868 pub async fn fetch_history(
870 &self,
871 opts: WorkflowFetchHistoryOptions,
872 ) -> Result<WorkflowHistory, WorkflowInteractionError>
873 where
874 CT: NamespacedClient,
875 {
876 let run_id = self.info.run_id.clone().unwrap_or_default();
877 self.fetch_history_for_run(&run_id, &opts).await
878 }
879
880 async fn fetch_history_for_run(
882 &self,
883 run_id: &str,
884 opts: &WorkflowFetchHistoryOptions,
885 ) -> Result<WorkflowHistory, WorkflowInteractionError>
886 where
887 CT: NamespacedClient,
888 {
889 let mut all_events = Vec::new();
890 let mut next_page_token = vec![];
891
892 loop {
893 let response = WorkflowService::get_workflow_execution_history(
894 &mut self.client.clone(),
895 GetWorkflowExecutionHistoryRequest {
896 namespace: self.client.namespace(),
897 execution: Some(ProtoWorkflowExecution {
898 workflow_id: self.info.workflow_id.clone(),
899 run_id: run_id.to_string(),
900 }),
901 next_page_token: next_page_token.clone(),
902 skip_archival: opts.skip_archival,
903 wait_new_event: opts.wait_new_event,
904 history_event_filter_type: opts.event_filter_type as i32,
905 ..Default::default()
906 }
907 .into_request(),
908 )
909 .await
910 .map_err(WorkflowInteractionError::from_status)?
911 .into_inner();
912
913 if let Some(history) = response.history {
914 all_events.extend(history.events);
915 }
916
917 if response.next_page_token.is_empty() {
918 break;
919 }
920 next_page_token = response.next_page_token;
921 }
922
923 Ok(WorkflowHistory::new(all_events))
924 }
925}
926
927pub struct WorkflowUpdateHandle<CT, T> {
931 client: CT,
932 update_id: String,
933 workflow_id: String,
934 run_id: Option<String>,
935 known_outcome: Option<update::v1::Outcome>,
937 _output: PhantomData<T>,
938}
939
940impl<CT, T> WorkflowUpdateHandle<CT, T> {
941 pub fn id(&self) -> &str {
943 &self.update_id
944 }
945
946 pub fn workflow_id(&self) -> &str {
948 &self.workflow_id
949 }
950
951 pub fn workflow_run_id(&self) -> Option<&str> {
953 self.run_id.as_deref()
954 }
955}
956
957impl<CT, T: 'static> WorkflowUpdateHandle<CT, T>
958where
959 CT: WorkflowService + NamespacedClient + Clone,
960{
961 pub async fn get_result(&self) -> Result<T, WorkflowUpdateError>
963 where
964 T: temporalio_common::data_converters::TemporalDeserializable,
965 {
966 let outcome = if let Some(known) = &self.known_outcome {
967 known.clone()
968 } else {
969 loop {
973 let response = WorkflowService::poll_workflow_execution_update(
974 &mut self.client.clone(),
975 PollWorkflowExecutionUpdateRequest {
976 namespace: self.client.namespace(),
977 update_ref: Some(update::v1::UpdateRef {
978 workflow_execution: Some(ProtoWorkflowExecution {
979 workflow_id: self.workflow_id.clone(),
980 run_id: self.run_id.clone().unwrap_or_default(),
981 }),
982 update_id: self.update_id.clone(),
983 }),
984 identity: self.client.identity(),
985 wait_policy: Some(WaitPolicy {
986 lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed
987 .into(),
988 }),
989 }
990 .into_request(),
991 )
992 .await
993 .map_err(WorkflowUpdateError::from_status)?
994 .into_inner();
995
996 if let Some(outcome) = response.outcome {
997 break outcome;
998 }
999 }
1000 };
1001
1002 match outcome.value {
1003 Some(update::v1::outcome::Value::Success(success)) => self
1004 .client
1005 .data_converter()
1006 .from_payloads(&SerializationContextData::Workflow, success.payloads)
1007 .await
1008 .map_err(WorkflowUpdateError::from),
1009 Some(update::v1::outcome::Value::Failure(failure)) => {
1010 Err(WorkflowUpdateError::Failed(Box::new(failure)))
1011 }
1012 None => Err(WorkflowUpdateError::Other(
1013 "Update returned no outcome value".into(),
1014 )),
1015 }
1016 }
1017}
1018
// Unit tests for `WorkflowExecutionDescription`, built directly from hand-made describe
// responses with the default data converter.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use temporalio_common::protos::temporal::api::{
        common::v1::{Memo, SearchAttributes},
        enums::v1::WorkflowExecutionStatus,
        sdk::v1::UserMetadata,
        workflow::v1::WorkflowExecutionConfig,
    };

    // Every accessor should report the value placed in the corresponding response field.
    #[tokio::test]
    async fn workflow_description_accessors_expose_decoded_fields() {
        let converter = DataConverter::default();
        let memo_payload = converter
            .to_payload(&SerializationContextData::Workflow, &"memo-value")
            .await
            .unwrap();
        let search_attr_payload = converter
            .to_payload(&SerializationContextData::Workflow, &"search-value")
            .await
            .unwrap();
        let summary_payload = converter
            .to_payload(&SerializationContextData::Workflow, &"workflow summary")
            .await
            .unwrap();
        let details_payload = converter
            .to_payload(&SerializationContextData::Workflow, &"workflow details")
            .await
            .unwrap();
        let description = WorkflowExecutionDescription::new(
            DescribeWorkflowExecutionResponse {
                workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
                    execution: Some(ProtoWorkflowExecution {
                        workflow_id: "wf-id".to_string(),
                        run_id: "run-id".to_string(),
                    }),
                    r#type: Some(
                        temporalio_common::protos::temporal::api::common::v1::WorkflowType {
                            name: "wf-type".to_string(),
                        },
                    ),
                    status: WorkflowExecutionStatus::Completed as i32,
                    task_queue: "task-queue".to_string(),
                    history_length: 42,
                    memo: Some(Memo {
                        fields: HashMap::from([("memo-key".to_string(), memo_payload.clone())]),
                    }),
                    parent_execution: Some(ProtoWorkflowExecution {
                        workflow_id: "parent-id".to_string(),
                        run_id: "parent-run-id".to_string(),
                    }),
                    search_attributes: Some(SearchAttributes {
                        indexed_fields: HashMap::from([(
                            "CustomKeywordField".to_string(),
                            search_attr_payload.clone(),
                        )]),
                    }),
                    ..Default::default()
                }),
                execution_config: Some(WorkflowExecutionConfig {
                    user_metadata: Some(UserMetadata {
                        summary: Some(summary_payload),
                        details: Some(details_payload),
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            },
            &converter,
        )
        .await
        .unwrap();

        assert_eq!(description.id(), "wf-id");
        assert_eq!(description.run_id(), "run-id");
        assert_eq!(description.workflow_type(), "wf-type");
        assert_eq!(description.status(), WorkflowExecutionStatus::Completed);
        assert_eq!(description.task_queue(), "task-queue");
        assert_eq!(description.history_length(), 42);
        assert_eq!(description.parent_id(), Some("parent-id"));
        assert_eq!(description.parent_run_id(), Some("parent-run-id"));
        assert_eq!(description.memo().unwrap().fields["memo-key"], memo_payload);
        assert_eq!(
            description.search_attributes().unwrap().indexed_fields["CustomKeywordField"],
            search_attr_payload
        );
        // Summary and details come back as the strings that were encoded above.
        assert_eq!(description.static_summary(), Some("workflow summary"));
        assert_eq!(description.static_details(), Some("workflow details"));
    }

    // A negative `history_length` cannot be represented as `usize` and must be reported as an
    // error rather than wrapped or clamped.
    #[tokio::test]
    async fn workflow_description_rejects_negative_history_length() {
        let err = WorkflowExecutionDescription::new(
            DescribeWorkflowExecutionResponse {
                workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
                    history_length: -1,
                    ..Default::default()
                }),
                ..Default::default()
            },
            &DataConverter::default(),
        )
        .await
        .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Encoding error: workflow history_length must be non-negative, got -1"
        );
    }
}