Skip to main content

temporalio_client/
workflow_handle.rs

1use crate::{
2    NamespacedClient, WorkflowCancelOptions, WorkflowDescribeOptions, WorkflowExecuteUpdateOptions,
3    WorkflowFetchHistoryOptions, WorkflowGetResultOptions, WorkflowQueryOptions,
4    WorkflowSignalOptions, WorkflowStartUpdateOptions, WorkflowTerminateOptions,
5    WorkflowUpdateWaitStage,
6    errors::{
7        WorkflowGetResultError, WorkflowInteractionError, WorkflowQueryError, WorkflowUpdateError,
8    },
9    grpc::WorkflowService,
10};
11use std::{fmt::Debug, marker::PhantomData};
12pub use temporalio_common::UntypedWorkflow;
13use temporalio_common::{
14    HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
15    data_converters::{
16        DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue,
17        SerializationContext, SerializationContextData,
18    },
19    payload_visitor::decode_payloads,
20    protos::{
21        coresdk::FromPayloadsExt,
22        proto_ts_to_system_time,
23        temporal::api::{
24            common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
25            enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
26            failure::v1::Failure,
27            history::{
28                self,
29                v1::{HistoryEvent, history_event::Attributes},
30            },
31            query::v1::WorkflowQuery,
32            sdk::v1::UserMetadata,
33            update::{self, v1::WaitPolicy},
34            workflow::v1 as workflow,
35            workflowservice::v1::{
36                DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse,
37                GetWorkflowExecutionHistoryRequest, PollWorkflowExecutionUpdateRequest,
38                QueryWorkflowRequest, RequestCancelWorkflowExecutionRequest,
39                SignalWorkflowExecutionRequest, TerminateWorkflowExecutionRequest,
40                UpdateWorkflowExecutionRequest,
41            },
42        },
43    },
44};
45use tonic::IntoRequest;
46use uuid::Uuid;
47
48#[derive(Debug, Clone, Default, PartialEq, Eq)]
49struct DecodedUserMetadata {
50    summary: Option<String>,
51    details: Option<String>,
52}
53
54fn decode_user_metadata(
55    context: &SerializationContextData,
56    user_metadata: Option<UserMetadata>,
57) -> Result<DecodedUserMetadata, PayloadConversionError> {
58    let payload_converter = PayloadConverter::default();
59    let context = SerializationContext {
60        data: context,
61        converter: &payload_converter,
62    };
63    let (summary, details) = user_metadata
64        .map(|metadata| (metadata.summary, metadata.details))
65        .unwrap_or_default();
66    Ok(DecodedUserMetadata {
67        summary: match summary {
68            Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
69            None => None,
70        },
71        details: match details {
72            Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
73            None => None,
74        },
75    })
76}
77
78/// Enumerates terminal states for a particular workflow execution
79#[derive(Debug)]
80#[allow(clippy::large_enum_variant)]
81pub enum WorkflowExecutionResult<T> {
82    /// The workflow finished successfully
83    Succeeded(T),
84    /// The workflow finished in failure
85    Failed(Failure),
86    /// The workflow was cancelled
87    Cancelled {
88        /// Details provided at cancellation time
89        details: Vec<Payload>,
90    },
91    /// The workflow was terminated
92    Terminated {
93        /// Details provided at termination time
94        details: Vec<Payload>,
95    },
96    /// The workflow timed out
97    TimedOut,
98    /// The workflow continued as new
99    ContinuedAsNew,
100}
101
102/// Description of a workflow execution returned by `WorkflowHandle::describe`.
103///
104/// Access to the underlying Protobuf message is provided by [`raw`](Self::raw).
105#[derive(Debug, Clone)]
106pub struct WorkflowExecutionDescription {
107    /// The raw proto response from the server.
108    pub raw_description: DescribeWorkflowExecutionResponse,
109    history_length: usize,
110    static_summary: Option<String>,
111    static_details: Option<String>,
112}
113
114impl WorkflowExecutionDescription {
115    async fn new(
116        mut raw_description: DescribeWorkflowExecutionResponse,
117        data_converter: &DataConverter,
118    ) -> Result<Self, PayloadConversionError> {
119        let raw_user_metadata = raw_description
120            .execution_config
121            .as_ref()
122            .and_then(|cfg| cfg.user_metadata.clone());
123        decode_payloads(
124            &mut raw_description,
125            data_converter.codec(),
126            &SerializationContextData::Workflow,
127        )
128        .await;
129        let decoded_metadata =
130            decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?;
131        let history_length_raw = raw_description
132            .workflow_execution_info
133            .as_ref()
134            .map(|info| info.history_length)
135            .unwrap_or(0);
136        let history_length = history_length_raw.try_into().map_err(|_| {
137            PayloadConversionError::EncodingError(
138                format!("workflow history_length must be non-negative, got {history_length_raw}")
139                    .into(),
140            )
141        })?;
142        Ok(Self {
143            raw_description,
144            history_length,
145            static_summary: decoded_metadata.summary,
146            static_details: decoded_metadata.details,
147        })
148    }
149
150    /// The workflow ID.
151    pub fn id(&self) -> &str {
152        self.execution().workflow_id.as_str()
153    }
154
155    /// The run ID.
156    pub fn run_id(&self) -> &str {
157        self.execution().run_id.as_str()
158    }
159
160    /// The workflow type name.
161    pub fn workflow_type(&self) -> &str {
162        self.workflow_type_info().name.as_str()
163    }
164
165    /// The current status of the workflow execution.
166    pub fn status(
167        &self,
168    ) -> temporalio_common::protos::temporal::api::enums::v1::WorkflowExecutionStatus {
169        self.workflow_info().status()
170    }
171
172    /// When the workflow was created.
173    pub fn start_time(&self) -> Option<std::time::SystemTime> {
174        self.workflow_info()
175            .start_time
176            .as_ref()
177            .and_then(proto_ts_to_system_time)
178    }
179
180    /// When the workflow run started or should start.
181    pub fn execution_time(&self) -> Option<std::time::SystemTime> {
182        self.workflow_info()
183            .execution_time
184            .as_ref()
185            .and_then(proto_ts_to_system_time)
186    }
187
188    /// When the workflow was closed, if closed.
189    pub fn close_time(&self) -> Option<std::time::SystemTime> {
190        self.workflow_info()
191            .close_time
192            .as_ref()
193            .and_then(proto_ts_to_system_time)
194    }
195
196    /// The task queue the workflow runs on.
197    pub fn task_queue(&self) -> &str {
198        self.workflow_info().task_queue.as_str()
199    }
200
201    /// Number of events in history.
202    pub fn history_length(&self) -> usize {
203        self.history_length
204    }
205
206    /// Workflow memo after codec decoding.
207    pub fn memo(&self) -> Option<&temporalio_common::protos::temporal::api::common::v1::Memo> {
208        self.workflow_info().memo.as_ref()
209    }
210
211    /// Parent workflow ID, if this is a child workflow.
212    pub fn parent_id(&self) -> Option<&str> {
213        self.workflow_info()
214            .parent_execution
215            .as_ref()
216            .map(|e| e.workflow_id.as_str())
217    }
218
219    /// Parent run ID, if this is a child workflow.
220    pub fn parent_run_id(&self) -> Option<&str> {
221        self.workflow_info()
222            .parent_execution
223            .as_ref()
224            .map(|e| e.run_id.as_str())
225    }
226
227    /// Search attributes on the workflow.
228    pub fn search_attributes(
229        &self,
230    ) -> Option<&temporalio_common::protos::temporal::api::common::v1::SearchAttributes> {
231        self.workflow_info().search_attributes.as_ref()
232    }
233
234    /// Static summary configured on the workflow, if present.
235    pub fn static_summary(&self) -> Option<&str> {
236        self.static_summary.as_deref()
237    }
238
239    /// Static details configured on the workflow, if present.
240    pub fn static_details(&self) -> Option<&str> {
241        self.static_details.as_deref()
242    }
243
244    /// Access the raw proto for additional fields not exposed via accessors.
245    pub fn raw(&self) -> &DescribeWorkflowExecutionResponse {
246        &self.raw_description
247    }
248
249    /// Consume the wrapper and return the raw proto.
250    pub fn into_raw(self) -> DescribeWorkflowExecutionResponse {
251        self.raw_description
252    }
253
254    fn workflow_info(&self) -> &workflow::WorkflowExecutionInfo {
255        self.raw_description
256            .workflow_execution_info
257            .as_ref()
258            .expect("describe response missing workflow_execution_info")
259    }
260
261    fn execution(&self) -> &ProtoWorkflowExecution {
262        self.workflow_info()
263            .execution
264            .as_ref()
265            .expect("describe response missing workflow_execution_info.execution")
266    }
267
268    fn workflow_type_info(
269        &self,
270    ) -> &temporalio_common::protos::temporal::api::common::v1::WorkflowType {
271        self.workflow_info()
272            .r#type
273            .as_ref()
274            .expect("describe response missing workflow_execution_info.type")
275    }
276}
277
278// TODO [rust-sdk-branch]: Could implment stream a-la ListWorkflowsStream
279/// Workflow execution history returned by `WorkflowHandle::fetch_history`.
280#[derive(Debug, Clone)]
281pub struct WorkflowHistory {
282    events: Vec<HistoryEvent>,
283}
284impl From<WorkflowHistory> for history::v1::History {
285    fn from(h: WorkflowHistory) -> Self {
286        Self { events: h.events }
287    }
288}
289
290impl WorkflowHistory {
291    fn new(events: Vec<HistoryEvent>) -> Self {
292        Self { events }
293    }
294
295    /// The history events.
296    pub fn events(&self) -> &[HistoryEvent] {
297        &self.events
298    }
299
300    /// Consume the history and return the events.
301    pub fn into_events(self) -> Vec<HistoryEvent> {
302        self.events
303    }
304}
305
306/// A workflow handle which can refer to a specific workflow run, or a chain of workflow runs with
307/// the same workflow id.
308#[derive(Clone)]
309pub struct WorkflowHandle<ClientT, W> {
310    client: ClientT,
311    info: WorkflowExecutionInfo,
312
313    _wf_type: PhantomData<W>,
314}
315
316impl<CT, W> WorkflowHandle<CT, W> {
317    /// Return the run id of the Workflow Execution pointed at by this handle, if there is one.
318    pub fn run_id(&self) -> Option<&str> {
319        self.info.run_id.as_deref()
320    }
321}
322
323/// Holds needed information to refer to a specific workflow run, or workflow execution chain
324#[derive(Debug, Clone)]
325pub struct WorkflowExecutionInfo {
326    /// Namespace the workflow lives in.
327    pub namespace: String,
328    /// The workflow's id.
329    pub workflow_id: String,
330    /// If set, target this specific run of the workflow.
331    pub run_id: Option<String>,
332    /// Run ID used for cancellation and termination to ensure they happen on a workflow starting
333    /// with this run ID. This can be set when getting a workflow handle. When starting a workflow,
334    /// this is set as the resulting run ID if no start signal was provided.
335    pub first_execution_run_id: Option<String>,
336}
337
338impl WorkflowExecutionInfo {
339    /// Bind the workflow info to a specific client, turning it into a workflow handle
340    pub fn bind_untyped<CT>(self, client: CT) -> UntypedWorkflowHandle<CT>
341    where
342        CT: WorkflowService + Clone,
343    {
344        UntypedWorkflowHandle::new(client, self)
345    }
346}
347
348/// A workflow handle to a workflow with unknown types. Uses single argument raw payloads for input
349/// and output.
350pub type UntypedWorkflowHandle<CT> = WorkflowHandle<CT, UntypedWorkflow>;
351
352/// Marker type for sending untyped signals. Stores the signal name for runtime lookup.
353///
354/// Use with `handle.signal(UntypedSignal::new("signal_name"), raw_payload)`.
355pub struct UntypedSignal<W> {
356    name: String,
357    _wf: PhantomData<W>,
358}
359
360impl<W> UntypedSignal<W> {
361    /// Create a new `UntypedSignal` with the given signal name.
362    pub fn new(name: impl Into<String>) -> Self {
363        Self {
364            name: name.into(),
365            _wf: PhantomData,
366        }
367    }
368}
369
370impl<W: WorkflowDefinition> SignalDefinition for UntypedSignal<W> {
371    type Workflow = W;
372    type Input = RawValue;
373
374    fn name(&self) -> &str {
375        &self.name
376    }
377}
378
379/// Marker type for sending untyped queries. Stores the query name for runtime lookup.
380///
381/// Use with `handle.query(UntypedQuery::new("query_name"), raw_payload)`.
382pub struct UntypedQuery<W> {
383    name: String,
384    _wf: PhantomData<W>,
385}
386
387impl<W> UntypedQuery<W> {
388    /// Create a new `UntypedQuery` with the given query name.
389    pub fn new(name: impl Into<String>) -> Self {
390        Self {
391            name: name.into(),
392            _wf: PhantomData,
393        }
394    }
395}
396
397impl<W: WorkflowDefinition> QueryDefinition for UntypedQuery<W> {
398    type Workflow = W;
399    type Input = RawValue;
400    type Output = RawValue;
401
402    fn name(&self) -> &str {
403        &self.name
404    }
405}
406
407/// Marker type for sending untyped updates. Stores the update name for runtime lookup.
408///
409/// Use with `handle.update(UntypedUpdate::new("update_name"), raw_payload)`.
410pub struct UntypedUpdate<W> {
411    name: String,
412    _wf: PhantomData<W>,
413}
414
415impl<W> UntypedUpdate<W> {
416    /// Create a new `UntypedUpdate` with the given update name.
417    pub fn new(name: impl Into<String>) -> Self {
418        Self {
419            name: name.into(),
420            _wf: PhantomData,
421        }
422    }
423}
424
425impl<W: WorkflowDefinition> UpdateDefinition for UntypedUpdate<W> {
426    type Workflow = W;
427    type Input = RawValue;
428    type Output = RawValue;
429
430    fn name(&self) -> &str {
431        &self.name
432    }
433}
434
435impl<CT, W> WorkflowHandle<CT, W>
436where
437    CT: WorkflowService + Clone,
438    W: HasWorkflowDefinition,
439{
440    /// Create a workflow handle from a client and identifying information.
441    pub fn new(client: CT, info: WorkflowExecutionInfo) -> Self {
442        Self {
443            client,
444            info,
445            _wf_type: PhantomData::<W>,
446        }
447    }
448
449    /// Get the workflow execution info
450    pub fn info(&self) -> &WorkflowExecutionInfo {
451        &self.info
452    }
453
454    /// Get the client attached to this handle
455    pub fn client(&self) -> &CT {
456        &self.client
457    }
458
459    /// Await the result of the workflow execution
460    pub async fn get_result(
461        &self,
462        opts: WorkflowGetResultOptions,
463    ) -> Result<W::Output, WorkflowGetResultError>
464    where
465        CT: WorkflowService + NamespacedClient + Clone,
466    {
467        let raw = self.get_result_raw(opts).await?;
468        match raw {
469            WorkflowExecutionResult::Succeeded(v) => Ok(v),
470            WorkflowExecutionResult::Failed(f) => Err(WorkflowGetResultError::Failed(Box::new(f))),
471            WorkflowExecutionResult::Cancelled { details } => {
472                Err(WorkflowGetResultError::Cancelled { details })
473            }
474            WorkflowExecutionResult::Terminated { details } => {
475                Err(WorkflowGetResultError::Terminated { details })
476            }
477            WorkflowExecutionResult::TimedOut => Err(WorkflowGetResultError::TimedOut),
478            WorkflowExecutionResult::ContinuedAsNew => Err(WorkflowGetResultError::ContinuedAsNew),
479        }
480    }
481
482    /// Await the result of the workflow execution, returning the full
483    /// [`WorkflowExecutionResult`] enum for callers that need to inspect non-success outcomes
484    /// directly.
485    async fn get_result_raw(
486        &self,
487        opts: WorkflowGetResultOptions,
488    ) -> Result<WorkflowExecutionResult<W::Output>, WorkflowInteractionError>
489    where
490        CT: WorkflowService + NamespacedClient + Clone,
491    {
492        let mut run_id = self.info.run_id.clone().unwrap_or_default();
493        let fetch_opts = WorkflowFetchHistoryOptions::builder()
494            .skip_archival(true)
495            .wait_new_event(true)
496            .event_filter_type(HistoryEventFilterType::CloseEvent)
497            .build();
498
499        loop {
500            let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?;
501            let mut events = history.into_events();
502
503            if events.is_empty() {
504                continue;
505            }
506
507            let event_attrs = events.pop().and_then(|ev| ev.attributes);
508
509            macro_rules! follow {
510                ($attrs:ident) => {
511                    if opts.follow_runs && $attrs.new_execution_run_id != "" {
512                        run_id = $attrs.new_execution_run_id;
513                        continue;
514                    }
515                };
516            }
517
518            let dc = self.client.data_converter();
519
520            break match event_attrs {
521                Some(Attributes::WorkflowExecutionCompletedEventAttributes(attrs)) => {
522                    follow!(attrs);
523                    let payload = attrs
524                        .result
525                        .and_then(|p| p.payloads.into_iter().next())
526                        .unwrap_or_default();
527                    let result: W::Output = dc
528                        .from_payload(&SerializationContextData::Workflow, payload)
529                        .await?;
530                    Ok(WorkflowExecutionResult::Succeeded(result))
531                }
532                Some(Attributes::WorkflowExecutionFailedEventAttributes(attrs)) => {
533                    follow!(attrs);
534                    Ok(WorkflowExecutionResult::Failed(
535                        attrs.failure.unwrap_or_default(),
536                    ))
537                }
538                Some(Attributes::WorkflowExecutionCanceledEventAttributes(attrs)) => {
539                    Ok(WorkflowExecutionResult::Cancelled {
540                        details: Vec::from_payloads(attrs.details),
541                    })
542                }
543                Some(Attributes::WorkflowExecutionTimedOutEventAttributes(attrs)) => {
544                    follow!(attrs);
545                    Ok(WorkflowExecutionResult::TimedOut)
546                }
547                Some(Attributes::WorkflowExecutionTerminatedEventAttributes(attrs)) => {
548                    Ok(WorkflowExecutionResult::Terminated {
549                        details: Vec::from_payloads(attrs.details),
550                    })
551                }
552                Some(Attributes::WorkflowExecutionContinuedAsNewEventAttributes(attrs)) => {
553                    if opts.follow_runs {
554                        if !attrs.new_execution_run_id.is_empty() {
555                            run_id = attrs.new_execution_run_id;
556                            continue;
557                        } else {
558                            return Err(WorkflowInteractionError::Other(
559                                "New execution run id was empty in continue as new event!".into(),
560                            ));
561                        }
562                    } else {
563                        Ok(WorkflowExecutionResult::ContinuedAsNew)
564                    }
565                }
566                o => Err(WorkflowInteractionError::Other(
567                    format!(
568                        "Server returned an event that didn't match the CloseEvent filter. \
569                         This is either a server bug or a new event the SDK does not understand. \
570                         Event details: {o:?}"
571                    )
572                    .into(),
573                )),
574            };
575        }
576    }
577
578    /// Send a signal to the workflow
579    pub async fn signal<S>(
580        &self,
581        signal: S,
582        input: S::Input,
583        opts: WorkflowSignalOptions,
584    ) -> Result<(), WorkflowInteractionError>
585    where
586        CT: WorkflowService + NamespacedClient + Clone,
587        S: SignalDefinition<Workflow = W::Run>,
588        S::Input: Send,
589    {
590        let payloads = self
591            .client
592            .data_converter()
593            .to_payloads(&SerializationContextData::Workflow, &input)
594            .await?;
595        WorkflowService::signal_workflow_execution(
596            &mut self.client.clone(),
597            SignalWorkflowExecutionRequest {
598                namespace: self.client.namespace(),
599                workflow_execution: Some(ProtoWorkflowExecution {
600                    workflow_id: self.info.workflow_id.clone(),
601                    run_id: self.info.run_id.clone().unwrap_or_default(),
602                }),
603                signal_name: signal.name().to_string(),
604                input: Some(Payloads { payloads }),
605                identity: self.client.identity(),
606                request_id: opts
607                    .request_id
608                    .unwrap_or_else(|| Uuid::new_v4().to_string()),
609                header: opts.header,
610                ..Default::default()
611            }
612            .into_request(),
613        )
614        .await
615        .map_err(WorkflowInteractionError::from_status)?;
616        Ok(())
617    }
618
619    /// Query the workflow
620    pub async fn query<Q>(
621        &self,
622        query: Q,
623        input: Q::Input,
624        opts: WorkflowQueryOptions,
625    ) -> Result<Q::Output, WorkflowQueryError>
626    where
627        CT: WorkflowService + NamespacedClient + Clone,
628        Q: QueryDefinition<Workflow = W::Run>,
629        Q::Input: Send,
630    {
631        let dc = self.client.data_converter();
632        let payloads = dc
633            .to_payloads(&SerializationContextData::Workflow, &input)
634            .await?;
635        let response = self
636            .client
637            .clone()
638            .query_workflow(
639                QueryWorkflowRequest {
640                    namespace: self.client.namespace(),
641                    execution: Some(ProtoWorkflowExecution {
642                        workflow_id: self.info.workflow_id.clone(),
643                        run_id: self.info.run_id.clone().unwrap_or_default(),
644                    }),
645                    query: Some(WorkflowQuery {
646                        query_type: query.name().to_string(),
647                        query_args: Some(Payloads { payloads }),
648                        header: opts.header,
649                    }),
650                    // Default to None (1) which means don't reject
651                    query_reject_condition: opts.reject_condition.map(|c| c as i32).unwrap_or(1),
652                }
653                .into_request(),
654            )
655            .await
656            .map_err(WorkflowQueryError::from_status)?
657            .into_inner();
658
659        if let Some(rejected) = response.query_rejected {
660            return Err(WorkflowQueryError::Rejected(rejected));
661        }
662
663        let result_payloads = response
664            .query_result
665            .map(|p| p.payloads)
666            .unwrap_or_default();
667
668        dc.from_payloads(&SerializationContextData::Workflow, result_payloads)
669            .await
670            .map_err(WorkflowQueryError::from)
671    }
672
673    /// Send an update to the workflow and wait for it to complete, returning the result.
674    pub async fn execute_update<U>(
675        &self,
676        update: U,
677        input: U::Input,
678        options: WorkflowExecuteUpdateOptions,
679    ) -> Result<U::Output, WorkflowUpdateError>
680    where
681        CT: WorkflowService + NamespacedClient + Clone,
682        U: UpdateDefinition<Workflow = W::Run>,
683        U::Input: Send,
684        U::Output: 'static,
685    {
686        let handle = self
687            .start_update(
688                update,
689                input,
690                WorkflowStartUpdateOptions::builder()
691                    .maybe_update_id(options.update_id)
692                    .maybe_header(options.header)
693                    .wait_for_stage(WorkflowUpdateWaitStage::Completed)
694                    .build(),
695            )
696            .await?;
697        handle.get_result().await
698    }
699
700    /// Start an update and return a handle without waiting for completion.
701    /// Use `execute_update()` if you want to wait for the result immediately.
702    pub async fn start_update<U>(
703        &self,
704        update: U,
705        input: U::Input,
706        options: WorkflowStartUpdateOptions,
707    ) -> Result<WorkflowUpdateHandle<CT, U::Output>, WorkflowUpdateError>
708    where
709        CT: WorkflowService + NamespacedClient + Clone,
710        U: UpdateDefinition<Workflow = W::Run>,
711        U::Input: Send,
712    {
713        let dc = self.client.data_converter();
714        let payloads = dc
715            .to_payloads(&SerializationContextData::Workflow, &input)
716            .await?;
717
718        let lifecycle_stage = match options.wait_for_stage {
719            WorkflowUpdateWaitStage::Admitted => UpdateWorkflowExecutionLifecycleStage::Admitted,
720            WorkflowUpdateWaitStage::Accepted => UpdateWorkflowExecutionLifecycleStage::Accepted,
721            WorkflowUpdateWaitStage::Completed => UpdateWorkflowExecutionLifecycleStage::Completed,
722        };
723
724        let update_id = options
725            .update_id
726            .unwrap_or_else(|| Uuid::new_v4().to_string());
727
728        let response = WorkflowService::update_workflow_execution(
729            &mut self.client.clone(),
730            UpdateWorkflowExecutionRequest {
731                namespace: self.client.namespace(),
732                workflow_execution: Some(ProtoWorkflowExecution {
733                    workflow_id: self.info().workflow_id.clone(),
734                    run_id: self.info().run_id.clone().unwrap_or_default(),
735                }),
736                wait_policy: Some(WaitPolicy {
737                    lifecycle_stage: lifecycle_stage.into(),
738                }),
739                request: Some(update::v1::Request {
740                    meta: Some(update::v1::Meta {
741                        update_id: update_id.clone(),
742                        identity: self.client.identity(),
743                    }),
744                    input: Some(update::v1::Input {
745                        header: options.header,
746                        name: update.name().to_string(),
747                        args: Some(Payloads { payloads }),
748                    }),
749                }),
750                ..Default::default()
751            }
752            .into_request(),
753        )
754        .await
755        .map_err(WorkflowUpdateError::from_status)?
756        .into_inner();
757
758        // Extract run_id from response if available
759        let run_id = response
760            .update_ref
761            .as_ref()
762            .and_then(|r| r.workflow_execution.as_ref())
763            .map(|e| e.run_id.clone())
764            .filter(|s| !s.is_empty())
765            .or_else(|| self.info().run_id.clone());
766
767        Ok(WorkflowUpdateHandle {
768            client: self.client.clone(),
769            update_id,
770            workflow_id: self.info().workflow_id.clone(),
771            run_id,
772            known_outcome: response.outcome,
773            _output: PhantomData,
774        })
775    }
776
777    /// Request cancellation of this workflow.
778    pub async fn cancel(&self, opts: WorkflowCancelOptions) -> Result<(), WorkflowInteractionError>
779    where
780        CT: NamespacedClient,
781    {
782        WorkflowService::request_cancel_workflow_execution(
783            &mut self.client.clone(),
784            RequestCancelWorkflowExecutionRequest {
785                namespace: self.client.namespace(),
786                workflow_execution: Some(ProtoWorkflowExecution {
787                    workflow_id: self.info.workflow_id.clone(),
788                    run_id: self.info.run_id.clone().unwrap_or_default(),
789                }),
790                identity: self.client.identity(),
791                request_id: opts
792                    .request_id
793                    .unwrap_or_else(|| Uuid::new_v4().to_string()),
794                first_execution_run_id: self
795                    .info
796                    .first_execution_run_id
797                    .clone()
798                    .unwrap_or_default(),
799                reason: opts.reason,
800                links: vec![],
801            }
802            .into_request(),
803        )
804        .await
805        .map_err(WorkflowInteractionError::from_status)?;
806        Ok(())
807    }
808
809    /// Terminate this workflow.
810    pub async fn terminate(
811        &self,
812        opts: WorkflowTerminateOptions,
813    ) -> Result<(), WorkflowInteractionError>
814    where
815        CT: NamespacedClient,
816    {
817        WorkflowService::terminate_workflow_execution(
818            &mut self.client.clone(),
819            TerminateWorkflowExecutionRequest {
820                namespace: self.client.namespace(),
821                workflow_execution: Some(ProtoWorkflowExecution {
822                    workflow_id: self.info.workflow_id.clone(),
823                    run_id: self.info.run_id.clone().unwrap_or_default(),
824                }),
825                reason: opts.reason,
826                details: opts.details,
827                identity: self.client.identity(),
828                first_execution_run_id: self
829                    .info
830                    .first_execution_run_id
831                    .clone()
832                    .unwrap_or_default(),
833                links: vec![],
834            }
835            .into_request(),
836        )
837        .await
838        .map_err(WorkflowInteractionError::from_status)?;
839        Ok(())
840    }
841
842    /// Get workflow execution description/metadata.
843    pub async fn describe(
844        &self,
845        _opts: WorkflowDescribeOptions,
846    ) -> Result<WorkflowExecutionDescription, WorkflowInteractionError>
847    where
848        CT: NamespacedClient,
849    {
850        let response = WorkflowService::describe_workflow_execution(
851            &mut self.client.clone(),
852            DescribeWorkflowExecutionRequest {
853                namespace: self.client.namespace(),
854                execution: Some(ProtoWorkflowExecution {
855                    workflow_id: self.info.workflow_id.clone(),
856                    run_id: self.info.run_id.clone().unwrap_or_default(),
857                }),
858            }
859            .into_request(),
860        )
861        .await
862        .map_err(WorkflowInteractionError::from_status)?
863        .into_inner();
864        WorkflowExecutionDescription::new(response, self.client.data_converter())
865            .await
866            .map_err(WorkflowInteractionError::from)
867    }
868    /// Fetch workflow execution history.
869    pub async fn fetch_history(
870        &self,
871        opts: WorkflowFetchHistoryOptions,
872    ) -> Result<WorkflowHistory, WorkflowInteractionError>
873    where
874        CT: NamespacedClient,
875    {
876        let run_id = self.info.run_id.clone().unwrap_or_default();
877        self.fetch_history_for_run(&run_id, &opts).await
878    }
879
880    /// Fetch history for a specific run_id, handling pagination.
881    async fn fetch_history_for_run(
882        &self,
883        run_id: &str,
884        opts: &WorkflowFetchHistoryOptions,
885    ) -> Result<WorkflowHistory, WorkflowInteractionError>
886    where
887        CT: NamespacedClient,
888    {
889        let mut all_events = Vec::new();
890        let mut next_page_token = vec![];
891
892        loop {
893            let response = WorkflowService::get_workflow_execution_history(
894                &mut self.client.clone(),
895                GetWorkflowExecutionHistoryRequest {
896                    namespace: self.client.namespace(),
897                    execution: Some(ProtoWorkflowExecution {
898                        workflow_id: self.info.workflow_id.clone(),
899                        run_id: run_id.to_string(),
900                    }),
901                    next_page_token: next_page_token.clone(),
902                    skip_archival: opts.skip_archival,
903                    wait_new_event: opts.wait_new_event,
904                    history_event_filter_type: opts.event_filter_type as i32,
905                    ..Default::default()
906                }
907                .into_request(),
908            )
909            .await
910            .map_err(WorkflowInteractionError::from_status)?
911            .into_inner();
912
913            if let Some(history) = response.history {
914                all_events.extend(history.events);
915            }
916
917            if response.next_page_token.is_empty() {
918                break;
919            }
920            next_page_token = response.next_page_token;
921        }
922
923        Ok(WorkflowHistory::new(all_events))
924    }
925}
926
927/// Handle to a workflow update that has been started but may not be complete.
928///
929/// Use `get_result()` to wait for the update to complete and retrieve its result.
930pub struct WorkflowUpdateHandle<CT, T> {
931    client: CT,
932    update_id: String,
933    workflow_id: String,
934    run_id: Option<String>,
935    /// If the update was started with `Completed` wait stage, the outcome is already available.
936    known_outcome: Option<update::v1::Outcome>,
937    _output: PhantomData<T>,
938}
939
940impl<CT, T> WorkflowUpdateHandle<CT, T> {
941    /// Get the update ID.
942    pub fn id(&self) -> &str {
943        &self.update_id
944    }
945
946    /// Get the workflow ID.
947    pub fn workflow_id(&self) -> &str {
948        &self.workflow_id
949    }
950
951    /// Get the workflow run ID, if available.
952    pub fn workflow_run_id(&self) -> Option<&str> {
953        self.run_id.as_deref()
954    }
955}
956
957impl<CT, T: 'static> WorkflowUpdateHandle<CT, T>
958where
959    CT: WorkflowService + NamespacedClient + Clone,
960{
961    /// Wait for the update to complete and return the result.
962    pub async fn get_result(&self) -> Result<T, WorkflowUpdateError>
963    where
964        T: temporalio_common::data_converters::TemporalDeserializable,
965    {
966        let outcome = if let Some(known) = &self.known_outcome {
967            known.clone()
968        } else {
969            // The server's internal long-poll timeout (~60s) may expire before the update
970            // completes, returning a response with outcome: None. Keep polling until we
971            // get an actual outcome.
972            loop {
973                let response = WorkflowService::poll_workflow_execution_update(
974                    &mut self.client.clone(),
975                    PollWorkflowExecutionUpdateRequest {
976                        namespace: self.client.namespace(),
977                        update_ref: Some(update::v1::UpdateRef {
978                            workflow_execution: Some(ProtoWorkflowExecution {
979                                workflow_id: self.workflow_id.clone(),
980                                run_id: self.run_id.clone().unwrap_or_default(),
981                            }),
982                            update_id: self.update_id.clone(),
983                        }),
984                        identity: self.client.identity(),
985                        wait_policy: Some(WaitPolicy {
986                            lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed
987                                .into(),
988                        }),
989                    }
990                    .into_request(),
991                )
992                .await
993                .map_err(WorkflowUpdateError::from_status)?
994                .into_inner();
995
996                if let Some(outcome) = response.outcome {
997                    break outcome;
998                }
999            }
1000        };
1001
1002        match outcome.value {
1003            Some(update::v1::outcome::Value::Success(success)) => self
1004                .client
1005                .data_converter()
1006                .from_payloads(&SerializationContextData::Workflow, success.payloads)
1007                .await
1008                .map_err(WorkflowUpdateError::from),
1009            Some(update::v1::outcome::Value::Failure(failure)) => {
1010                Err(WorkflowUpdateError::Failed(Box::new(failure)))
1011            }
1012            None => Err(WorkflowUpdateError::Other(
1013                "Update returned no outcome value".into(),
1014            )),
1015        }
1016    }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use std::collections::HashMap;
1023    use temporalio_common::protos::temporal::api::{
1024        common::v1::{Memo, SearchAttributes},
1025        enums::v1::WorkflowExecutionStatus,
1026        sdk::v1::UserMetadata,
1027        workflow::v1::WorkflowExecutionConfig,
1028    };
1029
1030    #[tokio::test]
1031    async fn workflow_description_accessors_expose_decoded_fields() {
1032        let converter = DataConverter::default();
1033        let memo_payload = converter
1034            .to_payload(&SerializationContextData::Workflow, &"memo-value")
1035            .await
1036            .unwrap();
1037        let search_attr_payload = converter
1038            .to_payload(&SerializationContextData::Workflow, &"search-value")
1039            .await
1040            .unwrap();
1041        let summary_payload = converter
1042            .to_payload(&SerializationContextData::Workflow, &"workflow summary")
1043            .await
1044            .unwrap();
1045        let details_payload = converter
1046            .to_payload(&SerializationContextData::Workflow, &"workflow details")
1047            .await
1048            .unwrap();
1049        let description = WorkflowExecutionDescription::new(
1050            DescribeWorkflowExecutionResponse {
1051                workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
1052                    execution: Some(ProtoWorkflowExecution {
1053                        workflow_id: "wf-id".to_string(),
1054                        run_id: "run-id".to_string(),
1055                    }),
1056                    r#type: Some(
1057                        temporalio_common::protos::temporal::api::common::v1::WorkflowType {
1058                            name: "wf-type".to_string(),
1059                        },
1060                    ),
1061                    status: WorkflowExecutionStatus::Completed as i32,
1062                    task_queue: "task-queue".to_string(),
1063                    history_length: 42,
1064                    memo: Some(Memo {
1065                        fields: HashMap::from([("memo-key".to_string(), memo_payload.clone())]),
1066                    }),
1067                    parent_execution: Some(ProtoWorkflowExecution {
1068                        workflow_id: "parent-id".to_string(),
1069                        run_id: "parent-run-id".to_string(),
1070                    }),
1071                    search_attributes: Some(SearchAttributes {
1072                        indexed_fields: HashMap::from([(
1073                            "CustomKeywordField".to_string(),
1074                            search_attr_payload.clone(),
1075                        )]),
1076                    }),
1077                    ..Default::default()
1078                }),
1079                execution_config: Some(WorkflowExecutionConfig {
1080                    user_metadata: Some(UserMetadata {
1081                        summary: Some(summary_payload),
1082                        details: Some(details_payload),
1083                    }),
1084                    ..Default::default()
1085                }),
1086                ..Default::default()
1087            },
1088            &converter,
1089        )
1090        .await
1091        .unwrap();
1092
1093        assert_eq!(description.id(), "wf-id");
1094        assert_eq!(description.run_id(), "run-id");
1095        assert_eq!(description.workflow_type(), "wf-type");
1096        assert_eq!(description.status(), WorkflowExecutionStatus::Completed);
1097        assert_eq!(description.task_queue(), "task-queue");
1098        assert_eq!(description.history_length(), 42);
1099        assert_eq!(description.parent_id(), Some("parent-id"));
1100        assert_eq!(description.parent_run_id(), Some("parent-run-id"));
1101        assert_eq!(description.memo().unwrap().fields["memo-key"], memo_payload);
1102        assert_eq!(
1103            description.search_attributes().unwrap().indexed_fields["CustomKeywordField"],
1104            search_attr_payload
1105        );
1106        assert_eq!(description.static_summary(), Some("workflow summary"));
1107        assert_eq!(description.static_details(), Some("workflow details"));
1108    }
1109
1110    #[tokio::test]
1111    async fn workflow_description_rejects_negative_history_length() {
1112        let err = WorkflowExecutionDescription::new(
1113            DescribeWorkflowExecutionResponse {
1114                workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
1115                    history_length: -1,
1116                    ..Default::default()
1117                }),
1118                ..Default::default()
1119            },
1120            &DataConverter::default(),
1121        )
1122        .await
1123        .unwrap_err();
1124
1125        assert_eq!(
1126            err.to_string(),
1127            "Encoding error: workflow history_length must be non-negative, got -1"
1128        );
1129    }
1130}