1use serde::{Deserialize, Serialize};
2use serde_json::json;
3use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
4use tokio::{sync::Mutex, time::sleep};
5use zeebe_rs::{ActivatedJob, Client, ClientError, SharedState, WorkerError};
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
8struct Customer {
9 name: String,
10 address: String,
11 bad_tipper: bool,
12 customer_id: String,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16struct Order {
17 items: Vec<String>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21struct Stock {
22 items: HashMap<String, i32>,
23}
24
25#[derive(Debug, Clone, Serialize)]
26struct OrderResult {
27 order_accepted: bool,
28 message: Option<String>,
29}
30
31async fn place_order(
32 client: Client,
33 process_definition_key: i64,
34 name: &str,
35 address: &str,
36 bad_tipper: bool,
37 items: Vec<&str>,
38) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
39 let customer = Customer {
40 name: name.to_owned(),
41 address: address.to_owned(),
42 bad_tipper,
43 customer_id: format!("{}_{}", name, address),
44 };
45
46 let order = Order {
47 items: items.into_iter().map(|x| x.to_owned()).collect(),
48 };
49
50 let res = client
51 .create_process_instance()
52 .with_process_definition_key(process_definition_key)
53 .with_variables(customer.clone())?
54 .send()
55 .await?;
56
57 println!("{:?}", res);
58
59 let res = client
60 .publish_message()
61 .with_name(String::from("order_pizza_msg"))
62 .with_correlation_key(customer.customer_id.clone())
63 .with_variables(order)?
64 .send()
65 .await?;
66
67 println!("{:?}", res);
68
69 Ok(())
70}
71
72#[tokio::main]
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 client.auth_initialized().await;
95
96 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}
250
251async fn confirm_order(
252 _client: Client,
253 job: ActivatedJob,
254 state: Arc<SharedState<Mutex<Stock>>>,
255) -> Result<OrderResult, WorkerError<()>> {
256 let order = job
257 .data::<Order>()
258 .expect("For demonstration purposes we're not handling a missing order here");
259 let mut stock: tokio::sync::MutexGuard<Stock> = state.lock().await;
260
261 let mut order_accepted = true;
262 let mut message = None;
263 for item in order.items {
264 if let Some(quantity) = stock.items.get_mut(&item) {
265 if *quantity > 0 {
266 *quantity -= 1;
267 } else {
268 message = Some(format!("We're out of stock of {}", item));
269 order_accepted = false;
270 }
271 } else {
272 message = Some(format!("We don't serve {}", item));
273 order_accepted = false;
274 }
275 }
276
277 println!("Confirmed order");
278 Ok(OrderResult {
279 order_accepted,
280 message,
281 })
282}
283
284async fn bake_pizzas(client: Client, job: ActivatedJob) {
285 let order = job
286 .data::<Order>()
287 .expect("We shouldn't start baking pizzas if there is no order!");
288
289 println!("Baking pizzas");
290 sleep(Duration::from_secs(10 * order.items.len() as u64)).await;
291 println!("Finished baking {} pizzas", order.items.len());
292
293 let _ = client.complete_job().with_job_key(job.key()).send().await;
294}
295
296async fn deliver_order(
297 _client: Client,
298 job: ActivatedJob,
299) -> Result<(), WorkerError<serde_json::Value>> {
300 let data: Result<Customer, ClientError> = job.data();
301 let customer = data.unwrap();
302
303 if customer.address.is_empty() {
304 return Err(WorkerError::ThrowError {
305 error_code: String::from("invalid_address"),
306 error_message: Some(String::from("Missing address")),
307 });
308 }
309
310 if customer.bad_tipper && job.retries() > 1 {
311 println!("Customer is a bad tipper, let's delay their order");
312 sleep(Duration::from_secs(10)).await;
313
314 return Err(WorkerError::FailJobWithData {
315 error_message: String::from("Bad tipper, delaying delivery"),
316 data: json!({"apology": "Oops"}),
317 });
318 }
319 Ok(())
320}