Client

Struct Client 

Source
pub struct Client { /* private fields */ }
Expand description

A client for interacting with the Zeebe cluster.

The Client struct provides methods to create various requests and operations on the Zeebe cluster, such as deploying resources, managing process instances, handling jobs, and more.

§Examples

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = zeebe_rs::Client::builder()
        .with_address("http://localhost", 26500)
        .build()
        .await?;

   let topology = client.topology().send().await;

   Ok(())
}

§Notes

Each method returns a request builder that can be further configured and then sent to the Zeebe cluster. The requests are asynchronous and return futures that need to be awaited.

Implementations§

Source§

impl Client

Source

pub fn builder() -> ClientBuilder<Initial>

Creates a new ClientBuilder instance for configuring and building a Client.

The ClientBuilder allows you to set various configurations such as the endpoint, TLS settings, timeouts, and OAuth configuration before building the Client.

§Examples
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = zeebe_rs::Client::builder()
        .with_address("http://localhost", 26500)
        .build()
        .await?;

    let topology = client.topology().send().await;

    Ok(())
}
Examples found in repository?
examples/hello_client.rs (line 3)
2async fn main() -> Result<(), Box<dyn std::error::Error>> {
3    let client = zeebe_rs::Client::builder()
4        .with_address("http://localhost", 26500)
5        .build()
6        .await?;
7
8    let topology = client.topology().send().await;
9    println!("{:?}", topology);
10
11    Ok(())
12}
More examples
Hide additional examples
examples/oauth.rs (line 12)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12    let client = zeebe_rs::Client::builder()
13        .with_address("http://localhost", 26500)
14        .with_oauth(
15            String::from("zeebe"),
16            String::from("zecret"),
17            String::from(
18                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19            ),
20            String::from("zeebe-api"),
21            Duration::from_secs(30),
22            None,
23        )
24        .build()
25        .await?;
26
27    let _ = client.auth_initialized().await;
28    let topology = client.topology().send().await;
29    println!("{:?}", topology);
30
31    Ok(())
32}
examples/custom_auto_handler.rs (line 48)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46    // Default configuration using Camunda's docker compose
47    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48    let client = Client::builder()
49        .with_address("http://localhost", 26500)
50        .with_oauth(
51            String::from("zeebe"),
52            String::from("zecret"),
53            String::from(
54                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55            ),
56            String::from("zeebe-api"),
57            Duration::from_secs(30),
58            None,
59        )
60        .build()
61        .await?;
62
63    // Wait until first OAuth token has been retrieved
64    client.auth_initialized().await;
65
66    let worker = client
67        .worker()
68        .with_request_timeout(Duration::from_secs(1))
69        .with_job_timeout(Duration::from_secs(1))
70        .with_max_jobs_to_activate(1)
71        .with_concurrency_limit(1)
72        .with_job_type(String::from("placeholder"))
73        .with_handler(always_fail)
74        .build();
75
76    tokio::join!(worker.run());
77
78    Ok(())
79}
examples/deploy_resource.rs (line 20)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20    let client = zeebe_rs::Client::builder()
21        .with_address("http://localhost", 26500)
22        .with_oauth(
23            String::from("zeebe"),
24            String::from("zecret"),
25            String::from(
26                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27            ),
28            String::from("zeebe-api"),
29            Duration::from_secs(30),
30            None,
31        )
32        .build()
33        .await?;
34
35    let _ = client.auth_initialized().await;
36    let result = client
37        .deploy_resource()
38        .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39        .read_resource_files()?
40        .send()
41        .await?;
42
43    println!("{:?}", result);
44
45    let result = client
46        .publish_message()
47        .with_name(String::from("hello_world"))
48        .without_correlation_key()
49        .with_variables(HelloWorld {
50            hello: String::from("foo"),
51        })?
52        .send()
53        .await?;
54
55    println!("{:?}", result);
56
57    sleep(Duration::from_secs(1)).await;
58
59    let result = client
60        .publish_message()
61        .with_name(String::from("hello_message"))
62        .with_correlation_key(String::from("foo"))
63        .send()
64        .await?;
65
66    println!("{:?}", result);
67
68    Ok(())
69}
examples/pizza.rs (line 78)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
Source

pub async fn auth_initialized(&self)

Waits for the first OAuth token to be fetched before returning. Returns instantly if OAuth is not enabled.

