zeebe/
job.rs

1use crate::{client::Client, proto, Error, Result};
2use tracing::{debug, trace};
3
4/// An activate Zeebe job that is ready to be worked on by a worker.
5#[derive(Clone, Debug)]
6pub struct Job(proto::ActivatedJob);
7
8impl Job {
9    /// Create a new job from a GRPC response
10    pub(crate) fn new(proto: proto::ActivatedJob) -> Self {
11        Job(proto)
12    }
13
14    /// the key, a unique identifier for the job
15    pub fn key(&self) -> i64 {
16        self.0.key
17    }
18
19    /// the type of the job (should match what was requested)
20    pub fn job_type(&self) -> &str {
21        &self.0.r#type
22    }
23
24    /// the job's process instance key
25    pub fn process_instance_key(&self) -> i64 {
26        self.0.process_instance_key
27    }
28
29    /// the bpmn process ID of the job process definition
30    pub fn bpmn_process_id(&self) -> &str {
31        &self.0.bpmn_process_id
32    }
33
34    /// the version of the job process definition
35    pub fn process_definition_version(&self) -> i32 {
36        self.0.process_definition_version
37    }
38
39    /// the key of the job process definition
40    pub fn process_definition_key(&self) -> i64 {
41        self.0.process_definition_key
42    }
43
44    /// the associated task element ID
45    pub fn element_id(&self) -> &str {
46        &self.0.element_id
47    }
48
49    /// the unique key identifying the associated task, unique within the scope of
50    /// the process instance
51    pub fn element_instance_key(&self) -> i64 {
52        self.0.element_instance_key
53    }
54
55    /// a set of custom headers defined during modelling; returned as a serialized
56    /// JSON document
57    pub fn custom_headers(&self) -> &str {
58        &self.0.custom_headers
59    }
60
61    /// the name of the worker which activated this job
62    pub fn worker(&self) -> &str {
63        &self.0.worker
64    }
65
66    /// the amount of retries left to this job (should always be positive)
67    pub fn retries(&self) -> i32 {
68        self.0.retries
69    }
70
71    /// when the job can be activated again, sent as a UNIX epoch timestamp
72    pub fn deadline(&self) -> i64 {
73        self.0.deadline
74    }
75
76    /// Serialized JSON document, computed at activation time, consisting of all
77    /// visible variables to the task scope
78    pub fn variables_str(&self) -> &str {
79        &self.0.variables
80    }
81
82    /// JSON document, computed at activation time, consisting of all visible
83    /// variables to the task scope
84    pub fn variables(&self) -> serde_json::Value {
85        serde_json::from_str(&self.0.variables).unwrap_or_else(|_| serde_json::json!({}))
86    }
87
88    /// Deserialize encoded json variables as a given type
89    pub fn variables_as<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Option<T> {
90        serde_json::from_str::<'a, T>(&self.0.variables).ok()
91    }
92}
93
94/// Configuration to complete a job
95#[derive(Debug)]
96pub struct CompleteJobBuilder {
97    client: Client,
98    job_key: Option<i64>,
99    variables: Option<serde_json::Value>,
100}
101
102impl CompleteJobBuilder {
103    /// Create a new complete job builder.
104    pub fn new(client: Client) -> Self {
105        CompleteJobBuilder {
106            client,
107            job_key: None,
108            variables: None,
109        }
110    }
111
112    /// Set the unique job identifier, as obtained from [`Job::key`].
113    pub fn with_job_key(self, job_key: i64) -> Self {
114        CompleteJobBuilder {
115            job_key: Some(job_key),
116            ..self
117        }
118    }
119
120    /// Set the JSON document representing the variables in the current task scope.
121    pub fn with_variables<T: Into<serde_json::Value>>(self, variables: T) -> Self {
122        CompleteJobBuilder {
123            variables: Some(variables.into()),
124            ..self
125        }
126    }
127
128    /// Submit the complete job request.
129    #[tracing::instrument(skip(self), name = "complete_job", err)]
130    pub async fn send(mut self) -> Result<CompleteJobResponse> {
131        if self.job_key.is_none() && self.client.current_job_key.is_none() {
132            return Err(Error::InvalidParameters("`job_key` must be set"));
133        }
134        let req = proto::CompleteJobRequest {
135            job_key: self.job_key.or(self.client.current_job_key).unwrap(),
136            variables: self
137                .variables
138                .map_or(String::new(), |vars| vars.to_string()),
139        };
140
141        debug!(job_key = req.job_key, "completing job:");
142        trace!(?req, "request:");
143        let res = self
144            .client
145            .gateway_client
146            .complete_job(tonic::Request::new(req))
147            .await?;
148
149        Ok(CompleteJobResponse(res.into_inner()))
150    }
151}
152
153/// Completed job instance data.
154#[derive(Debug)]
155pub struct CompleteJobResponse(proto::CompleteJobResponse);
156
157/// Configuration to fail a job
158#[derive(Debug)]
159pub struct FailJobBuilder {
160    client: Client,
161    job_key: Option<i64>,
162    retries: Option<u32>,
163    error_message: Option<String>,
164}
165
166impl FailJobBuilder {
167    /// Create a new fail job builder.
168    pub fn new(client: Client) -> Self {
169        FailJobBuilder {
170            client,
171            job_key: None,
172            retries: None,
173            error_message: None,
174        }
175    }
176
177    /// Set the unique job identifier, as obtained from [`Job::key`].
178    pub fn with_job_key(self, job_key: i64) -> Self {
179        FailJobBuilder {
180            job_key: Some(job_key),
181            ..self
182        }
183    }
184
185    /// Set the amount of retries the job should have left.
186    pub fn with_retries(self, retries: u32) -> Self {
187        FailJobBuilder {
188            retries: Some(retries),
189            ..self
190        }
191    }
192
193    /// Set an optional message describing why the job failed. This is particularly
194    /// useful if a job runs out of retries and an incident is raised, as it this
195    /// message can help explain why an incident was raised.
196    pub fn with_error_message<T: Into<String>>(self, error_message: T) -> Self {
197        FailJobBuilder {
198            error_message: Some(error_message.into()),
199            ..self
200        }
201    }
202
203    /// Submit the fail job request.
204    #[tracing::instrument(skip(self), name = "fail_job", err)]
205    pub async fn send(mut self) -> Result<FailJobResponse> {
206        if self.job_key.is_none() && self.client.current_job_key.is_none() {
207            return Err(Error::InvalidParameters("`job_key` must be set"));
208        }
209        let req = proto::FailJobRequest {
210            job_key: self.job_key.or(self.client.current_job_key).unwrap(),
211            retries: self.retries.unwrap_or_default() as i32,
212            error_message: self.error_message.unwrap_or_default(),
213        };
214
215        debug!(job_key = req.job_key, "failing job:");
216        trace!(?req, "request:");
217        let res = self
218            .client
219            .gateway_client
220            .fail_job(tonic::Request::new(req))
221            .await?;
222
223        Ok(FailJobResponse(res.into_inner()))
224    }
225}
226
227/// Failed job instance data.
228#[derive(Debug)]
229pub struct FailJobResponse(proto::FailJobResponse);
230
231/// Configuration to throw an error in the context of a job.
232#[derive(Debug)]
233pub struct ThrowErrorBuilder {
234    client: Client,
235    job_key: Option<i64>,
236    error_code: Option<String>,
237    error_message: Option<String>,
238}
239
240impl ThrowErrorBuilder {
241    /// Create a new throw error builder.
242    pub fn new(client: Client) -> Self {
243        ThrowErrorBuilder {
244            client,
245            job_key: None,
246            error_code: None,
247            error_message: None,
248        }
249    }
250
251    /// Set the unique job identifier, as obtained from [`Job::key`].
252    pub fn with_job_key(self, job_key: i64) -> Self {
253        ThrowErrorBuilder {
254            job_key: Some(job_key),
255            ..self
256        }
257    }
258
259    /// Set the error code that will be matched with an error catch event.
260    pub fn with_error_code<T: Into<String>>(self, error_code: T) -> Self {
261        ThrowErrorBuilder {
262            error_code: Some(error_code.into()),
263            ..self
264        }
265    }
266
267    /// Set an optional message describing why the error was thrown.
268    pub fn with_error_message<T: Into<String>>(self, error_message: T) -> Self {
269        ThrowErrorBuilder {
270            error_message: Some(error_message.into()),
271            ..self
272        }
273    }
274
275    /// Submit the throw error request.
276    #[tracing::instrument(skip(self), name = "throw_error", err)]
277    pub async fn send(mut self) -> Result<ThrowErrorResponse> {
278        if self.job_key.is_none() && self.client.current_job_key.is_none() {
279            return Err(Error::InvalidParameters("`job_key` must be set"));
280        }
281        let req = proto::ThrowErrorRequest {
282            job_key: self.job_key.or(self.client.current_job_key).unwrap(),
283            error_code: self.error_code.unwrap_or_default(),
284            error_message: self.error_message.unwrap_or_default(),
285        };
286
287        debug!(?req, "sending request:");
288        let res = self
289            .client
290            .gateway_client
291            .throw_error(tonic::Request::new(req))
292            .await?;
293
294        Ok(ThrowErrorResponse(res.into_inner()))
295    }
296}
297
298/// Throw error response data.
299#[derive(Debug)]
300pub struct ThrowErrorResponse(proto::ThrowErrorResponse);
301
302/// Updates the number of retries a job has left. This is mostly useful for jobs
303/// that have run out of retries, should the underlying problem be solved.
304#[derive(Debug)]
305pub struct UpdateJobRetriesBuilder {
306    client: Client,
307    job_key: Option<i64>,
308    retries: Option<u32>,
309}
310
311impl UpdateJobRetriesBuilder {
312    /// Create a new update retries builder.
313    pub fn new(client: Client) -> Self {
314        UpdateJobRetriesBuilder {
315            client,
316            job_key: None,
317            retries: None,
318        }
319    }
320
321    /// Set the unique job identifier, as obtained from [`Job::key`].
322    pub fn with_job_key(self, job_key: i64) -> Self {
323        UpdateJobRetriesBuilder {
324            job_key: Some(job_key),
325            ..self
326        }
327    }
328
329    /// Set the new amount of retries for the job
330    pub fn with_retries(self, retries: u32) -> Self {
331        UpdateJobRetriesBuilder {
332            retries: Some(retries),
333            ..self
334        }
335    }
336
337    /// Submit the update job retries request.
338    #[tracing::instrument(skip(self), name = "update_job_retries", err)]
339    pub async fn send(mut self) -> Result<UpdateJobRetriesResponse> {
340        if (self.job_key.is_none() && self.client.current_job_key.is_none())
341            || self.retries.is_none()
342        {
343            return Err(Error::InvalidParameters(
344                "`job_key` and `retries` must be set",
345            ));
346        }
347        let req = proto::UpdateJobRetriesRequest {
348            job_key: self.job_key.or(self.client.current_job_key).unwrap(),
349            retries: self.retries.unwrap() as i32,
350        };
351
352        debug!(?req, "sending request:");
353        let res = self
354            .client
355            .gateway_client
356            .update_job_retries(tonic::Request::new(req))
357            .await?;
358
359        Ok(UpdateJobRetriesResponse(res.into_inner()))
360    }
361}
362
363/// Update job retries data.
364#[derive(Debug)]
365pub struct UpdateJobRetriesResponse(proto::UpdateJobRetriesResponse);