1use crate::{NamespacedClient, errors::AsyncActivityError, grpc::WorkflowService};
4use temporalio_common::protos::{
5 TaskToken,
6 temporal::api::{
7 common::v1::Payloads,
8 failure::v1::Failure,
9 workflowservice::v1::{
10 RecordActivityTaskHeartbeatByIdRequest, RecordActivityTaskHeartbeatByIdResponse,
11 RecordActivityTaskHeartbeatRequest, RecordActivityTaskHeartbeatResponse,
12 RespondActivityTaskCanceledByIdRequest, RespondActivityTaskCanceledRequest,
13 RespondActivityTaskCompletedByIdRequest, RespondActivityTaskCompletedRequest,
14 RespondActivityTaskFailedByIdRequest, RespondActivityTaskFailedRequest,
15 },
16 },
17};
18use tonic::IntoRequest;
19
20#[derive(Debug, Clone)]
22pub enum ActivityIdentifier {
23 TaskToken(TaskToken),
25 ById {
27 workflow_id: String,
29 run_id: String,
31 activity_id: String,
33 },
34}
35
36impl ActivityIdentifier {
37 pub fn from_task_token(token: TaskToken) -> Self {
39 Self::TaskToken(token)
40 }
41
42 pub fn by_id(
45 workflow_id: impl Into<String>,
46 run_id: impl Into<String>,
47 activity_id: impl Into<String>,
48 ) -> Self {
49 Self::ById {
50 workflow_id: workflow_id.into(),
51 run_id: run_id.into(),
52 activity_id: activity_id.into(),
53 }
54 }
55}
56
57pub struct AsyncActivityHandle<CT> {
59 client: CT,
60 identifier: ActivityIdentifier,
61}
62
63impl<CT> AsyncActivityHandle<CT> {
64 pub fn new(client: CT, identifier: ActivityIdentifier) -> Self {
66 Self { client, identifier }
67 }
68
69 pub fn identifier(&self) -> &ActivityIdentifier {
71 &self.identifier
72 }
73
74 pub fn client(&self) -> &CT {
76 &self.client
77 }
78}
79
80impl<CT: WorkflowService + NamespacedClient + Clone> AsyncActivityHandle<CT> {
81 pub async fn complete(&self, result: Option<Payloads>) -> Result<(), AsyncActivityError> {
83 match &self.identifier {
84 ActivityIdentifier::TaskToken(token) => {
85 WorkflowService::respond_activity_task_completed(
86 &mut self.client.clone(),
87 RespondActivityTaskCompletedRequest {
88 task_token: token.0.clone(),
89 result,
90 identity: self.client.identity(),
91 namespace: self.client.namespace(),
92 ..Default::default()
93 }
94 .into_request(),
95 )
96 .await
97 .map_err(AsyncActivityError::from_status)?;
98 }
99 ActivityIdentifier::ById {
100 workflow_id,
101 run_id,
102 activity_id,
103 } => {
104 WorkflowService::respond_activity_task_completed_by_id(
105 &mut self.client.clone(),
106 RespondActivityTaskCompletedByIdRequest {
107 namespace: self.client.namespace(),
108 workflow_id: workflow_id.clone(),
109 run_id: run_id.clone(),
110 activity_id: activity_id.clone(),
111 result,
112 identity: self.client.identity(),
113 resource_id: Default::default(),
114 }
115 .into_request(),
116 )
117 .await
118 .map_err(AsyncActivityError::from_status)?;
119 }
120 }
121 Ok(())
122 }
123
124 pub async fn fail(
126 &self,
127 failure: Failure,
128 last_heartbeat_details: Option<Payloads>,
129 ) -> Result<(), AsyncActivityError> {
130 match &self.identifier {
131 ActivityIdentifier::TaskToken(token) => {
132 WorkflowService::respond_activity_task_failed(
133 &mut self.client.clone(),
134 RespondActivityTaskFailedRequest {
135 task_token: token.0.clone(),
136 failure: Some(failure),
137 identity: self.client.identity(),
138 namespace: self.client.namespace(),
139 last_heartbeat_details,
140 ..Default::default()
141 }
142 .into_request(),
143 )
144 .await
145 .map_err(AsyncActivityError::from_status)?;
146 }
147 ActivityIdentifier::ById {
148 workflow_id,
149 run_id,
150 activity_id,
151 } => {
152 WorkflowService::respond_activity_task_failed_by_id(
153 &mut self.client.clone(),
154 RespondActivityTaskFailedByIdRequest {
155 namespace: self.client.namespace(),
156 workflow_id: workflow_id.clone(),
157 run_id: run_id.clone(),
158 activity_id: activity_id.clone(),
159 failure: Some(failure),
160 identity: self.client.identity(),
161 last_heartbeat_details,
162 resource_id: Default::default(),
163 }
164 .into_request(),
165 )
166 .await
167 .map_err(AsyncActivityError::from_status)?;
168 }
169 }
170 Ok(())
171 }
172
173 pub async fn report_cancelation(
175 &self,
176 details: Option<Payloads>,
177 ) -> Result<(), AsyncActivityError> {
178 match &self.identifier {
179 ActivityIdentifier::TaskToken(token) => {
180 WorkflowService::respond_activity_task_canceled(
181 &mut self.client.clone(),
182 RespondActivityTaskCanceledRequest {
183 task_token: token.0.clone(),
184 details,
185 identity: self.client.identity(),
186 namespace: self.client.namespace(),
187 ..Default::default()
188 }
189 .into_request(),
190 )
191 .await
192 .map_err(AsyncActivityError::from_status)?;
193 }
194 ActivityIdentifier::ById {
195 workflow_id,
196 run_id,
197 activity_id,
198 } => {
199 WorkflowService::respond_activity_task_canceled_by_id(
200 &mut self.client.clone(),
201 RespondActivityTaskCanceledByIdRequest {
202 namespace: self.client.namespace(),
203 workflow_id: workflow_id.clone(),
204 run_id: run_id.clone(),
205 activity_id: activity_id.clone(),
206 details,
207 identity: self.client.identity(),
208 ..Default::default()
209 }
210 .into_request(),
211 )
212 .await
213 .map_err(AsyncActivityError::from_status)?;
214 }
215 }
216 Ok(())
217 }
218
219 pub async fn heartbeat(
224 &self,
225 details: Option<Payloads>,
226 ) -> Result<ActivityHeartbeatResponse, AsyncActivityError> {
227 match &self.identifier {
228 ActivityIdentifier::TaskToken(token) => {
229 let resp = WorkflowService::record_activity_task_heartbeat(
230 &mut self.client.clone(),
231 RecordActivityTaskHeartbeatRequest {
232 task_token: token.0.clone(),
233 details,
234 identity: self.client.identity(),
235 namespace: self.client.namespace(),
236 resource_id: Default::default(),
237 }
238 .into_request(),
239 )
240 .await
241 .map_err(AsyncActivityError::from_status)?
242 .into_inner();
243 Ok(ActivityHeartbeatResponse::from(resp))
244 }
245 ActivityIdentifier::ById {
246 workflow_id,
247 run_id,
248 activity_id,
249 } => {
250 let resp = WorkflowService::record_activity_task_heartbeat_by_id(
251 &mut self.client.clone(),
252 RecordActivityTaskHeartbeatByIdRequest {
253 namespace: self.client.namespace(),
254 workflow_id: workflow_id.clone(),
255 run_id: run_id.clone(),
256 activity_id: activity_id.clone(),
257 details,
258 identity: self.client.identity(),
259 resource_id: Default::default(),
260 }
261 .into_request(),
262 )
263 .await
264 .map_err(AsyncActivityError::from_status)?
265 .into_inner();
266 Ok(ActivityHeartbeatResponse::from(resp))
267 }
268 }
269 }
270}
271
/// Flags returned by the server in reply to an activity heartbeat.
///
/// The same shape is produced for both the task-token and the by-ID heartbeat
/// responses. The default value has every flag cleared. Responses compare equal
/// when all three flags match.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ActivityHeartbeatResponse {
    /// Copied from the `cancel_requested` field of the heartbeat response.
    pub cancel_requested: bool,
    /// Copied from the `activity_paused` field of the heartbeat response.
    pub activity_paused: bool,
    /// Copied from the `activity_reset` field of the heartbeat response.
    pub activity_reset: bool,
}
282
283impl From<RecordActivityTaskHeartbeatResponse> for ActivityHeartbeatResponse {
284 fn from(resp: RecordActivityTaskHeartbeatResponse) -> Self {
285 Self {
286 cancel_requested: resp.cancel_requested,
287 activity_paused: resp.activity_paused,
288 activity_reset: resp.activity_reset,
289 }
290 }
291}
292
293impl From<RecordActivityTaskHeartbeatByIdResponse> for ActivityHeartbeatResponse {
294 fn from(resp: RecordActivityTaskHeartbeatByIdResponse) -> Self {
295 Self {
296 cancel_requested: resp.cancel_requested,
297 activity_paused: resp.activity_paused,
298 activity_reset: resp.activity_reset,
299 }
300 }
301}