§Examples
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = zeebe_rs::Client::builder()
        // Configure client with OAuth...
       .build()
       .await?;

    // Await first OAuth token before proceeding
    let _ = client.auth_initialized().await;
    
    // Fetch topology after acquiring OAuth token
    let topology = client.topology().send().await;

   Ok(())
}
Examples found in repository?
examples/oauth.rs (line 27)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12    let client = zeebe_rs::Client::builder()
13        .with_address("http://localhost", 26500)
14        .with_oauth(
15            String::from("zeebe"),
16            String::from("zecret"),
17            String::from(
18                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19            ),
20            String::from("zeebe-api"),
21            Duration::from_secs(30),
22            None,
23        )
24        .build()
25        .await?;
26
27    let _ = client.auth_initialized().await;
28    let topology = client.topology().send().await;
29    println!("{:?}", topology);
30
31    Ok(())
32}
More examples
Hide additional examples
examples/custom_auto_handler.rs (line 64)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46    // Default configuration using Camunda's docker compose
47    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48    let client = Client::builder()
49        .with_address("http://localhost", 26500)
50        .with_oauth(
51            String::from("zeebe"),
52            String::from("zecret"),
53            String::from(
54                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55            ),
56            String::from("zeebe-api"),
57            Duration::from_secs(30),
58            None,
59        )
60        .build()
61        .await?;
62
63    // Wait until first OAuth token has been retrieved
64    client.auth_initialized().await;
65
66    let worker = client
67        .worker()
68        .with_request_timeout(Duration::from_secs(1))
69        .with_job_timeout(Duration::from_secs(1))
70        .with_max_jobs_to_activate(1)
71        .with_concurrency_limit(1)
72        .with_job_type(String::from("placeholder"))
73        .with_handler(always_fail)
74        .build();
75
76    tokio::join!(worker.run());
77
78    Ok(())
79}
examples/deploy_resource.rs (line 35)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20    let client = zeebe_rs::Client::builder()
21        .with_address("http://localhost", 26500)
22        .with_oauth(
23            String::from("zeebe"),
24            String::from("zecret"),
25            String::from(
26                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27            ),
28            String::from("zeebe-api"),
29            Duration::from_secs(30),
30            None,
31        )
32        .build()
33        .await?;
34
35    let _ = client.auth_initialized().await;
36    let result = client
37        .deploy_resource()
38        .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39        .read_resource_files()?
40        .send()
41        .await?;
42
43    println!("{:?}", result);
44
45    let result = client
46        .publish_message()
47        .with_name(String::from("hello_world"))
48        .without_correlation_key()
49        .with_variables(HelloWorld {
50            hello: String::from("foo"),
51        })?
52        .send()
53        .await?;
54
55    println!("{:?}", result);
56
57    sleep(Duration::from_secs(1)).await;
58
59    let result = client
60        .publish_message()
61        .with_name(String::from("hello_message"))
62        .with_correlation_key(String::from("foo"))
63        .send()
64        .await?;
65
66    println!("{:?}", result);
67
68    Ok(())
69}
examples/pizza.rs (line 94)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
Source

pub fn topology(&self) -> TopologyRequest

Creates a TopologyRequest to build a request for fetching the topology of the Zeebe cluster.

§Examples
let topology = client.topology().send().await;
Examples found in repository?
examples/hello_client.rs (line 8)
2async fn main() -> Result<(), Box<dyn std::error::Error>> {
3    let client = zeebe_rs::Client::builder()
4        .with_address("http://localhost", 26500)
5        .build()
6        .await?;
7
8    let topology = client.topology().send().await;
9    println!("{:?}", topology);
10
11    Ok(())
12}
More examples
Hide additional examples
examples/oauth.rs (line 28)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12    let client = zeebe_rs::Client::builder()
13        .with_address("http://localhost", 26500)
14        .with_oauth(
15            String::from("zeebe"),
16            String::from("zecret"),
17            String::from(
18                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19            ),
20            String::from("zeebe-api"),
21            Duration::from_secs(30),
22            None,
23        )
24        .build()
25        .await?;
26
27    let _ = client.auth_initialized().await;
28    let topology = client.topology().send().await;
29    println!("{:?}", topology);
30
31    Ok(())
32}
Source

pub fn deploy_resource(&self) -> DeployResourceRequest<Initial>

Creates a DeployResourceRequest to build a request for deploying a resource to Zeebe.

§Examples
 let result = client
    .deploy_resource()
    .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
    .read_resource_files()?
    .send()
    .await?;
