Skip to main content

temporalio_client/
workflow_handle.rs

1use crate::{
2    NamespacedClient, WorkflowCancelOptions, WorkflowDescribeOptions, WorkflowExecuteUpdateOptions,
3    WorkflowFetchHistoryOptions, WorkflowGetResultOptions, WorkflowQueryOptions,
4    WorkflowSignalOptions, WorkflowStartUpdateOptions, WorkflowTerminateOptions,
5    WorkflowUpdateWaitStage,
6    errors::{
7        WorkflowGetResultError, WorkflowInteractionError, WorkflowQueryError, WorkflowUpdateError,
8    },
9    grpc::WorkflowService,
10};
11use std::{fmt::Debug, marker::PhantomData};
12use temporalio_common::{
13    QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
14    data_converters::{RawValue, SerializationContextData},
15    protos::{
16        coresdk::FromPayloadsExt,
17        temporal::api::{
18            common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
19            enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
20            failure::v1::Failure,
21            history::{
22                self,
23                v1::{HistoryEvent, history_event::Attributes},
24            },
25            query::v1::WorkflowQuery,
26            update::{self, v1::WaitPolicy},
27            workflowservice::v1::{
28                DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse,
29                GetWorkflowExecutionHistoryRequest, PollWorkflowExecutionUpdateRequest,
30                QueryWorkflowRequest, RequestCancelWorkflowExecutionRequest,
31                SignalWorkflowExecutionRequest, TerminateWorkflowExecutionRequest,
32                UpdateWorkflowExecutionRequest,
33            },
34        },
35    },
36};
37use tonic::IntoRequest;
38use uuid::Uuid;
39
/// Enumerates terminal states for a particular workflow execution.
///
/// `T` is the type of the value carried by [`WorkflowExecutionResult::Succeeded`].
#[derive(Debug)]
// NOTE(review): the lint is silenced rather than boxing the larger variants; presumably
// deliberate, confirm before changing the layout.
#[allow(clippy::large_enum_variant)]
pub enum WorkflowExecutionResult<T> {
    /// The workflow finished successfully
    Succeeded(T),
    /// The workflow finished in failure
    Failed(Failure),
    /// The workflow was cancelled
    Cancelled {
        /// Details provided at cancellation time
        details: Vec<Payload>,
    },
    /// The workflow was terminated
    Terminated {
        /// Details provided at termination time
        details: Vec<Payload>,
    },
    /// The workflow timed out
    TimedOut,
    /// The workflow continued as new
    ContinuedAsNew,
}
63
64/// Description of a workflow execution returned by `WorkflowHandle::describe`.
65#[derive(Debug, Clone)]
66pub struct WorkflowExecutionDescription {
67    /// The raw proto response from the server.
68    pub raw_description: DescribeWorkflowExecutionResponse,
69}
70
71impl WorkflowExecutionDescription {
72    fn new(raw_description: DescribeWorkflowExecutionResponse) -> Self {
73        Self { raw_description }
74    }
75}
76
// TODO [rust-sdk-branch]: Could implement a stream, a la ListWorkflowsStream
78/// Workflow execution history returned by `WorkflowHandle::fetch_history`.
79#[derive(Debug, Clone)]
80pub struct WorkflowHistory {
81    events: Vec<HistoryEvent>,
82}
83impl From<WorkflowHistory> for history::v1::History {
84    fn from(h: WorkflowHistory) -> Self {
85        Self { events: h.events }
86    }
87}
88
89impl WorkflowHistory {
90    fn new(events: Vec<HistoryEvent>) -> Self {
91        Self { events }
92    }
93
94    /// The history events.
95    pub fn events(&self) -> &[HistoryEvent] {
96        &self.events
97    }
98
99    /// Consume the history and return the events.
100    pub fn into_events(self) -> Vec<HistoryEvent> {
101        self.events
102    }
103}
104
105/// A workflow handle which can refer to a specific workflow run, or a chain of workflow runs with
106/// the same workflow id.
107#[derive(Clone)]
108pub struct WorkflowHandle<ClientT, W> {
109    client: ClientT,
110    info: WorkflowExecutionInfo,
111
112    _wf_type: PhantomData<W>,
113}
114
115impl<CT, W> WorkflowHandle<CT, W> {
116    /// Return the run id of the Workflow Execution pointed at by this handle, if there is one.
117    pub fn run_id(&self) -> Option<&str> {
118        self.info.run_id.as_deref()
119    }
120}
121
122/// Holds needed information to refer to a specific workflow run, or workflow execution chain
123#[derive(Debug, Clone)]
124pub struct WorkflowExecutionInfo {
125    /// Namespace the workflow lives in.
126    pub namespace: String,
127    /// The workflow's id.
128    pub workflow_id: String,
129    /// If set, target this specific run of the workflow.
130    pub run_id: Option<String>,
131    /// Run ID used for cancellation and termination to ensure they happen on a workflow starting
132    /// with this run ID. This can be set when getting a workflow handle. When starting a workflow,
133    /// this is set as the resulting run ID if no start signal was provided.
134    pub first_execution_run_id: Option<String>,
135}
136
137impl WorkflowExecutionInfo {
138    /// Bind the workflow info to a specific client, turning it into a workflow handle
139    pub fn bind_untyped<CT>(self, client: CT) -> UntypedWorkflowHandle<CT>
140    where
141        CT: WorkflowService + Clone,
142    {
143        UntypedWorkflowHandle::new(client, self)
144    }
145}
146
/// A workflow handle to a workflow with unknown types. Uses single argument raw payloads for input
/// and output.
///
/// This is [`WorkflowHandle`] with [`UntypedWorkflow`] as the workflow marker, whose `Input` and
/// `Output` are both `RawValue`.
pub type UntypedWorkflowHandle<CT> = WorkflowHandle<CT, UntypedWorkflow>;
150
/// Marker type for untyped workflow handles. Carries the workflow type name.
pub struct UntypedWorkflow {
    name: String,
}

