squads_temporal_client/workflow_handle/
mod.rs1use crate::{InterceptedMetricsSvc, RawClientLike, WorkflowService};
2use anyhow::{anyhow, bail};
3use std::{fmt::Debug, marker::PhantomData};
4use temporal_sdk_core_protos::{
5 coresdk::FromPayloadsExt,
6 temporal::api::{
7 common::v1::{Payload, WorkflowExecution},
8 enums::v1::HistoryEventFilterType,
9 failure::v1::Failure,
10 history::v1::history_event::Attributes,
11 workflowservice::v1::GetWorkflowExecutionHistoryRequest,
12 },
13};
14
/// The final outcome of a workflow execution, as derived from the event that closed it.
///
/// `T` is the type the successful result payloads are converted into.
#[derive(Debug)]
// NOTE(review): presumably silenced because `Failure` is much larger than the other
// variants — confirm before boxing or removing.
#[allow(clippy::large_enum_variant)]
pub enum WorkflowExecutionResult<T> {
    /// The workflow completed; carries the result converted from the completion payloads.
    Succeeded(T),
    /// The workflow failed; carries the failure reported in the close event (or a default
    /// `Failure` when the event had none).
    Failed(Failure),
    /// The workflow was cancelled; carries the details attached to the cancellation event.
    Cancelled(Vec<Payload>),
    /// The workflow was terminated; carries the details attached to the termination event.
    Terminated(Vec<Payload>),
    /// The workflow timed out.
    TimedOut,
    /// The workflow continued as new and the caller asked not to follow the run chain.
    ContinuedAsNew,
}
33
34impl<T> WorkflowExecutionResult<T>
35where
36 T: Debug,
37{
38 pub fn unwrap_success(self) -> T {
40 match self {
41 Self::Succeeded(t) => t,
42 o => panic!("Expected success, got {o:?}"),
43 }
44 }
45}
46
/// Options controlling how a workflow result is fetched.
#[derive(Debug, Clone, Copy)]
pub struct GetWorkflowResultOpts {
    /// When `true`, a close event that names a successor run is followed and the result of
    /// the last run in the chain is reported instead.
    pub follow_runs: bool,
}

impl Default for GetWorkflowResultOpts {
    /// Returns options with run-following enabled.
    fn default() -> Self {
        let follow_runs = true;
        GetWorkflowResultOpts { follow_runs }
    }
}
59
/// A handle to a workflow execution: a client paired with the information identifying the
/// execution.
///
/// `ResultT` is the type workflow results are converted into; it is tracked only at the
/// type level.
pub struct WorkflowHandle<ClientT, ResultT> {
    /// Client used to issue requests about the workflow.
    client: ClientT,
    /// Identifies the namespace, workflow, and (optionally) run this handle refers to.
    info: WorkflowExecutionInfo,