Examples found in repository?
examples/deploy_resource.rs (line 37)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20    let client = zeebe_rs::Client::builder()
21        .with_address("http://localhost", 26500)
22        .with_oauth(
23            String::from("zeebe"),
24            String::from("zecret"),
25            String::from(
26                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27            ),
28            String::from("zeebe-api"),
29            Duration::from_secs(30),
30            None,
31        )
32        .build()
33        .await?;
34
35    let _ = client.auth_initialized().await;
36    let result = client
37        .deploy_resource()
38        .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39        .read_resource_files()?
40        .send()
41        .await?;
42
43    println!("{:?}", result);
44
45    let result = client
46        .publish_message()
47        .with_name(String::from("hello_world"))
48        .without_correlation_key()
49        .with_variables(HelloWorld {
50            hello: String::from("foo"),
51        })?
52        .send()
53        .await?;
54
55    println!("{:?}", result);
56
57    sleep(Duration::from_secs(1)).await;
58
59    let result = client
60        .publish_message()
61        .with_name(String::from("hello_message"))
62        .with_correlation_key(String::from("foo"))
63        .send()
64        .await?;
65
66    println!("{:?}", result);
67
68    Ok(())
69}
More examples
Hide additional examples
examples/pizza.rs (line 99)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
Source

pub fn delete_resource(&self) -> DeleteResourceRequest<Initial>

Creates a DeleteResourceRequest to build a request for deleting a deployed resource in Zeebe.

§Examples
let response = client
    .delete_resource()
    .with_resource_key(12345)
    .send()
    .await?;
Source

pub fn create_process_instance(&self) -> CreateProcessInstanceRequest<Initial>

Creates a CreateProcessInstanceRequest to build a request for creating a process instance in Zeebe.

§Examples
// Create a process instance with a BPMN process ID and no input variables
client
    .create_process_instance()
    .with_bpmn_process_id(String::from("order-process"))
    .without_input()
    .send()
    .await?;

// Create a process instance with a process definition key and input variables
client
    .create_process_instance()
    .with_process_definition_key(12345)
    .with_variables(json!({"orderId": 123}))
    .unwrap()
    .send()
    .await?;
Examples found in repository?
examples/pizza.rs (line 51)
31async fn place_order(
32    client: Client,
33    process_definition_key: i64,
34    name: &str,
35    address: &str,
36    bad_tipper: bool,
37    items: Vec<&str>,
38) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
39    let customer = Customer {
40        name: name.to_owned(),
41        address: address.to_owned(),
42        bad_tipper,
43        customer_id: format!("{}_{}", name, address),
44    };
45
46    let order = Order {
47        items: items.into_iter().map(|x| x.to_owned()).collect(),
48    };
49
50    let res = client
51        .create_process_instance()
52        .with_process_definition_key(process_definition_key)
53        .with_variables(customer.clone())?
54        .send()
55        .await?;
56
57    println!("{:?}", res);
58
59    let res = client
60        .publish_message()
61        .with_name(String::from("order_pizza_msg"))
62        .with_correlation_key(customer.customer_id.clone())
63        .with_variables(order)?
64        .send()
65        .await?;
66
67    println!("{:?}", res);
68
69    Ok(())
70}
Source

pub fn cancel_process_instance(&self) -> CancelProcessInstanceRequest<Initial>

Creates a CancelProcessInstanceRequest to cancel an active process instance in Zeebe.

§Examples
client
    .cancel_process_instance()
    .with_process_instance_key(123456)
    .send()
    .await?;
Source

pub fn migrate_process_instance(&self) -> MigrateProcessInstanceRequest<Initial>

Creates a MigrateProcessInstanceRequest to migrate a running process instance in Zeebe.

§Examples
client
    .migrate_process_instance()
    .with_process_instance_key(12356)
    .without_migration_plan()
    .send()
    .await?;
Source

pub fn modify_process_instance(&self) -> ModifyProcessInstanceRequest<Initial>

Creates a ModifyProcessInstanceRequest to modify a running process instance in Zeebe.

§Examples
client
    .modify_process_instance()
    .with_process_instance_key(12345)
        .with_activate_instruction("element_id".to_string(), 67890)
        .with_variable_instruction("scope_id".to_string(), serde_json::json!({"key": "value"}))?
        .build()
    .with_terminate_instruction(54321)
    .with_operation_reference(98765)
    .send()
    .await?;
Source

pub fn set_variables(&self) -> SetVariablesRequest<Initial>

Creates a SetVariablesRequest to update variables for a particular scope.

§Examples

#[derive(Serialize)]
struct Foo {
    bar: String
}

client
    .set_variables()
    .with_element_instance_key(123456)
    .with_variable(Foo {bar: String::from("foobar")})
    .send()
    .await?;
