pizza/
pizza.rs

1use serde::{Deserialize, Serialize};
2use serde_json::json;
3use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
4use tokio::{sync::Mutex, time::sleep};
5use zeebe_rs::{ActivatedJob, Client, ClientError, SharedState, WorkerError};
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
8struct Customer {
9    name: String,
10    address: String,
11    bad_tipper: bool,
12    customer_id: String,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16struct Order {
17    items: Vec<String>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21struct Stock {
22    items: HashMap<String, i32>,
23}
24
25#[derive(Debug, Clone, Serialize)]
26struct OrderResult {
27    order_accepted: bool,
28    message: Option<String>,
29}
30
31async fn place_order(
32    client: Client,
33    process_definition_key: i64,
34    name: &str,
35    address: &str,
36    bad_tipper: bool,
37    items: Vec<&str>,
38) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
39    let customer = Customer {
40        name: name.to_owned(),
41        address: address.to_owned(),
42        bad_tipper,
43        customer_id: format!("{}_{}", name, address),
44    };
45
46    let order = Order {
47        items: items.into_iter().map(|x| x.to_owned()).collect(),
48    };
49
50    let res = client
51        .create_process_instance()
52        .with_process_definition_key(process_definition_key)
53        .with_variables(customer.clone())?
54        .send()
55        .await?;
56
57    println!("{:?}", res);
58
59    let res = client
60        .publish_message()
61        .with_name(String::from("order_pizza_msg"))
62        .with_correlation_key(customer.customer_id.clone())
63        .with_variables(order)?
64        .send()
65        .await?;
66
67    println!("{:?}", res);
68
69    Ok(())
70}
71
72#[tokio::main]
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
250
251async fn confirm_order(
252    _client: Client,
253    job: ActivatedJob,
254    state: Arc<SharedState<Mutex<Stock>>>,
255) -> Result<OrderResult, WorkerError<()>> {
256    let order = job
257        .data::<Order>()
258        .expect("For demonstration purposes we're not handling a missing order here");
259    let mut stock: tokio::sync::MutexGuard<Stock> = state.lock().await;
260
261    let mut order_accepted = true;
262    let mut message = None;
263    for item in order.items {
264        if let Some(quantity) = stock.items.get_mut(&item) {
265            if *quantity > 0 {
266                *quantity -= 1;
267            } else {
268                message = Some(format!("We're out of stock of {}", item));
269                order_accepted = false;
270            }
271        } else {
272            message = Some(format!("We don't serve {}", item));
273            order_accepted = false;
274        }
275    }
276
277    println!("Confirmed order");
278    Ok(OrderResult {
279        order_accepted,
280        message,
281    })
282}
283
284async fn bake_pizzas(client: Client, job: ActivatedJob) {
285    let order = job
286        .data::<Order>()
287        .expect("We shouldn't start baking pizzas if there is no order!");
288
289    println!("Baking pizzas");
290    sleep(Duration::from_secs(10 * order.items.len() as u64)).await;
291    println!("Finished baking {} pizzas", order.items.len());
292
293    let _ = client.complete_job().with_job_key(job.key()).send().await;
294}
295
296async fn deliver_order(
297    _client: Client,
298    job: ActivatedJob,
299) -> Result<(), WorkerError<serde_json::Value>> {
300    let data: Result<Customer, ClientError> = job.data();
301    let customer = data.unwrap();
302
303    if customer.address.is_empty() {
304        return Err(WorkerError::ThrowError {
305            error_code: String::from("invalid_address"),
306            error_message: Some(String::from("Missing address")),
307        });
308    }
309
310    if customer.bad_tipper && job.retries() > 1 {
311        println!("Customer is a bad tipper, let's delay their order");
312        sleep(Duration::from_secs(10)).await;
313
314        return Err(WorkerError::FailJobWithData {
315            error_message: String::from("Bad tipper, delaying delivery"),
316            data: json!({"apology": "Oops"}),
317        });
318    }
319    Ok(())
320}