Skip to main content

temporalio_client/
async_activity_handle.rs

1//! Handle for completing activities asynchronously via a client.
2
3use crate::{NamespacedClient, errors::AsyncActivityError, grpc::WorkflowService};
4use temporalio_common::protos::{
5    TaskToken,
6    temporal::api::{
7        common::v1::Payloads,
8        failure::v1::Failure,
9        workflowservice::v1::{
10            RecordActivityTaskHeartbeatByIdRequest, RecordActivityTaskHeartbeatByIdResponse,
11            RecordActivityTaskHeartbeatRequest, RecordActivityTaskHeartbeatResponse,
12            RespondActivityTaskCanceledByIdRequest, RespondActivityTaskCanceledRequest,
13            RespondActivityTaskCompletedByIdRequest, RespondActivityTaskCompletedRequest,
14            RespondActivityTaskFailedByIdRequest, RespondActivityTaskFailedRequest,
15        },
16    },
17};
18use tonic::IntoRequest;
19
20/// Identifies an async activity for completion outside a worker.
21#[derive(Debug, Clone)]
22pub enum ActivityIdentifier {
23    /// Identify activity by its task token
24    TaskToken(TaskToken),
25    /// Identify activity by workflow and activity IDs.
26    ById {
27        /// ID of the workflow that scheduled this activity.
28        workflow_id: String,
29        /// Run ID of the workflow (optional - if not provided, targets the latest run).
30        run_id: String,
31        /// ID of the activity to complete.
32        activity_id: String,
33    },
34}
35
36impl ActivityIdentifier {
37    /// Create an identifier from a task token.
38    pub fn from_task_token(token: TaskToken) -> Self {
39        Self::TaskToken(token)
40    }
41
42    /// Create an identifier from workflow and activity IDs. Use an empty run id to target the
43    /// latest workflow execution.
44    pub fn by_id(
45        workflow_id: impl Into<String>,
46        run_id: impl Into<String>,
47        activity_id: impl Into<String>,
48    ) -> Self {
49        Self::ById {
50            workflow_id: workflow_id.into(),
51            run_id: run_id.into(),
52            activity_id: activity_id.into(),
53        }
54    }
55}
56
57/// Handle for completing activities asynchronously (outside the worker).
58pub struct AsyncActivityHandle<CT> {
59    client: CT,
60    identifier: ActivityIdentifier,
61}
62
63impl<CT> AsyncActivityHandle<CT> {
64    /// Create a new async activity handle.
65    pub fn new(client: CT, identifier: ActivityIdentifier) -> Self {
66        Self { client, identifier }
67    }
68
69    /// Get the identifier for this activity.
70    pub fn identifier(&self) -> &ActivityIdentifier {
71        &self.identifier
72    }
73
74    /// Get a reference to the underlying client.
75    pub fn client(&self) -> &CT {
76        &self.client
77    }
78}
79
80impl<CT: WorkflowService + NamespacedClient + Clone> AsyncActivityHandle<CT> {
81    /// Complete the activity with a successful result.
82    pub async fn complete(&self, result: Option<Payloads>) -> Result<(), AsyncActivityError> {
83        match &self.identifier {
84            ActivityIdentifier::TaskToken(token) => {
85                WorkflowService::respond_activity_task_completed(
86                    &mut self.client.clone(),
87                    RespondActivityTaskCompletedRequest {
88                        task_token: token.0.clone(),
89                        result,
90                        identity: self.client.identity(),
91                        namespace: self.client.namespace(),
92                        ..Default::default()
93                    }
94                    .into_request(),
95                )
96                .await
97                .map_err(AsyncActivityError::from_status)?;
98            }
99            ActivityIdentifier::ById {
100                workflow_id,
101                run_id,
102                activity_id,
103            } => {
104                WorkflowService::respond_activity_task_completed_by_id(
105                    &mut self.client.clone(),
106                    RespondActivityTaskCompletedByIdRequest {
107                        namespace: self.client.namespace(),
108                        workflow_id: workflow_id.clone(),
109                        run_id: run_id.clone(),
110                        activity_id: activity_id.clone(),
111                        result,
112                        identity: self.client.identity(),
113                    }
114                    .into_request(),
115                )
116                .await
117                .map_err(AsyncActivityError::from_status)?;
118            }
119        }
120        Ok(())
121    }
122
123    /// Fail the activity with a failure.
124    pub async fn fail(
125        &self,
126        failure: Failure,
127        last_heartbeat_details: Option<Payloads>,
128    ) -> Result<(), AsyncActivityError> {
129        match &self.identifier {
130            ActivityIdentifier::TaskToken(token) => {
131                WorkflowService::respond_activity_task_failed(
132                    &mut self.client.clone(),
133                    RespondActivityTaskFailedRequest {
134                        task_token: token.0.clone(),
135                        failure: Some(failure),
136                        identity: self.client.identity(),
137                        namespace: self.client.namespace(),
138                        last_heartbeat_details,
139                        ..Default::default()
140                    }
141                    .into_request(),
142                )
143                .await
144                .map_err(AsyncActivityError::from_status)?;
145            }
146            ActivityIdentifier::ById {
147                workflow_id,
148                run_id,
149                activity_id,
150            } => {
151                WorkflowService::respond_activity_task_failed_by_id(
152                    &mut self.client.clone(),
153                    RespondActivityTaskFailedByIdRequest {
154                        namespace: self.client.namespace(),
155                        workflow_id: workflow_id.clone(),
156                        run_id: run_id.clone(),
157                        activity_id: activity_id.clone(),
158                        failure: Some(failure),
159                        identity: self.client.identity(),
160                        last_heartbeat_details,
161                    }
162                    .into_request(),
163                )
164                .await
165                .map_err(AsyncActivityError::from_status)?;
166            }
167        }
168        Ok(())
169    }
170
171    /// Reports the activity as canceled.
172    pub async fn report_cancelation(
173        &self,
174        details: Option<Payloads>,
175    ) -> Result<(), AsyncActivityError> {
176        match &self.identifier {
177            ActivityIdentifier::TaskToken(token) => {
178                WorkflowService::respond_activity_task_canceled(
179                    &mut self.client.clone(),
180                    RespondActivityTaskCanceledRequest {
181                        task_token: token.0.clone(),
182                        details,
183                        identity: self.client.identity(),
184                        namespace: self.client.namespace(),
185                        ..Default::default()
186                    }
187                    .into_request(),
188                )
189                .await
190                .map_err(AsyncActivityError::from_status)?;
191            }
192            ActivityIdentifier::ById {
193                workflow_id,
194                run_id,
195                activity_id,
196            } => {
197                WorkflowService::respond_activity_task_canceled_by_id(
198                    &mut self.client.clone(),
199                    RespondActivityTaskCanceledByIdRequest {
200                        namespace: self.client.namespace(),
201                        workflow_id: workflow_id.clone(),
202                        run_id: run_id.clone(),
203                        activity_id: activity_id.clone(),
204                        details,
205                        identity: self.client.identity(),
206                        ..Default::default()
207                    }
208                    .into_request(),
209                )
210                .await
211                .map_err(AsyncActivityError::from_status)?;
212            }
213        }
214        Ok(())
215    }
216
217    /// Record a heartbeat for the activity.
218    ///
219    /// Heartbeats let the server know the activity is still running and can carry
220    /// progress information. The response indicates if cancellation has been requested.
221    pub async fn heartbeat(
222        &self,
223        details: Option<Payloads>,
224    ) -> Result<ActivityHeartbeatResponse, AsyncActivityError> {
225        match &self.identifier {
226            ActivityIdentifier::TaskToken(token) => {
227                let resp = WorkflowService::record_activity_task_heartbeat(
228                    &mut self.client.clone(),
229                    RecordActivityTaskHeartbeatRequest {
230                        task_token: token.0.clone(),
231                        details,
232                        identity: self.client.identity(),
233                        namespace: self.client.namespace(),
234                    }
235                    .into_request(),
236                )
237                .await
238                .map_err(AsyncActivityError::from_status)?
239                .into_inner();
240                Ok(ActivityHeartbeatResponse::from(resp))
241            }
242            ActivityIdentifier::ById {
243                workflow_id,
244                run_id,
245                activity_id,
246            } => {
247                let resp = WorkflowService::record_activity_task_heartbeat_by_id(
248                    &mut self.client.clone(),
249                    RecordActivityTaskHeartbeatByIdRequest {
250                        namespace: self.client.namespace(),
251                        workflow_id: workflow_id.clone(),
252                        run_id: run_id.clone(),
253                        activity_id: activity_id.clone(),
254                        details,
255                        identity: self.client.identity(),
256                    }
257                    .into_request(),
258                )
259                .await
260                .map_err(AsyncActivityError::from_status)?
261                .into_inner();
262                Ok(ActivityHeartbeatResponse::from(resp))
263            }
264        }
265    }
266}
267
268/// Response from a heartbeat call.
269#[derive(Debug, Clone)]
270pub struct ActivityHeartbeatResponse {
271    /// True if the activity has been asked to cancel itself.
272    pub cancel_requested: bool,
273    /// True if the activity is paused.
274    pub activity_paused: bool,
275    /// True if the activity was reset.
276    pub activity_reset: bool,
277}
278
279impl From<RecordActivityTaskHeartbeatResponse> for ActivityHeartbeatResponse {
280    fn from(resp: RecordActivityTaskHeartbeatResponse) -> Self {
281        Self {
282            cancel_requested: resp.cancel_requested,
283            activity_paused: resp.activity_paused,
284            activity_reset: resp.activity_reset,
285        }
286    }
287}
288
289impl From<RecordActivityTaskHeartbeatByIdResponse> for ActivityHeartbeatResponse {
290    fn from(resp: RecordActivityTaskHeartbeatByIdResponse) -> Self {
291        Self {
292            cancel_requested: resp.cancel_requested,
293            activity_paused: resp.activity_paused,
294            activity_reset: resp.activity_reset,
295        }
296    }
297}