Source

pub fn publish_message(&self) -> PublishMessageRequest<Initial>

Creates a PublishMessageRequest to publish a message to a running process instance in Zeebe.

§Examples
// Publish a message without a correlation key
client
    .publish_message()
    .with_name(String::from("hello_world"))
    .without_correlation_key()
    .with_variables(HelloWorld {
        hello: String::from("foo"),
    })?
    .send()
    .await?;

// Publish a message with a correlation key
client
    .publish_message()
    .with_name(String::from("hello_message"))
    .with_correlation_key(String::from("foo"))
    .send()
    .await?;
Examples found in repository?
examples/pizza.rs (line 60)
31async fn place_order(
32    client: Client,
33    process_definition_key: i64,
34    name: &str,
35    address: &str,
36    bad_tipper: bool,
37    items: Vec<&str>,
38) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
39    let customer = Customer {
40        name: name.to_owned(),
41        address: address.to_owned(),
42        bad_tipper,
43        customer_id: format!("{}_{}", name, address),
44    };
45
46    let order = Order {
47        items: items.into_iter().map(|x| x.to_owned()).collect(),
48    };
49
50    let res = client
51        .create_process_instance()
52        .with_process_definition_key(process_definition_key)
53        .with_variables(customer.clone())?
54        .send()
55        .await?;
56
57    println!("{:?}", res);
58
59    let res = client
60        .publish_message()
61        .with_name(String::from("order_pizza_msg"))
62        .with_correlation_key(customer.customer_id.clone())
63        .with_variables(order)?
64        .send()
65        .await?;
66
67    println!("{:?}", res);
68
69    Ok(())
70}
More examples
Hide additional examples
examples/deploy_resource.rs (line 46)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20    let client = zeebe_rs::Client::builder()
21        .with_address("http://localhost", 26500)
22        .with_oauth(
23            String::from("zeebe"),
24            String::from("zecret"),
25            String::from(
26                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27            ),
28            String::from("zeebe-api"),
29            Duration::from_secs(30),
30            None,
31        )
32        .build()
33        .await?;
34
35    let _ = client.auth_initialized().await;
36    let result = client
37        .deploy_resource()
38        .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39        .read_resource_files()?
40        .send()
41        .await?;
42
43    println!("{:?}", result);
44
45    let result = client
46        .publish_message()
47        .with_name(String::from("hello_world"))
48        .without_correlation_key()
49        .with_variables(HelloWorld {
50            hello: String::from("foo"),
51        })?
52        .send()
53        .await?;
54
55    println!("{:?}", result);
56
57    sleep(Duration::from_secs(1)).await;
58
59    let result = client
60        .publish_message()
61        .with_name(String::from("hello_message"))
62        .with_correlation_key(String::from("foo"))
63        .send()
64        .await?;
65
66    println!("{:?}", result);
67
68    Ok(())
69}
Source

pub fn broadcast_signal(&self) -> BroadcastSignalRequest<Initial>

Creates a BroadcastSignalRequest to publish a signal to Zeebe.

§Examples
client
    .broadcast_signal()
    .with_signal_name(String::from("Hello_Signal"))
    .send()
    .await?;
Source

pub fn resolve_incident(&self) -> ResolveIncidentRequest<Initial>

Creates a ResolveIncidentRequest to resolve an active incident in Zeebe.

§Examples
client
    .resolve_incident()
    .with_incident_key(123456)
    .send()
    .await?;
Source

pub fn throw_error(&self) -> ThrowErrorRequest<Initial>

Creates a ThrowErrorRequest to throw a business error.

§Examples
client
    .throw_error()
    .with_job_key(123456)
    .with_error_code(String::from("error_code"))
    .send()
    .await?;
Source

pub fn evaluate_decision(&self) -> EvaluateDecisionRequest<Initial>

Creates an EvaluateDecisionRequest to request the evaluation of a DMN decision.

§Examples
client
    .evaluate_decision()
    .with_decision_key(123456)
    .with_decision_id(String::from("decision_id"))
    .send()
    .await?;
Source

pub fn complete_job(&self) -> CompleteJobRequest<Initial>

Creates a CompleteJobRequest to complete a job for a process instance in Zeebe.

§Examples
client
    .complete_job()
    .with_job_key(123456)
    .send()
    .await?;