impl UntypedWorkflow {
    /// Build an `UntypedWorkflow` for the workflow type called `name`.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        UntypedWorkflow { name }
    }
}
161impl WorkflowDefinition for UntypedWorkflow {
162    type Input = RawValue;
163    type Output = RawValue;
164    fn name(&self) -> &str {
165        &self.name
166    }
167}
168
/// Marker type for sending untyped signals. Stores the signal name for runtime lookup.
///
/// `W` is the workflow definition the signal targets; it is only tracked at the type level.
///
/// Use with `handle.signal(UntypedSignal::new("signal_name"), raw_payload, opts)`.
pub struct UntypedSignal<W> {
    name: String,
    _wf: PhantomData<W>,
}

impl<W> UntypedSignal<W> {
    /// Create a new `UntypedSignal` with the given signal name.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        Self {
            name,
            _wf: PhantomData,
        }
    }
}
186
187impl<W: WorkflowDefinition> SignalDefinition for UntypedSignal<W> {
188    type Workflow = W;
189    type Input = RawValue;
190
191    fn name(&self) -> &str {
192        &self.name
193    }
194}
195
/// Marker type for sending untyped queries. Stores the query name for runtime lookup.
///
/// `W` is the workflow definition the query targets; it is only tracked at the type level.
///
/// Use with `handle.query(UntypedQuery::new("query_name"), raw_payload, opts)`.
pub struct UntypedQuery<W> {
    name: String,
    _wf: PhantomData<W>,
}

impl<W> UntypedQuery<W> {
    /// Create a new `UntypedQuery` with the given query name.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        Self {
            name,
            _wf: PhantomData,
        }
    }
}
213
214impl<W: WorkflowDefinition> QueryDefinition for UntypedQuery<W> {
215    type Workflow = W;
216    type Input = RawValue;
217    type Output = RawValue;
218
219    fn name(&self) -> &str {
220        &self.name
221    }
222}
223
/// Marker type for sending untyped updates. Stores the update name for runtime lookup.
///
/// `W` is the workflow definition the update targets; it is only tracked at the type level.
///
/// Use with `handle.execute_update(UntypedUpdate::new("update_name"), raw_payload, options)`
/// or, to obtain a handle without waiting for the result, with `handle.start_update(..)`.
pub struct UntypedUpdate<W> {
    name: String,
    _wf: PhantomData<W>,
}

