pub struct ActivatedJob { /* private fields */ }Expand description
Represents an activated job
Implementations§
Source§impl ActivatedJob
impl ActivatedJob
Sourcepub fn key(&self) -> i64
pub fn key(&self) -> i64
Examples found in repository?
More examples
examples/pizza.rs (line 196)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}
250
251async fn confirm_order(
252 _client: Client,
253 job: ActivatedJob,
254 state: Arc<SharedState<Mutex<Stock>>>,
255) -> Result<OrderResult, WorkerError<()>> {
256 let order = job
257 .data::<Order>()
258 .expect("For demonstration purposes we're not handling a missing order here");
259 let mut stock: tokio::sync::MutexGuard<Stock> = state.lock().await;
260
261 let mut order_accepted = true;
262 let mut message = None;
263 for item in order.items {
264 if let Some(quantity) = stock.items.get_mut(&item) {
265 if *quantity > 0 {
266 *quantity -= 1;
267 } else {
268 message = Some(format!("We're out of stock of {}", item));
269 order_accepted = false;
270 }
271 } else {
272 message = Some(format!("We don't serve {}", item));
273 order_accepted = false;
274 }
275 }
276
277 println!("Confirmed order");
278 Ok(OrderResult {
279 order_accepted,
280 message,
281 })
282}
283
284async fn bake_pizzas(client: Client, job: ActivatedJob) {
285 let order = job
286 .data::<Order>()
287 .expect("We shouldn't start baking pizzas if there is no order!");
288
289 println!("Baking pizzas");
290 sleep(Duration::from_secs(10 * order.items.len() as u64)).await;
291 println!("Finished baking {} pizzas", order.items.len());
292
293 let _ = client.complete_job().with_job_key(job.key()).send().await;
294}Sourcepub fn process_instance_key(&self) -> i64
pub fn process_instance_key(&self) -> i64
Sourcepub fn bpmn_process_id(&self) -> &str
pub fn bpmn_process_id(&self) -> &str
Sourcepub fn process_definition_version(&self) -> i32
pub fn process_definition_version(&self) -> i32
Sourcepub fn process_definition_key(&self) -> i64
pub fn process_definition_key(&self) -> i64
Sourcepub fn element_id(&self) -> &str
pub fn element_id(&self) -> &str
Sourcepub fn element_instance_key(&self) -> i64
pub fn element_instance_key(&self) -> i64
The unique key identifying the associated task, unique within the scope of the process instance.
§Returns
i64- The element instance key.
Sourcepub fn custom_headers(&self) -> &str
pub fn custom_headers(&self) -> &str
A set of custom headers defined during modelling; returned as a serialized JSON document.
§Returns
&str- The custom headers as a JSON document.
Sourcepub fn retries(&self) -> i32
pub fn retries(&self) -> i32
The amount of retries left to this job (should always be positive).
§Returns
i32- The number of retries left.
Examples found in repository?
examples/pizza.rs (line 310)
296async fn deliver_order(
297 _client: Client,
298 job: ActivatedJob,
299) -> Result<(), WorkerError<serde_json::Value>> {
300 let data: Result<Customer, ClientError> = job.data();
301 let customer = data.unwrap();
302
303 if customer.address.is_empty() {
304 return Err(WorkerError::ThrowError {
305 error_code: String::from("invalid_address"),
306 error_message: Some(String::from("Missing address")),
307 });
308 }
309
310 if customer.bad_tipper && job.retries() > 1 {
311 println!("Customer is a bad tipper, let's delay their order");
312 sleep(Duration::from_secs(10)).await;
313
314 return Err(WorkerError::FailJobWithData {
315 error_message: String::from("Bad tipper, delaying delivery"),
316 data: json!({"apology": "Oops"}),
317 });
318 }
319 Ok(())
320}Sourcepub fn deadline(&self) -> i64
pub fn deadline(&self) -> i64
When the job can be activated again, sent as a UNIX epoch timestamp.
§Returns
i64- The deadline as a UNIX epoch timestamp.
Sourcepub fn variables(&self) -> &str
pub fn variables(&self) -> &str
JSON document, computed at activation time, consisting of all visible variables to the task scope.
§Returns
&str- The variables as a JSON document.
Sourcepub fn data<T: DeserializeOwned>(&self) -> Result<T, ClientError>
pub fn data<T: DeserializeOwned>(&self) -> Result<T, ClientError>
Deserializes the variables JSON document into a specified type.
§Type Parameters
T- The type to deserialize into.
§Returns
Result<T, ClientError>- The deserialized data or an error if deserialization fails.
Examples found in repository?
examples/pizza.rs (line 190)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}
250
251async fn confirm_order(
252 _client: Client,
253 job: ActivatedJob,
254 state: Arc<SharedState<Mutex<Stock>>>,
255) -> Result<OrderResult, WorkerError<()>> {
256 let order = job
257 .data::<Order>()
258 .expect("For demonstration purposes we're not handling a missing order here");
259 let mut stock: tokio::sync::MutexGuard<Stock> = state.lock().await;
260
261 let mut order_accepted = true;
262 let mut message = None;
263 for item in order.items {
264 if let Some(quantity) = stock.items.get_mut(&item) {
265 if *quantity > 0 {
266 *quantity -= 1;
267 } else {
268 message = Some(format!("We're out of stock of {}", item));
269 order_accepted = false;
270 }
271 } else {
272 message = Some(format!("We don't serve {}", item));
273 order_accepted = false;
274 }
275 }
276
277 println!("Confirmed order");
278 Ok(OrderResult {
279 order_accepted,
280 message,
281 })
282}
283
284async fn bake_pizzas(client: Client, job: ActivatedJob) {
285 let order = job
286 .data::<Order>()
287 .expect("We shouldn't start baking pizzas if there is no order!");
288
289 println!("Baking pizzas");
290 sleep(Duration::from_secs(10 * order.items.len() as u64)).await;
291 println!("Finished baking {} pizzas", order.items.len());
292
293 let _ = client.complete_job().with_job_key(job.key()).send().await;
294}
295
296async fn deliver_order(
297 _client: Client,
298 job: ActivatedJob,
299) -> Result<(), WorkerError<serde_json::Value>> {
300 let data: Result<Customer, ClientError> = job.data();
301 let customer = data.unwrap();
302
303 if customer.address.is_empty() {
304 return Err(WorkerError::ThrowError {
305 error_code: String::from("invalid_address"),
306 error_message: Some(String::from("Missing address")),
307 });
308 }
309
310 if customer.bad_tipper && job.retries() > 1 {
311 println!("Customer is a bad tipper, let's delay their order");
312 sleep(Duration::from_secs(10)).await;
313
314 return Err(WorkerError::FailJobWithData {
315 error_message: String::from("Bad tipper, delaying delivery"),
316 data: json!({"apology": "Oops"}),
317 });
318 }
319 Ok(())
320}Trait Implementations§
Source§impl Clone for ActivatedJob
impl Clone for ActivatedJob
Source§fn clone(&self) -> ActivatedJob
fn clone(&self) -> ActivatedJob
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreAuto Trait Implementations§
impl Freeze for ActivatedJob
impl RefUnwindSafe for ActivatedJob
impl Send for ActivatedJob
impl Sync for ActivatedJob
impl Unpin for ActivatedJob
impl UnwindSafe for ActivatedJob
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message
T in a tonic::Request