Examples found in repository?
examples/pizza.rs (line 195)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
250
251async fn confirm_order(
252    _client: Client,
253    job: ActivatedJob,
254    state: Arc<SharedState<Mutex<Stock>>>,
255) -> Result<OrderResult, WorkerError<()>> {
256    let order = job
257        .data::<Order>()
258        .expect("For demonstration purposes we're not handling a missing order here");
259    let mut stock: tokio::sync::MutexGuard<Stock> = state.lock().await;
260
261    let mut order_accepted = true;
262    let mut message = None;
263    for item in order.items {
264        if let Some(quantity) = stock.items.get_mut(&item) {
265            if *quantity > 0 {
266                *quantity -= 1;
267            } else {
268                message = Some(format!("We're out of stock of {}", item));
269                order_accepted = false;
270            }
271        } else {
272            message = Some(format!("We don't serve {}", item));
273            order_accepted = false;
274        }
275    }
276
277    println!("Confirmed order");
278    Ok(OrderResult {
279        order_accepted,
280        message,
281    })
282}
283
284async fn bake_pizzas(client: Client, job: ActivatedJob) {
285    let order = job
286        .data::<Order>()
287        .expect("We shouldn't start baking pizzas if there is no order!");
288
289    println!("Baking pizzas");
290    sleep(Duration::from_secs(10 * order.items.len() as u64)).await;
291    println!("Finished baking {} pizzas", order.items.len());
292
293    let _ = client.complete_job().with_job_key(job.key()).send().await;
294}
Source

pub fn fail_job(&self) -> FailJobRequest<Initial>

Creates a FailJobRequest to fail a job for a process instance in Zeebe.

§Examples
client
    .fail_job()
    .with_job_key(123456)
    .send()
    .await?;
Examples found in repository?
examples/custom_auto_handler.rs (line 32)
28    async fn handle_result(self, client: Client, job: ActivatedJob) {
29        match *self {
30            Ok(_) => unreachable!("This will always fail!"),
31            Err(_) => {
32                let _ = client.fail_job().with_job_key(job.key()).send().await;
33            }
34        }
35    }
Source

pub fn update_job_timeout(&self) -> UpdateJobTimeoutRequest<Initial>

Creates an UpdateJobTimeoutRequest to update the timeout for a running job in Zeebe.

§Examples
client
    .update_job_timeout()
    .with_job_key(123456)
    .with_timeout(Duration::from_secs(10))
    .send()
    .await?;
Source

pub fn update_job_retries(&self) -> UpdateJobRetriesRequest<Initial>

Creates an UpdateJobRetriesRequest that updates the number of retries for a job in Zeebe.

§Examples
client
    .update_job_retries()
    .with_job_key(123456)
    .with_retries(1)
    .send()
    .await?;
Source

pub fn worker(&self) -> WorkerBuilder<Initial>

Creates a WorkerBuilder to build a worker for processing Zeebe jobs.

§Examples
client
    .worker()
    .with_job_type(String::from("example-service"))
    .with_job_timeout(Duration::from_secs(5 * 60))
    .with_request_timeout(Duration::from_secs(10))
    .with_max_jobs_to_activate(4)
    .with_concurrency_limit(2)
    .with_handler(|client, job| async move {
        let _ = client.complete_job().with_job_key(job.key()).send().await;
    })
    .build()
    .run()
    .await;
Examples found in repository?
examples/custom_auto_handler.rs (line 67)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46    // Default configuration using Camunda's docker compose
47    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48    let client = Client::builder()
49        .with_address("http://localhost", 26500)
50        .with_oauth(
51            String::from("zeebe"),
52            String::from("zecret"),
53            String::from(
54                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55            ),
56            String::from("zeebe-api"),
57            Duration::from_secs(30),
58            None,
59        )
60        .build()
61        .await?;
62
63    // Wait until first OAuth token has been retrieved
64    client.auth_initialized().await;
65
66    let worker = client
67        .worker()
68        .with_request_timeout(Duration::from_secs(1))
69        .with_job_timeout(Duration::from_secs(1))
70        .with_max_jobs_to_activate(1)
71        .with_concurrency_limit(1)
72        .with_job_type(String::from("placeholder"))
73        .with_handler(always_fail)
74        .build();
75
76    tokio::join!(worker.run());
77
78    Ok(())
79}
More examples
Hide additional examples
examples/pizza.rs (line 137)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}

Trait Implementations§

Source§

impl Clone for Client

Source§

fn clone(&self) -> Client

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Client

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl !Freeze for Client

§

impl !RefUnwindSafe for Client

§

impl Send for Client

§

impl Sync for Client

§

impl Unpin for Client

§

impl !UnwindSafe for Client

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FromRef<T> for T
where T: Clone,

Source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,