Skip to main content

temporalio_client/
async_activity_handle.rs

1//! Handle for completing activities asynchronously via a client.
2
3use crate::{NamespacedClient, errors::AsyncActivityError, grpc::WorkflowService};
4use temporalio_common::protos::{
5    TaskToken,
6    temporal::api::{
7        common::v1::Payloads,
8        failure::v1::Failure,
9        workflowservice::v1::{
10            RecordActivityTaskHeartbeatByIdRequest, RecordActivityTaskHeartbeatByIdResponse,
11            RecordActivityTaskHeartbeatRequest, RecordActivityTaskHeartbeatResponse,
12            RespondActivityTaskCanceledByIdRequest, RespondActivityTaskCanceledRequest,
13            RespondActivityTaskCompletedByIdRequest, RespondActivityTaskCompletedRequest,
14            RespondActivityTaskFailedByIdRequest, RespondActivityTaskFailedRequest,
15        },
16    },
17};
18use tonic::IntoRequest;
19
20/// Identifies an async activity for completion outside a worker.
21#[derive(Debug, Clone)]
22pub enum ActivityIdentifier {
23    /// Identify activity by its task token
24    TaskToken(TaskToken),
25    /// Identify activity by workflow and activity IDs.
26    ById {
27        /// ID of the workflow that scheduled this activity.
28        workflow_id: String,
29        /// Run ID of the workflow (optional - if not provided, targets the latest run).
30        run_id: String,
31        /// ID of the activity to complete.
32        activity_id: String,
33    },
34}
35
36impl ActivityIdentifier {
37    /// Create an identifier from a task token.
38    pub fn from_task_token(token: TaskToken) -> Self {
39        Self::TaskToken(token)
40    }
41
42    /// Create an identifier from workflow and activity IDs. Use an empty run id to target the
43    /// latest workflow execution.
44    pub fn by_id(
45        workflow_id: impl Into<String>,
46        run_id: impl Into<String>,
47        activity_id: impl Into<String>,
48    ) -> Self {
49        Self::ById {
50            workflow_id: workflow_id.into(),
51            run_id: run_id.into(),
52            activity_id: activity_id.into(),
53        }
54    }
55}
56
57/// Handle for completing activities asynchronously (outside the worker).
58pub struct AsyncActivityHandle<CT> {
59    client: CT,
60    identifier: ActivityIdentifier,
61}
62
63impl<CT> AsyncActivityHandle<CT> {
64    /// Create a new async activity handle.
65    pub fn new(client: CT, identifier: ActivityIdentifier) -> Self {
66        Self { client, identifier }
67    }
68
69    /// Get the identifier for this activity.
70    pub fn identifier(&self) -> &ActivityIdentifier {
71        &self.identifier
72    }
73
74    /// Get a reference to the underlying client.
75    pub fn client(&self) -> &CT {
76        &self.client
77    }
78}
79
80impl<CT: WorkflowService + NamespacedClient + Clone> AsyncActivityHandle<CT> {
81    /// Complete the activity with a successful result.
82    pub async fn complete(&self, result: Option<Payloads>) -> Result<(), AsyncActivityError> {
83        match &self.identifier {
84            ActivityIdentifier::TaskToken(token) => {
85                WorkflowService::respond_activity_task_completed(
86                    &mut self.client.clone(),
87                    RespondActivityTaskCompletedRequest {
88                        task_token: token.0.clone(),
89                        result,
90                        identity: self.client.identity(),
91                        namespace: self.client.namespace(),
92                        ..Default::default()
93                    }
94                    .into_request(),
95                )
96                .await
97                .map_err(AsyncActivityError::from_status)?;
98            }
99            ActivityIdentifier::ById {
100                workflow_id,
101                run_id,
102                activity_id,
103            } => {
104                WorkflowService::respond_activity_task_completed_by_id(
105                    &mut self.client.clone(),
106                    RespondActivityTaskCompletedByIdRequest {
107                        namespace: self.client.namespace(),
108                        workflow_id: workflow_id.clone(),
109                        run_id: run_id.clone(),
110                        activity_id: activity_id.clone(),
111                        result,
112                        identity: self.client.identity(),
113                        resource_id: Default::default(),
114                    }
115                    .into_request(),
116                )
117                .await
118                .map_err(AsyncActivityError::from_status)?;
119            }
120        }
121        Ok(())
122    }
123
124    /// Fail the activity with a failure.
125    pub async fn fail(
126        &self,
127        failure: Failure,
128        last_heartbeat_details: Option<Payloads>,
129    ) -> Result<(), AsyncActivityError> {
130        match &self.identifier {
131            ActivityIdentifier::TaskToken(token) => {
132                WorkflowService::respond_activity_task_failed(
133                    &mut self.client.clone(),
134                    RespondActivityTaskFailedRequest {
135                        task_token: token.0.clone(),
136                        failure: Some(failure),
137                        identity: self.client.identity(),
138                        namespace: self.client.namespace(),
139                        last_heartbeat_details,
140                        ..Default::default()
141                    }
142                    .into_request(),
143                )
144                .await
145                .map_err(AsyncActivityError::from_status)?;
146            }
147            ActivityIdentifier::ById {
148                workflow_id,
149                run_id,
150                activity_id,
151            } => {
152                WorkflowService::respond_activity_task_failed_by_id(
153                    &mut self.client.clone(),
154                    RespondActivityTaskFailedByIdRequest {
155                        namespace: self.client.namespace(),
156                        workflow_id: workflow_id.clone(),
157                        run_id: run_id.clone(),
158                        activity_id: activity_id.clone(),
159                        failure: Some(failure),
160                        identity: self.client.identity(),
161                        last_heartbeat_details,
162                        resource_id: Default::default(),
163                    }
164                    .into_request(),
165                )
166                .await
167                .map_err(AsyncActivityError::from_status)?;
168            }
169        }
170        Ok(())
171    }
172
173    /// Reports the activity as canceled.
174    pub async fn report_cancelation(
175        &self,
176        details: Option<Payloads>,
177    ) -> Result<(), AsyncActivityError> {
178        match &self.identifier {
179            ActivityIdentifier::TaskToken(token) => {
180                WorkflowService::respond_activity_task_canceled(
181                    &mut self.client.clone(),
182                    RespondActivityTaskCanceledRequest {
183                        task_token: token.0.clone(),
184                        details,
185                        identity: self.client.identity(),
186                        namespace: self.client.namespace(),
187                        ..Default::default()
188                    }
189                    .into_request(),
190                )
191                .await
192                .map_err(AsyncActivityError::from_status)?;
193            }
194            ActivityIdentifier::ById {
195                workflow_id,
196                run_id,
197                activity_id,
198            } => {
199                WorkflowService::respond_activity_task_canceled_by_id(
200                    &mut self.client.clone(),
201                    RespondActivityTaskCanceledByIdRequest {
202                        namespace: self.client.namespace(),
203                        workflow_id: workflow_id.clone(),
204                        run_id: run_id.clone(),
205                        activity_id: activity_id.clone(),
206                        details,
207                        identity: self.client.identity(),
208                        ..Default::default()
209                    }
210                    .into_request(),
211                )
212                .await
213                .map_err(AsyncActivityError::from_status)?;
214            }
215        }
216        Ok(())
217    }
218
219    /// Record a heartbeat for the activity.
220    ///
221    /// Heartbeats let the server know the activity is still running and can carry
222    /// progress information. The response indicates if cancellation has been requested.
223    pub async fn heartbeat(
224        &self,
225        details: Option<Payloads>,
226    ) -> Result<ActivityHeartbeatResponse, AsyncActivityError> {
227        match &self.identifier {
228            ActivityIdentifier::TaskToken(token) => {
229                let resp = WorkflowService::record_activity_task_heartbeat(
230                    &mut self.client.clone(),
231                    RecordActivityTaskHeartbeatRequest {
232                        task_token: token.0.clone(),
233                        details,
234                        identity: self.client.identity(),
235                        namespace: self.client.namespace(),
236                        resource_id: Default::default(),
237                    }
238                    .into_request(),
239                )
240                .await
241                .map_err(AsyncActivityError::from_status)?
242                .into_inner();
243                Ok(ActivityHeartbeatResponse::from(resp))
244            }
245            ActivityIdentifier::ById {
246                workflow_id,
247                run_id,
248                activity_id,
249            } => {
250                let resp = WorkflowService::record_activity_task_heartbeat_by_id(
251                    &mut self.client.clone(),
252                    RecordActivityTaskHeartbeatByIdRequest {
253                        namespace: self.client.namespace(),
254                        workflow_id: workflow_id.clone(),
255                        run_id: run_id.clone(),
256                        activity_id: activity_id.clone(),
257                        details,
258                        identity: self.client.identity(),
259                        resource_id: Default::default(),
260                    }
261                    .into_request(),
262                )
263                .await
264                .map_err(AsyncActivityError::from_status)?
265                .into_inner();
266                Ok(ActivityHeartbeatResponse::from(resp))
267            }
268        }
269    }
270}
271
272/// Response from a heartbeat call.
273#[derive(Debug, Clone)]
274pub struct ActivityHeartbeatResponse {
275    /// True if the activity has been asked to cancel itself.
276    pub cancel_requested: bool,
277    /// True if the activity is paused.
278    pub activity_paused: bool,
279    /// True if the activity was reset.
280    pub activity_reset: bool,
281}
282
283impl From<RecordActivityTaskHeartbeatResponse> for ActivityHeartbeatResponse {
284    fn from(resp: RecordActivityTaskHeartbeatResponse) -> Self {
285        Self {
286            cancel_requested: resp.cancel_requested,
287            activity_paused: resp.activity_paused,
288            activity_reset: resp.activity_reset,
289        }
290    }
291}
292
293impl From<RecordActivityTaskHeartbeatByIdResponse> for ActivityHeartbeatResponse {
294    fn from(resp: RecordActivityTaskHeartbeatByIdResponse) -> Self {
295        Self {
296            cancel_requested: resp.cancel_requested,
297            activity_paused: resp.activity_paused,
298            activity_reset: resp.activity_reset,
299        }
300    }
301}