1use crate::{client::Client, proto, Error, Result};
2use tracing::{debug, trace};
3
/// An activated job: a thin newtype around the gateway's
/// `proto::ActivatedJob` message that exposes its fields through read-only
/// accessors.
#[derive(Clone, Debug)]
pub struct Job(proto::ActivatedJob);
7
8impl Job {
9 pub(crate) fn new(proto: proto::ActivatedJob) -> Self {
11 Job(proto)
12 }
13
14 pub fn key(&self) -> i64 {
16 self.0.key
17 }
18
19 pub fn job_type(&self) -> &str {
21 &self.0.r#type
22 }
23
24 pub fn process_instance_key(&self) -> i64 {
26 self.0.process_instance_key
27 }
28
29 pub fn bpmn_process_id(&self) -> &str {
31 &self.0.bpmn_process_id
32 }
33
34 pub fn process_definition_version(&self) -> i32 {
36 self.0.process_definition_version
37 }
38
39 pub fn process_definition_key(&self) -> i64 {
41 self.0.process_definition_key
42 }
43
44 pub fn element_id(&self) -> &str {
46 &self.0.element_id
47 }
48
49 pub fn element_instance_key(&self) -> i64 {
52 self.0.element_instance_key
53 }
54
55 pub fn custom_headers(&self) -> &str {
58 &self.0.custom_headers
59 }
60
61 pub fn worker(&self) -> &str {
63 &self.0.worker
64 }
65
66 pub fn retries(&self) -> i32 {
68 self.0.retries
69 }
70
71 pub fn deadline(&self) -> i64 {
73 self.0.deadline
74 }
75
76 pub fn variables_str(&self) -> &str {
79 &self.0.variables
80 }
81
82 pub fn variables(&self) -> serde_json::Value {
85 serde_json::from_str(&self.0.variables).unwrap_or_else(|_| serde_json::json!({}))
86 }
87
88 pub fn variables_as<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Option<T> {
90 serde_json::from_str::<'a, T>(&self.0.variables).ok()
91 }
92}
93
/// Builder for a `proto::CompleteJobRequest`, sent with
/// `CompleteJobBuilder::send`.
#[derive(Debug)]
pub struct CompleteJobBuilder {
    /// Client whose gateway connection is used to send the request.
    client: Client,
    /// Explicitly chosen job key; when unset, `send` falls back to
    /// `client.current_job_key`.
    job_key: Option<i64>,
    /// Variables to send with the request, serialized to a JSON string by
    /// `send`; an empty string is sent when unset.
    variables: Option<serde_json::Value>,
}
101
102impl CompleteJobBuilder {
103 pub fn new(client: Client) -> Self {
105 CompleteJobBuilder {
106 client,
107 job_key: None,
108 variables: None,
109 }
110 }
111
112 pub fn with_job_key(self, job_key: i64) -> Self {
114 CompleteJobBuilder {
115 job_key: Some(job_key),
116 ..self
117 }
118 }
119
120 pub fn with_variables<T: Into<serde_json::Value>>(self, variables: T) -> Self {
122 CompleteJobBuilder {
123 variables: Some(variables.into()),
124 ..self
125 }
126 }
127
128 #[tracing::instrument(skip(self), name = "complete_job", err)]
130 pub async fn send(mut self) -> Result<CompleteJobResponse> {
131 if self.job_key.is_none() && self.client.current_job_key.is_none() {
132 return Err(Error::InvalidParameters("`job_key` must be set"));
133 }
134 let req = proto::CompleteJobRequest {
135 job_key: self.job_key.or(self.client.current_job_key).unwrap(),
136 variables: self
137 .variables
138 .map_or(String::new(), |vars| vars.to_string()),
139 };
140
141 debug!(job_key = req.job_key, "completing job:");
142 trace!(?req, "request:");
143 let res = self
144 .client
145 .gateway_client
146 .complete_job(tonic::Request::new(req))
147 .await?;
148
149 Ok(CompleteJobResponse(res.into_inner()))
150 }
151}
152
/// Result of `CompleteJobBuilder::send`: a newtype around the gateway's
/// `proto::CompleteJobResponse` message.
#[derive(Debug)]
pub struct CompleteJobResponse(proto::CompleteJobResponse);
156
/// Builder for a `proto::FailJobRequest`, sent with `FailJobBuilder::send`.
#[derive(Debug)]
pub struct FailJobBuilder {
    /// Client whose gateway connection is used to send the request.
    client: Client,
    /// Explicitly chosen job key; when unset, `send` falls back to
    /// `client.current_job_key`.
    job_key: Option<i64>,
    /// Retry count to send with the request; `send` uses `0` when unset.
    retries: Option<u32>,
    /// Error message to send with the request; `send` uses an empty string
    /// when unset.
    error_message: Option<String>,
}
165
166impl FailJobBuilder {
167 pub fn new(client: Client) -> Self {
169 FailJobBuilder {
170 client,
171 job_key: None,
172 retries: None,
173 error_message: None,
174 }
175 }
176
177 pub fn with_job_key(self, job_key: i64) -> Self {
179 FailJobBuilder {
180 job_key: Some(job_key),
181 ..self
182 }
183 }
184
185 pub fn with_retries(self, retries: u32) -> Self {
187 FailJobBuilder {
188 retries: Some(retries),
189 ..self
190 }
191 }
192
193 pub fn with_error_message<T: Into<String>>(self, error_message: T) -> Self {
197 FailJobBuilder {
198 error_message: Some(error_message.into()),
199 ..self
200 }
201 }
202
203 #[tracing::instrument(skip(self), name = "fail_job", err)]
205 pub async fn send(mut self) -> Result<FailJobResponse> {
206 if self.job_key.is_none() && self.client.current_job_key.is_none() {
207 return Err(Error::InvalidParameters("`job_key` must be set"));
208 }
209 let req = proto::FailJobRequest {
210 job_key: self.job_key.or(self.client.current_job_key).unwrap(),
211 retries: self.retries.unwrap_or_default() as i32,
212 error_message: self.error_message.unwrap_or_default(),
213 };
214
215 debug!(job_key = req.job_key, "failing job:");
216 trace!(?req, "request:");
217 let res = self
218 .client
219 .gateway_client
220 .fail_job(tonic::Request::new(req))
221 .await?;
222
223 Ok(FailJobResponse(res.into_inner()))
224 }
225}
226
/// Result of `FailJobBuilder::send`: a newtype around the gateway's
/// `proto::FailJobResponse` message.
#[derive(Debug)]
pub struct FailJobResponse(proto::FailJobResponse);
230
/// Builder for a `proto::ThrowErrorRequest`, sent with
/// `ThrowErrorBuilder::send`.
#[derive(Debug)]
pub struct ThrowErrorBuilder {
    /// Client whose gateway connection is used to send the request.
    client: Client,
    /// Explicitly chosen job key; when unset, `send` falls back to
    /// `client.current_job_key`.
    job_key: Option<i64>,
    /// Error code to send with the request; `send` uses an empty string when
    /// unset.
    error_code: Option<String>,
    /// Error message to send with the request; `send` uses an empty string
    /// when unset.
    error_message: Option<String>,
}
239
240impl ThrowErrorBuilder {
241 pub fn new(client: Client) -> Self {
243 ThrowErrorBuilder {
244 client,
245 job_key: None,
246 error_code: None,
247 error_message: None,
248 }
249 }
250
251 pub fn with_job_key(self, job_key: i64) -> Self {
253 ThrowErrorBuilder {
254 job_key: Some(job_key),
255 ..self
256 }
257 }
258
259 pub fn with_error_code<T: Into<String>>(self, error_code: T) -> Self {
261 ThrowErrorBuilder {
262 error_code: Some(error_code.into()),
263 ..self
264 }
265 }
266
267 pub fn with_error_message<T: Into<String>>(self, error_message: T) -> Self {
269 ThrowErrorBuilder {
270 error_message: Some(error_message.into()),
271 ..self
272 }
273 }
274
275 #[tracing::instrument(skip(self), name = "throw_error", err)]
277 pub async fn send(mut self) -> Result<ThrowErrorResponse> {
278 if self.job_key.is_none() && self.client.current_job_key.is_none() {
279 return Err(Error::InvalidParameters("`job_key` must be set"));
280 }
281 let req = proto::ThrowErrorRequest {
282 job_key: self.job_key.or(self.client.current_job_key).unwrap(),
283 error_code: self.error_code.unwrap_or_default(),
284 error_message: self.error_message.unwrap_or_default(),
285 };
286
287 debug!(?req, "sending request:");
288 let res = self
289 .client
290 .gateway_client
291 .throw_error(tonic::Request::new(req))
292 .await?;
293
294 Ok(ThrowErrorResponse(res.into_inner()))
295 }
296}
297
/// Result of `ThrowErrorBuilder::send`: a newtype around the gateway's
/// `proto::ThrowErrorResponse` message.
#[derive(Debug)]
pub struct ThrowErrorResponse(proto::ThrowErrorResponse);
301
/// Builder for a `proto::UpdateJobRetriesRequest`, sent with
/// `UpdateJobRetriesBuilder::send`.
#[derive(Debug)]
pub struct UpdateJobRetriesBuilder {
    /// Client whose gateway connection is used to send the request.
    client: Client,
    /// Explicitly chosen job key; when unset, `send` falls back to
    /// `client.current_job_key`.
    job_key: Option<i64>,
    /// New retry count; required -- `send` fails when it is unset.
    retries: Option<u32>,
}
310
311impl UpdateJobRetriesBuilder {
312 pub fn new(client: Client) -> Self {
314 UpdateJobRetriesBuilder {
315 client,
316 job_key: None,
317 retries: None,
318 }
319 }
320
321 pub fn with_job_key(self, job_key: i64) -> Self {
323 UpdateJobRetriesBuilder {
324 job_key: Some(job_key),
325 ..self
326 }
327 }
328
329 pub fn with_retries(self, retries: u32) -> Self {
331 UpdateJobRetriesBuilder {
332 retries: Some(retries),
333 ..self
334 }
335 }
336
337 #[tracing::instrument(skip(self), name = "update_job_retries", err)]
339 pub async fn send(mut self) -> Result<UpdateJobRetriesResponse> {
340 if (self.job_key.is_none() && self.client.current_job_key.is_none())
341 || self.retries.is_none()
342 {
343 return Err(Error::InvalidParameters(
344 "`job_key` and `retries` must be set",
345 ));
346 }
347 let req = proto::UpdateJobRetriesRequest {
348 job_key: self.job_key.or(self.client.current_job_key).unwrap(),
349 retries: self.retries.unwrap() as i32,
350 };
351
352 debug!(?req, "sending request:");
353 let res = self
354 .client
355 .gateway_client
356 .update_job_retries(tonic::Request::new(req))
357 .await?;
358
359 Ok(UpdateJobRetriesResponse(res.into_inner()))
360 }
361}
362
/// Result of `UpdateJobRetriesBuilder::send`: a newtype around the gateway's
/// `proto::UpdateJobRetriesResponse` message.
#[derive(Debug)]
pub struct UpdateJobRetriesResponse(proto::UpdateJobRetriesResponse);