squads_temporal_client/workflow_handle/
mod.rs

1use crate::{InterceptedMetricsSvc, RawClientLike, WorkflowService};
2use anyhow::{anyhow, bail};
3use std::{fmt::Debug, marker::PhantomData};
4use temporal_sdk_core_protos::{
5    coresdk::FromPayloadsExt,
6    temporal::api::{
7        common::v1::{Payload, WorkflowExecution},
8        enums::v1::HistoryEventFilterType,
9        failure::v1::Failure,
10        history::v1::history_event::Attributes,
11        workflowservice::v1::GetWorkflowExecutionHistoryRequest,
12    },
13};
14
15/// Enumerates terminal states for a particular workflow execution
16// TODO: Add non-proto failure types, flesh out details, etc.
17#[derive(Debug)]
18#[allow(clippy::large_enum_variant)]
19pub enum WorkflowExecutionResult<T> {
20    /// The workflow finished successfully
21    Succeeded(T),
22    /// The workflow finished in failure
23    Failed(Failure),
24    /// The workflow was cancelled
25    Cancelled(Vec<Payload>),
26    /// The workflow was terminated
27    Terminated(Vec<Payload>),
28    /// The workflow timed out
29    TimedOut,
30    /// The workflow continued as new
31    ContinuedAsNew,
32}
33
34impl<T> WorkflowExecutionResult<T>
35where
36    T: Debug,
37{
38    /// Unwrap the result, panicking if it was not a success
39    pub fn unwrap_success(self) -> T {
40        match self {
41            Self::Succeeded(t) => t,
42            o => panic!("Expected success, got {o:?}"),
43        }
44    }
45}
46
47/// Options for fetching workflow results
48#[derive(Debug, Clone, Copy)]
49pub struct GetWorkflowResultOpts {
50    /// If true (the default), follows to the next workflow run in the execution chain while
51    /// retrieving results.
52    pub follow_runs: bool,
53}
54impl Default for GetWorkflowResultOpts {
55    fn default() -> Self {
56        Self { follow_runs: true }
57    }
58}
59
60/// A workflow handle which can refer to a specific workflow run, or a chain of workflow runs with
61/// the same workflow id.
62pub struct WorkflowHandle<ClientT, ResultT> {
63    client: ClientT,
64    info: WorkflowExecutionInfo,
65
66    _res_type: PhantomData<ResultT>,
67}
68
69/// Holds needed information to refer to a specific workflow run, or workflow execution chain
70#[derive(Debug)]
71pub struct WorkflowExecutionInfo {
72    /// Namespace the workflow lives in
73    pub namespace: String,
74    /// The workflow's id
75    pub workflow_id: String,
76    /// If set, target this specific run of the workflow
77    pub run_id: Option<String>,
78}
79
80impl WorkflowExecutionInfo {
81    /// Bind the workflow info to a specific client, turning it into a workflow handle
82    pub fn bind_untyped<CT>(self, client: CT) -> UntypedWorkflowHandle<CT>
83    where
84        CT: RawClientLike<SvcType = InterceptedMetricsSvc> + Clone,
85    {
86        UntypedWorkflowHandle::new(client, self)
87    }
88}
89
90/// A workflow handle to a workflow with unknown types. Uses raw payloads.
91pub(crate) type UntypedWorkflowHandle<CT> = WorkflowHandle<CT, Vec<Payload>>;
92
93impl<CT, RT> WorkflowHandle<CT, RT>
94where
95    CT: RawClientLike<SvcType = InterceptedMetricsSvc> + Clone,
96    // TODO: Make more generic, capable of (de)serialization w/ serde
97    RT: FromPayloadsExt,
98{
99    pub(crate) fn new(client: CT, info: WorkflowExecutionInfo) -> Self {
100        Self {
101            client,
102            info,
103            _res_type: PhantomData::<RT>,
104        }
105    }
106
107    /// Get the workflow execution info
108    pub fn info(&self) -> &WorkflowExecutionInfo {
109        &self.info
110    }
111
112    /// Get the client attached to this handle
113    pub fn client(&self) -> &CT {
114        &self.client
115    }
116
117    /// Await the result of the workflow execution
118    pub async fn get_workflow_result(
119        &self,
120        opts: GetWorkflowResultOpts,
121    ) -> Result<WorkflowExecutionResult<RT>, anyhow::Error> {
122        let mut next_page_tok = vec![];
123        let mut run_id = self.info.run_id.clone().unwrap_or_default();
124        loop {
125            let server_res = self
126                .client
127                .clone()
128                .get_workflow_execution_history(GetWorkflowExecutionHistoryRequest {
129                    namespace: self.info.namespace.to_string(),
130                    execution: Some(WorkflowExecution {
131                        workflow_id: self.info.workflow_id.clone(),
132                        run_id: run_id.clone(),
133                    }),
134                    skip_archival: true,
135                    wait_new_event: true,
136                    history_event_filter_type: HistoryEventFilterType::CloseEvent as i32,
137                    next_page_token: next_page_tok.clone(),
138                    ..Default::default()
139                })
140                .await?
141                .into_inner();
142
143            let mut history = server_res
144                .history
145                .ok_or_else(|| anyhow!("Server returned an empty history!"))?;
146
147            if history.events.is_empty() {
148                next_page_tok = server_res.next_page_token;
149                continue;
150            }
151            // If page token was previously set, clear it.
152            next_page_tok = vec![];
153
154            let event_attrs = history.events.pop().and_then(|ev| ev.attributes);
155
156            macro_rules! follow {
157                ($attrs:ident) => {
158                    if opts.follow_runs && $attrs.new_execution_run_id != "" {
159                        run_id = $attrs.new_execution_run_id;
160                        continue;
161                    }
162                };
163            }
164
165            break match event_attrs {
166                Some(Attributes::WorkflowExecutionCompletedEventAttributes(attrs)) => {
167                    follow!(attrs);
168                    Ok(WorkflowExecutionResult::Succeeded(RT::from_payloads(
169                        attrs.result,
170                    )))
171                }
172                Some(Attributes::WorkflowExecutionFailedEventAttributes(attrs)) => {
173                    follow!(attrs);
174                    Ok(WorkflowExecutionResult::Failed(
175                        attrs.failure.unwrap_or_default(),
176                    ))
177                }
178                Some(Attributes::WorkflowExecutionCanceledEventAttributes(attrs)) => Ok(
179                    WorkflowExecutionResult::Cancelled(Vec::from_payloads(attrs.details)),
180                ),
181                Some(Attributes::WorkflowExecutionTimedOutEventAttributes(attrs)) => {
182                    follow!(attrs);
183                    Ok(WorkflowExecutionResult::TimedOut)
184                }
185                Some(Attributes::WorkflowExecutionTerminatedEventAttributes(attrs)) => Ok(
186                    WorkflowExecutionResult::Terminated(Vec::from_payloads(attrs.details)),
187                ),
188                Some(Attributes::WorkflowExecutionContinuedAsNewEventAttributes(attrs)) => {
189                    if opts.follow_runs {
190                        if !attrs.new_execution_run_id.is_empty() {
191                            run_id = attrs.new_execution_run_id;
192                            continue;
193                        } else {
194                            bail!("New execution run id was empty in continue as new event!");
195                        }
196                    } else {
197                        Ok(WorkflowExecutionResult::ContinuedAsNew)
198                    }
199                }
200                o => Err(anyhow!(
201                    "Server returned an event that didn't match the CloseEvent filter. \
202                     This is either a server bug or a new event the SDK does not understand. \
203                     Event details: {o:?}"
204                )),
205            };
206        }
207    }
208}