1use crate::{NamespacedClient, errors::AsyncActivityError, grpc::WorkflowService};
4use temporalio_common::protos::{
5 TaskToken,
6 temporal::api::{
7 common::v1::Payloads,
8 failure::v1::Failure,
9 workflowservice::v1::{
10 RecordActivityTaskHeartbeatByIdRequest, RecordActivityTaskHeartbeatByIdResponse,
11 RecordActivityTaskHeartbeatRequest, RecordActivityTaskHeartbeatResponse,
12 RespondActivityTaskCanceledByIdRequest, RespondActivityTaskCanceledRequest,
13 RespondActivityTaskCompletedByIdRequest, RespondActivityTaskCompletedRequest,
14 RespondActivityTaskFailedByIdRequest, RespondActivityTaskFailedRequest,
15 },
16 },
17};
18use tonic::IntoRequest;
19
20#[derive(Debug, Clone)]
22pub enum ActivityIdentifier {
23 TaskToken(TaskToken),
25 ById {
27 workflow_id: String,
29 run_id: String,
31 activity_id: String,
33 },
34}
35
36impl ActivityIdentifier {
37 pub fn from_task_token(token: TaskToken) -> Self {
39 Self::TaskToken(token)
40 }
41
42 pub fn by_id(
45 workflow_id: impl Into<String>,
46 run_id: impl Into<String>,
47 activity_id: impl Into<String>,
48 ) -> Self {
49 Self::ById {
50 workflow_id: workflow_id.into(),
51 run_id: run_id.into(),
52 activity_id: activity_id.into(),
53 }
54 }
55}
56
57pub struct AsyncActivityHandle<CT> {
59 client: CT,
60 identifier: ActivityIdentifier,
61}
62
63impl<CT> AsyncActivityHandle<CT> {
64 pub fn new(client: CT, identifier: ActivityIdentifier) -> Self {
66 Self { client, identifier }
67 }
68
69 pub fn identifier(&self) -> &ActivityIdentifier {
71 &self.identifier
72 }
73
74 pub fn client(&self) -> &CT {
76 &self.client
77 }
78}
79
80impl<CT: WorkflowService + NamespacedClient + Clone> AsyncActivityHandle<CT> {
81 pub async fn complete(&self, result: Option<Payloads>) -> Result<(), AsyncActivityError> {
83 match &self.identifier {
84 ActivityIdentifier::TaskToken(token) => {
85 WorkflowService::respond_activity_task_completed(
86 &mut self.client.clone(),
87 RespondActivityTaskCompletedRequest {
88 task_token: token.0.clone(),
89 result,
90 identity: self.client.identity(),
91 namespace: self.client.namespace(),
92 ..Default::default()
93 }
94 .into_request(),
95 )
96 .await
97 .map_err(AsyncActivityError::from_status)?;
98 }
99 ActivityIdentifier::ById {
100 workflow_id,
101 run_id,
102 activity_id,
103 } => {
104 WorkflowService::respond_activity_task_completed_by_id(
105 &mut self.client.clone(),
106 RespondActivityTaskCompletedByIdRequest {
107 namespace: self.client.namespace(),
108 workflow_id: workflow_id.clone(),
109 run_id: run_id.clone(),
110 activity_id: activity_id.clone(),
111 result,
112 identity: self.client.identity(),
113 }
114 .into_request(),
115 )
116 .await
117 .map_err(AsyncActivityError::from_status)?;
118 }
119 }
120 Ok(())
121 }
122
123 pub async fn fail(
125 &self,
126 failure: Failure,
127 last_heartbeat_details: Option<Payloads>,
128 ) -> Result<(), AsyncActivityError> {
129 match &self.identifier {
130 ActivityIdentifier::TaskToken(token) => {
131 WorkflowService::respond_activity_task_failed(
132 &mut self.client.clone(),
133 RespondActivityTaskFailedRequest {
134 task_token: token.0.clone(),
135 failure: Some(failure),
136 identity: self.client.identity(),
137 namespace: self.client.namespace(),
138 last_heartbeat_details,
139 ..Default::default()
140 }
141 .into_request(),
142 )
143 .await
144 .map_err(AsyncActivityError::from_status)?;
145 }
146 ActivityIdentifier::ById {
147 workflow_id,
148 run_id,
149 activity_id,
150 } => {
151 WorkflowService::respond_activity_task_failed_by_id(
152 &mut self.client.clone(),
153 RespondActivityTaskFailedByIdRequest {
154 namespace: self.client.namespace(),
155 workflow_id: workflow_id.clone(),
156 run_id: run_id.clone(),
157 activity_id: activity_id.clone(),
158 failure: Some(failure),
159 identity: self.client.identity(),
160 last_heartbeat_details,
161 }
162 .into_request(),
163 )
164 .await
165 .map_err(AsyncActivityError::from_status)?;
166 }
167 }
168 Ok(())
169 }
170
171 pub async fn report_cancelation(
173 &self,
174 details: Option<Payloads>,
175 ) -> Result<(), AsyncActivityError> {
176 match &self.identifier {
177 ActivityIdentifier::TaskToken(token) => {
178 WorkflowService::respond_activity_task_canceled(
179 &mut self.client.clone(),
180 RespondActivityTaskCanceledRequest {
181 task_token: token.0.clone(),
182 details,
183 identity: self.client.identity(),
184 namespace: self.client.namespace(),
185 ..Default::default()
186 }
187 .into_request(),
188 )
189 .await
190 .map_err(AsyncActivityError::from_status)?;
191 }
192 ActivityIdentifier::ById {
193 workflow_id,
194 run_id,
195 activity_id,
196 } => {
197 WorkflowService::respond_activity_task_canceled_by_id(
198 &mut self.client.clone(),
199 RespondActivityTaskCanceledByIdRequest {
200 namespace: self.client.namespace(),
201 workflow_id: workflow_id.clone(),
202 run_id: run_id.clone(),
203 activity_id: activity_id.clone(),
204 details,
205 identity: self.client.identity(),
206 ..Default::default()
207 }
208 .into_request(),
209 )
210 .await
211 .map_err(AsyncActivityError::from_status)?;
212 }
213 }
214 Ok(())
215 }
216
217 pub async fn heartbeat(
222 &self,
223 details: Option<Payloads>,
224 ) -> Result<ActivityHeartbeatResponse, AsyncActivityError> {
225 match &self.identifier {
226 ActivityIdentifier::TaskToken(token) => {
227 let resp = WorkflowService::record_activity_task_heartbeat(
228 &mut self.client.clone(),
229 RecordActivityTaskHeartbeatRequest {
230 task_token: token.0.clone(),
231 details,
232 identity: self.client.identity(),
233 namespace: self.client.namespace(),
234 }
235 .into_request(),
236 )
237 .await
238 .map_err(AsyncActivityError::from_status)?
239 .into_inner();
240 Ok(ActivityHeartbeatResponse::from(resp))
241 }
242 ActivityIdentifier::ById {
243 workflow_id,
244 run_id,
245 activity_id,
246 } => {
247 let resp = WorkflowService::record_activity_task_heartbeat_by_id(
248 &mut self.client.clone(),
249 RecordActivityTaskHeartbeatByIdRequest {
250 namespace: self.client.namespace(),
251 workflow_id: workflow_id.clone(),
252 run_id: run_id.clone(),
253 activity_id: activity_id.clone(),
254 details,
255 identity: self.client.identity(),
256 }
257 .into_request(),
258 )
259 .await
260 .map_err(AsyncActivityError::from_status)?
261 .into_inner();
262 Ok(ActivityHeartbeatResponse::from(resp))
263 }
264 }
265 }
266}
267
268#[derive(Debug, Clone)]
270pub struct ActivityHeartbeatResponse {
271 pub cancel_requested: bool,
273 pub activity_paused: bool,
275 pub activity_reset: bool,
277}
278
279impl From<RecordActivityTaskHeartbeatResponse> for ActivityHeartbeatResponse {
280 fn from(resp: RecordActivityTaskHeartbeatResponse) -> Self {
281 Self {
282 cancel_requested: resp.cancel_requested,
283 activity_paused: resp.activity_paused,
284 activity_reset: resp.activity_reset,
285 }
286 }
287}
288
289impl From<RecordActivityTaskHeartbeatByIdResponse> for ActivityHeartbeatResponse {
290 fn from(resp: RecordActivityTaskHeartbeatByIdResponse) -> Self {
291 Self {
292 cancel_requested: resp.cancel_requested,
293 activity_paused: resp.activity_paused,
294 activity_reset: resp.activity_reset,
295 }
296 }
297}