impl<W> UntypedUpdate<W> {
    /// Create a new `UntypedUpdate` with the given update name.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        Self {
            name,
            _wf: PhantomData,
        }
    }
}
241
242impl<W: WorkflowDefinition> UpdateDefinition for UntypedUpdate<W> {
243    type Workflow = W;
244    type Input = RawValue;
245    type Output = RawValue;
246
247    fn name(&self) -> &str {
248        &self.name
249    }
250}
251
252impl<CT, W> WorkflowHandle<CT, W>
253where
254    CT: WorkflowService + Clone,
255    W: WorkflowDefinition,
256{
257    /// Create a workflow handle from a client and identifying information.
258    pub fn new(client: CT, info: WorkflowExecutionInfo) -> Self {
259        Self {
260            client,
261            info,
262            _wf_type: PhantomData::<W>,
263        }
264    }
265
266    /// Get the workflow execution info
267    pub fn info(&self) -> &WorkflowExecutionInfo {
268        &self.info
269    }
270
271    /// Get the client attached to this handle
272    pub fn client(&self) -> &CT {
273        &self.client
274    }
275
276    /// Await the result of the workflow execution
277    pub async fn get_result(
278        &self,
279        opts: WorkflowGetResultOptions,
280    ) -> Result<W::Output, WorkflowGetResultError>
281    where
282        CT: WorkflowService + NamespacedClient + Clone,
283    {
284        let raw = self.get_result_raw(opts).await?;
285        match raw {
286            WorkflowExecutionResult::Succeeded(v) => Ok(v),
287            WorkflowExecutionResult::Failed(f) => Err(WorkflowGetResultError::Failed(Box::new(f))),
288            WorkflowExecutionResult::Cancelled { details } => {
289                Err(WorkflowGetResultError::Cancelled { details })
290            }
291            WorkflowExecutionResult::Terminated { details } => {
292                Err(WorkflowGetResultError::Terminated { details })
293            }
294            WorkflowExecutionResult::TimedOut => Err(WorkflowGetResultError::TimedOut),
295            WorkflowExecutionResult::ContinuedAsNew => Err(WorkflowGetResultError::ContinuedAsNew),
296        }
297    }
298
299    /// Await the result of the workflow execution, returning the full
300    /// [`WorkflowExecutionResult`] enum for callers that need to inspect non-success outcomes
301    /// directly.
302    async fn get_result_raw(
303        &self,
304        opts: WorkflowGetResultOptions,
305    ) -> Result<WorkflowExecutionResult<W::Output>, WorkflowInteractionError>
306    where
307        CT: WorkflowService + NamespacedClient + Clone,
308    {
309        let mut run_id = self.info.run_id.clone().unwrap_or_default();
310        let fetch_opts = WorkflowFetchHistoryOptions::builder()
311            .skip_archival(true)
312            .wait_new_event(true)
313            .event_filter_type(HistoryEventFilterType::CloseEvent)
314            .build();
315
316        loop {
317            let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?;
318            let mut events = history.into_events();
319
320            if events.is_empty() {
321                continue;
322            }
323
324            let event_attrs = events.pop().and_then(|ev| ev.attributes);
325
326            macro_rules! follow {
327                ($attrs:ident) => {
328                    if opts.follow_runs && $attrs.new_execution_run_id != "" {
329                        run_id = $attrs.new_execution_run_id;
330                        continue;
331                    }
332                };
333            }
334
335            let dc = self.client.data_converter();
336
337            break match event_attrs {
338                Some(Attributes::WorkflowExecutionCompletedEventAttributes(attrs)) => {
339                    follow!(attrs);
340                    let payload = attrs
341                        .result
342                        .and_then(|p| p.payloads.into_iter().next())
343                        .unwrap_or_default();
344                    let result: W::Output = dc
345                        .from_payload(&SerializationContextData::Workflow, payload)
346                        .await?;
347                    Ok(WorkflowExecutionResult::Succeeded(result))
348                }
349                Some(Attributes::WorkflowExecutionFailedEventAttributes(attrs)) => {
350                    follow!(attrs);
351                    Ok(WorkflowExecutionResult::Failed(
352                        attrs.failure.unwrap_or_default(),
353                    ))
354                }
355                Some(Attributes::WorkflowExecutionCanceledEventAttributes(attrs)) => {
356                    Ok(WorkflowExecutionResult::Cancelled {
357                        details: Vec::from_payloads(attrs.details),
358                    })
359                }
360                Some(Attributes::WorkflowExecutionTimedOutEventAttributes(attrs)) => {
361                    follow!(attrs);
362                    Ok(WorkflowExecutionResult::TimedOut)
363                }
364                Some(Attributes::WorkflowExecutionTerminatedEventAttributes(attrs)) => {
365                    Ok(WorkflowExecutionResult::Terminated {
366                        details: Vec::from_payloads(attrs.details),
367                    })
368                }
369                Some(Attributes::WorkflowExecutionContinuedAsNewEventAttributes(attrs)) => {
370                    if opts.follow_runs {
371                        if !attrs.new_execution_run_id.is_empty() {
372                            run_id = attrs.new_execution_run_id;
373                            continue;
374                        } else {
375                            return Err(WorkflowInteractionError::Other(
376                                "New execution run id was empty in continue as new event!".into(),
377                            ));
378                        }
379                    } else {
380                        Ok(WorkflowExecutionResult::ContinuedAsNew)
381                    }
382                }
383                o => Err(WorkflowInteractionError::Other(
384                    format!(
385                        "Server returned an event that didn't match the CloseEvent filter. \
386                         This is either a server bug or a new event the SDK does not understand. \
387                         Event details: {o:?}"
388                    )
389                    .into(),
390                )),
391            };
392        }
393    }
394
395    /// Send a signal to the workflow
396    pub async fn signal<S>(
397        &self,
398        signal: S,
399        input: S::Input,
400        opts: WorkflowSignalOptions,
401    ) -> Result<(), WorkflowInteractionError>
402    where
403        CT: WorkflowService + NamespacedClient + Clone,
404        S: SignalDefinition<Workflow = W>,
405        S::Input: Send,
406    {
407        let payloads = self
408            .client
409            .data_converter()
410            .to_payloads(&SerializationContextData::Workflow, &input)
411            .await?;
412        WorkflowService::signal_workflow_execution(
413            &mut self.client.clone(),
414            SignalWorkflowExecutionRequest {
415                namespace: self.client.namespace(),
416                workflow_execution: Some(ProtoWorkflowExecution {
417                    workflow_id: self.info.workflow_id.clone(),
418                    run_id: self.info.run_id.clone().unwrap_or_default(),
419                }),
420                signal_name: signal.name().to_string(),
421                input: Some(Payloads { payloads }),
422                identity: self.client.identity(),
423                request_id: opts
424                    .request_id
425                    .unwrap_or_else(|| Uuid::new_v4().to_string()),
426                header: opts.header,
427                ..Default::default()
428            }
429            .into_request(),
430        )
431        .await
432        .map_err(WorkflowInteractionError::from_status)?;
433        Ok(())
434    }
435
436    /// Query the workflow
437    pub async fn query<Q>(
438        &self,
439        query: Q,
440        input: Q::Input,
441        opts: WorkflowQueryOptions,
442    ) -> Result<Q::Output, WorkflowQueryError>
443    where
444        CT: WorkflowService + NamespacedClient + Clone,
445        Q: QueryDefinition<Workflow = W>,
446        Q::Input: Send,
447    {
448        let dc = self.client.data_converter();
449        let payloads = dc
450            .to_payloads(&SerializationContextData::Workflow, &input)
451            .await?;
452        let response = self
453            .client
454            .clone()
455            .query_workflow(
456                QueryWorkflowRequest {
457                    namespace: self.client.namespace(),
458                    execution: Some(ProtoWorkflowExecution {
459                        workflow_id: self.info.workflow_id.clone(),
460                        run_id: self.info.run_id.clone().unwrap_or_default(),
461                    }),
462                    query: Some(WorkflowQuery {
463                        query_type: query.name().to_string(),
464                        query_args: Some(Payloads { payloads }),
465                        header: opts.header,
466                    }),
467                    // Default to None (1) which means don't reject
468                    query_reject_condition: opts.reject_condition.map(|c| c as i32).unwrap_or(1),
469                }
470                .into_request(),
471            )
472            .await
473            .map_err(WorkflowQueryError::from_status)?
474            .into_inner();
475
476        if let Some(rejected) = response.query_rejected {
477            return Err(WorkflowQueryError::Rejected(rejected));
478        }
479
480        let result_payloads = response
481            .query_result
482            .map(|p| p.payloads)
483            .unwrap_or_default();
484
485        dc.from_payloads(&SerializationContextData::Workflow, result_payloads)
486            .await
487            .map_err(WorkflowQueryError::from)
488    }
489
490    /// Send an update to the workflow and wait for it to complete, returning the result.
491    pub async fn execute_update<U>(
492        &self,
493        update: U,
494        input: U::Input,
495        options: WorkflowExecuteUpdateOptions,
496    ) -> Result<U::Output, WorkflowUpdateError>
497    where
498        CT: WorkflowService + NamespacedClient + Clone,
499        U: UpdateDefinition<Workflow = W>,
500        U::Input: Send,
501        U::Output: 'static,
502    {
503        let handle = self
504            .start_update(
505                update,
506                input,
507                WorkflowStartUpdateOptions::builder()
508                    .maybe_update_id(options.update_id)
509                    .maybe_header(options.header)
510                    .wait_for_stage(WorkflowUpdateWaitStage::Completed)
511                    .build(),
512            )
513            .await?;
514        handle.get_result().await
515    }
516
517    /// Start an update and return a handle without waiting for completion.
518    /// Use `execute_update()` if you want to wait for the result immediately.
519    pub async fn start_update<U>(
520        &self,
521        update: U,
522        input: U::Input,
523        options: WorkflowStartUpdateOptions,
524    ) -> Result<WorkflowUpdateHandle<CT, U::Output>, WorkflowUpdateError>
525    where
526        CT: WorkflowService + NamespacedClient + Clone,
527        U: UpdateDefinition<Workflow = W>,
528        U::Input: Send,
529    {
530        let dc = self.client.data_converter();
531        let payloads = dc
532            .to_payloads(&SerializationContextData::Workflow, &input)
533            .await?;
534
535        let lifecycle_stage = match options.wait_for_stage {
536            WorkflowUpdateWaitStage::Admitted => UpdateWorkflowExecutionLifecycleStage::Admitted,
537            WorkflowUpdateWaitStage::Accepted => UpdateWorkflowExecutionLifecycleStage::Accepted,
538            WorkflowUpdateWaitStage::Completed => UpdateWorkflowExecutionLifecycleStage::Completed,
539        };
540
541        let update_id = options
542            .update_id
543            .unwrap_or_else(|| Uuid::new_v4().to_string());
544
545        let response = WorkflowService::update_workflow_execution(
546            &mut self.client.clone(),
547            UpdateWorkflowExecutionRequest {
548                namespace: self.client.namespace(),
549                workflow_execution: Some(ProtoWorkflowExecution {
550                    workflow_id: self.info().workflow_id.clone(),
551                    run_id: self.info().run_id.clone().unwrap_or_default(),
552                }),
553                wait_policy: Some(WaitPolicy {
554                    lifecycle_stage: lifecycle_stage.into(),
555                }),
556                request: Some(update::v1::Request {
557                    meta: Some(update::v1::Meta {
558                        update_id: update_id.clone(),
559                        identity: self.client.identity(),
560                    }),
561                    input: Some(update::v1::Input {
562                        header: options.header,
563                        name: update.name().to_string(),
564                        args: Some(Payloads { payloads }),
565                    }),
566                }),
567                ..Default::default()
568            }
569            .into_request(),
570        )
571        .await
572        .map_err(WorkflowUpdateError::from_status)?
573        .into_inner();
574
575        // Extract run_id from response if available
576        let run_id = response
577            .update_ref
578            .as_ref()
579            .and_then(|r| r.workflow_execution.as_ref())
580            .map(|e| e.run_id.clone())
581            .filter(|s| !s.is_empty())
582            .or_else(|| self.info().run_id.clone());
583
584        Ok(WorkflowUpdateHandle {
585            client: self.client.clone(),
586            update_id,
587            workflow_id: self.info().workflow_id.clone(),
588            run_id,
589            known_outcome: response.outcome,
590            _output: PhantomData,
591        })
592    }
593
594    /// Request cancellation of this workflow.
595    pub async fn cancel(&self, opts: WorkflowCancelOptions) -> Result<(), WorkflowInteractionError>
596    where
597        CT: NamespacedClient,
598    {
599        WorkflowService::request_cancel_workflow_execution(
600            &mut self.client.clone(),
601            RequestCancelWorkflowExecutionRequest {
602                namespace: self.client.namespace(),
603                workflow_execution: Some(ProtoWorkflowExecution {
604                    workflow_id: self.info.workflow_id.clone(),
605                    run_id: self.info.run_id.clone().unwrap_or_default(),
606                }),
607                identity: self.client.identity(),
608                request_id: opts
609                    .request_id
610                    .unwrap_or_else(|| Uuid::new_v4().to_string()),
611                first_execution_run_id: self
612                    .info
613                    .first_execution_run_id
614                    .clone()
615                    .unwrap_or_default(),
616                reason: opts.reason,
617                links: vec![],
618            }
619            .into_request(),
620        )
621        .await
622        .map_err(WorkflowInteractionError::from_status)?;
623        Ok(())
624    }
625
626    /// Terminate this workflow.
627    pub async fn terminate(
628        &self,
629        opts: WorkflowTerminateOptions,
630    ) -> Result<(), WorkflowInteractionError>
631    where
632        CT: NamespacedClient,
633    {
634        WorkflowService::terminate_workflow_execution(
635            &mut self.client.clone(),
636            TerminateWorkflowExecutionRequest {
637                namespace: self.client.namespace(),
638                workflow_execution: Some(ProtoWorkflowExecution {
639                    workflow_id: self.info.workflow_id.clone(),
640                    run_id: self.info.run_id.clone().unwrap_or_default(),
641                }),
642                reason: opts.reason,
643                details: opts.details,
644                identity: self.client.identity(),
645                first_execution_run_id: self
646                    .info
647                    .first_execution_run_id
648                    .clone()
649                    .unwrap_or_default(),
650                links: vec![],
651            }
652            .into_request(),
653        )
654        .await
655        .map_err(WorkflowInteractionError::from_status)?;
656        Ok(())
657    }
658
659    /// Get workflow execution description/metadata.
660    pub async fn describe(
661        &self,
662        _opts: WorkflowDescribeOptions,
663    ) -> Result<WorkflowExecutionDescription, WorkflowInteractionError>
664    where
665        CT: NamespacedClient,
666    {
667        let response = WorkflowService::describe_workflow_execution(
668            &mut self.client.clone(),
669            DescribeWorkflowExecutionRequest {
670                namespace: self.client.namespace(),
671                execution: Some(ProtoWorkflowExecution {
672                    workflow_id: self.info.workflow_id.clone(),
673                    run_id: self.info.run_id.clone().unwrap_or_default(),
674                }),
675            }
676            .into_request(),
677        )
678        .await
679        .map_err(WorkflowInteractionError::from_status)?
680        .into_inner();
681        Ok(WorkflowExecutionDescription::new(response))
682    }
683    /// Fetch workflow execution history.
684    pub async fn fetch_history(
685        &self,
686        opts: WorkflowFetchHistoryOptions,
687    ) -> Result<WorkflowHistory, WorkflowInteractionError>
688    where
689        CT: NamespacedClient,
690    {
691        let run_id = self.info.run_id.clone().unwrap_or_default();
692        self.fetch_history_for_run(&run_id, &opts).await
693    }
694
695    /// Fetch history for a specific run_id, handling pagination.
696    async fn fetch_history_for_run(
697        &self,
698        run_id: &str,
699        opts: &WorkflowFetchHistoryOptions,
700    ) -> Result<WorkflowHistory, WorkflowInteractionError>
701    where
702        CT: NamespacedClient,
703    {
704        let mut all_events = Vec::new();
705        let mut next_page_token = vec![];
706
707        loop {
708            let response = WorkflowService::get_workflow_execution_history(
709                &mut self.client.clone(),
710                GetWorkflowExecutionHistoryRequest {
711                    namespace: self.client.namespace(),
712                    execution: Some(ProtoWorkflowExecution {
713                        workflow_id: self.info.workflow_id.clone(),
714                        run_id: run_id.to_string(),
715                    }),
716                    next_page_token: next_page_token.clone(),
717                    skip_archival: opts.skip_archival,
718                    wait_new_event: opts.wait_new_event,
719                    history_event_filter_type: opts.event_filter_type as i32,
720                    ..Default::default()
721                }
722                .into_request(),
723            )
724            .await
725            .map_err(WorkflowInteractionError::from_status)?
726            .into_inner();
727
728            if let Some(history) = response.history {
729                all_events.extend(history.events);
730            }
731
732            if response.next_page_token.is_empty() {
733                break;
734            }
735            next_page_token = response.next_page_token;
736        }
737
738        Ok(WorkflowHistory::new(all_events))
739    }
740}
741
742/// Handle to a workflow update that has been started but may not be complete.
743///
744/// Use `get_result()` to wait for the update to complete and retrieve its result.
745pub struct WorkflowUpdateHandle<CT, T> {
746    client: CT,
747    update_id: String,
748    workflow_id: String,
749    run_id: Option<String>,
750    /// If the update was started with `Completed` wait stage, the outcome is already available.
751    known_outcome: Option<update::v1::Outcome>,
752    _output: PhantomData<T>,
753}
754
755impl<CT, T> WorkflowUpdateHandle<CT, T> {
756    /// Get the update ID.
757    pub fn id(&self) -> &str {
758        &self.update_id
759    }
760
761    /// Get the workflow ID.
762    pub fn workflow_id(&self) -> &str {
763        &self.workflow_id
764    }
765
766    /// Get the workflow run ID, if available.
767    pub fn workflow_run_id(&self) -> Option<&str> {
768        self.run_id.as_deref()
769    }
770}
771
772impl<CT, T: 'static> WorkflowUpdateHandle<CT, T>
773where
774    CT: WorkflowService + NamespacedClient + Clone,
775{
776    /// Wait for the update to complete and return the result.
777    pub async fn get_result(&self) -> Result<T, WorkflowUpdateError>
778    where
779        T: temporalio_common::data_converters::TemporalDeserializable,
780    {
781        let outcome = if let Some(known) = &self.known_outcome {
782            known.clone()
783        } else {
784            let response = WorkflowService::poll_workflow_execution_update(
785                &mut self.client.clone(),
786                PollWorkflowExecutionUpdateRequest {
787                    namespace: self.client.namespace(),
788                    update_ref: Some(update::v1::UpdateRef {
789                        workflow_execution: Some(ProtoWorkflowExecution {
790                            workflow_id: self.workflow_id.clone(),
791                            run_id: self.run_id.clone().unwrap_or_default(),
792                        }),
793                        update_id: self.update_id.clone(),
794                    }),
795                    identity: self.client.identity(),
796                    wait_policy: Some(WaitPolicy {
797                        lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed.into(),
798                    }),
799                }
800                .into_request(),
801            )
802            .await
803            .map_err(WorkflowUpdateError::from_status)?
804            .into_inner();
805
806            response.outcome.ok_or_else(|| {
807                WorkflowUpdateError::Other("Update poll returned no outcome".into())
808            })?
809        };
810
811        match outcome.value {
812            Some(update::v1::outcome::Value::Success(success)) => self
813                .client
814                .data_converter()
815                .from_payloads(&SerializationContextData::Workflow, success.payloads)
816                .await
817                .map_err(WorkflowUpdateError::from),
818            Some(update::v1::outcome::Value::Failure(failure)) => {
819                Err(WorkflowUpdateError::Failed(Box::new(failure)))
820            }
821            None => Err(WorkflowUpdateError::Other(
822                "Update returned no outcome value".into(),
823            )),
824        }
825    }
826}