    /// Zero-sized marker tying the handle to its result type.
    _res_type: PhantomData<ResultT>,
}
68
/// Identifying information for a workflow execution.
#[derive(Debug)]
pub struct WorkflowExecutionInfo {
    /// Namespace the workflow lives in.
    pub namespace: String,
    /// The workflow's id.
    pub workflow_id: String,
    /// The specific run to target, if any. When `None`, requests are sent with an empty
    /// run id (presumably letting the server pick the run — TODO confirm).
    pub run_id: Option<String>,
}
79
80impl WorkflowExecutionInfo {
81 pub fn bind_untyped<CT>(self, client: CT) -> UntypedWorkflowHandle<CT>
83 where
84 CT: RawClientLike<SvcType = InterceptedMetricsSvc> + Clone,
85 {
86 UntypedWorkflowHandle::new(client, self)
87 }
88}
89
/// A workflow handle whose result type is the raw list of payloads, with no conversion
/// into a more specific type.
pub(crate) type UntypedWorkflowHandle<CT> = WorkflowHandle<CT, Vec<Payload>>;
92
93impl<CT, RT> WorkflowHandle<CT, RT>
94where
95 CT: RawClientLike<SvcType = InterceptedMetricsSvc> + Clone,
96 RT: FromPayloadsExt,
98{
99 pub(crate) fn new(client: CT, info: WorkflowExecutionInfo) -> Self {
100 Self {
101 client,
102 info,
103 _res_type: PhantomData::<RT>,
104 }
105 }
106
107 pub fn info(&self) -> &WorkflowExecutionInfo {
109 &self.info
110 }
111
112 pub fn client(&self) -> &CT {
114 &self.client
115 }
116
117 pub async fn get_workflow_result(
119 &self,
120 opts: GetWorkflowResultOpts,
121 ) -> Result<WorkflowExecutionResult<RT>, anyhow::Error> {
122 let mut next_page_tok = vec![];
123 let mut run_id = self.info.run_id.clone().unwrap_or_default();
124 loop {
125 let server_res = self
126 .client
127 .clone()
128 .get_workflow_execution_history(GetWorkflowExecutionHistoryRequest {
129 namespace: self.info.namespace.to_string(),
130 execution: Some(WorkflowExecution {
131 workflow_id: self.info.workflow_id.clone(),
132 run_id: run_id.clone(),
133 }),
134 skip_archival: true,
135 wait_new_event: true,
136 history_event_filter_type: HistoryEventFilterType::CloseEvent as i32,
137 next_page_token: next_page_tok.clone(),
138 ..Default::default()
139 })
140 .await?
141 .into_inner();
142
143 let mut history = server_res
144 .history
145 .ok_or_else(|| anyhow!("Server returned an empty history!"))?;
146
147 if history.events.is_empty() {
148 next_page_tok = server_res.next_page_token;
149 continue;
150 }
151 next_page_tok = vec![];
153
154 let event_attrs = history.events.pop().and_then(|ev| ev.attributes);
155
156 macro_rules! follow {
157 ($attrs:ident) => {
158 if opts.follow_runs && $attrs.new_execution_run_id != "" {
159 run_id = $attrs.new_execution_run_id;
160 continue;
161 }
162 };
163 }
164
165 break match event_attrs {
166 Some(Attributes::WorkflowExecutionCompletedEventAttributes(attrs)) => {
167 follow!(attrs);
168 Ok(WorkflowExecutionResult::Succeeded(RT::from_payloads(
169 attrs.result,
170 )))
171 }
172 Some(Attributes::WorkflowExecutionFailedEventAttributes(attrs)) => {
173 follow!(attrs);
174 Ok(WorkflowExecutionResult::Failed(
175 attrs.failure.unwrap_or_default(),
176 ))
177 }
178 Some(Attributes::WorkflowExecutionCanceledEventAttributes(attrs)) => Ok(
179 WorkflowExecutionResult::Cancelled(Vec::from_payloads(attrs.details)),
180 ),
181 Some(Attributes::WorkflowExecutionTimedOutEventAttributes(attrs)) => {
182 follow!(attrs);
183 Ok(WorkflowExecutionResult::TimedOut)
184 }
185 Some(Attributes::WorkflowExecutionTerminatedEventAttributes(attrs)) => Ok(
186 WorkflowExecutionResult::Terminated(Vec::from_payloads(attrs.details)),
187 ),
188 Some(Attributes::WorkflowExecutionContinuedAsNewEventAttributes(attrs)) => {
189 if opts.follow_runs {
190 if !attrs.new_execution_run_id.is_empty() {
191 run_id = attrs.new_execution_run_id;
192 continue;
193 } else {
194 bail!("New execution run id was empty in continue as new event!");
195 }
196 } else {
197 Ok(WorkflowExecutionResult::ContinuedAsNew)
198 }
199 }
200 o => Err(anyhow!(
201 "Server returned an event that didn't match the CloseEvent filter. \
202 This is either a server bug or a new event the SDK does not understand. \
203 Event details: {o:?}"
204 )),
205 };
206 }
207 }
208}