ClientBuilder

Struct ClientBuilder 

Source
pub struct ClientBuilder<S: ClientBuilderState> { /* private fields */ }
Expand description

A builder for configuring and creating a Client.

The ClientBuilder allows you to configure various aspects of the client, such as the endpoint, TLS settings, timeouts, and OAuth configuration.

Implementations§

Source§

impl ClientBuilder<Initial>

Source

pub fn with_address( self, zeebe_address: &str, port: u16, ) -> ClientBuilder<WithAddress>

Sets the endpoint for the Zeebe client.

§Arguments
  • zeebe_address - A string slice that holds the address of the Zeebe broker.
  • port - A 16-bit unsigned integer that holds the port number of the Zeebe broker.
§Returns

A ClientBuilder<WithAddress> instance with the Zeebe endpoint set.

Examples found in repository?
examples/hello_client.rs (line 4)
2async fn main() -> Result<(), Box<dyn std::error::Error>> {
3    let client = zeebe_rs::Client::builder()
4        .with_address("http://localhost", 26500)
5        .build()
6        .await?;
7
8    let topology = client.topology().send().await;
9    println!("{:?}", topology);
10
11    Ok(())
12}
More examples
Hide additional examples
examples/oauth.rs (line 13)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12    let client = zeebe_rs::Client::builder()
13        .with_address("http://localhost", 26500)
14        .with_oauth(
15            String::from("zeebe"),
16            String::from("zecret"),
17            String::from(
18                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19            ),
20            String::from("zeebe-api"),
21            Duration::from_secs(30),
22            None,
23        )
24        .build()
25        .await?;
26
27    let _ = client.auth_initialized().await;
28    let topology = client.topology().send().await;
29    println!("{:?}", topology);
30
31    Ok(())
32}
examples/custom_auto_handler.rs (line 49)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46    // Default configuration using Camunda's docker compose
47    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48    let client = Client::builder()
49        .with_address("http://localhost", 26500)
50        .with_oauth(
51            String::from("zeebe"),
52            String::from("zecret"),
53            String::from(
54                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55            ),
56            String::from("zeebe-api"),
57            Duration::from_secs(30),
58            None,
59        )
60        .build()
61        .await?;
62
63    // Wait until first OAuth token has been retrieved
64    client.auth_initialized().await;
65
66    let worker = client
67        .worker()
68        .with_request_timeout(Duration::from_secs(1))
69        .with_job_timeout(Duration::from_secs(1))
70        .with_max_jobs_to_activate(1)
71        .with_concurrency_limit(1)
72        .with_job_type(String::from("placeholder"))
73        .with_handler(always_fail)
74        .build();
75
76    tokio::join!(worker.run());
77
78    Ok(())
79}
examples/deploy_resource.rs (line 21)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20    let client = zeebe_rs::Client::builder()
21        .with_address("http://localhost", 26500)
22        .with_oauth(
23            String::from("zeebe"),
24            String::from("zecret"),
25            String::from(
26                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27            ),
28            String::from("zeebe-api"),
29            Duration::from_secs(30),
30            None,
31        )
32        .build()
33        .await?;
34
35    let _ = client.auth_initialized().await;
36    let result = client
37        .deploy_resource()
38        .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39        .read_resource_files()?
40        .send()
41        .await?;
42
43    println!("{:?}", result);
44
45    let result = client
46        .publish_message()
47        .with_name(String::from("hello_world"))
48        .without_correlation_key()
49        .with_variables(HelloWorld {
50            hello: String::from("foo"),
51        })?
52        .send()
53        .await?;
54
55    println!("{:?}", result);
56
57    sleep(Duration::from_secs(1)).await;
58
59    let result = client
60        .publish_message()
61        .with_name(String::from("hello_message"))
62        .with_correlation_key(String::from("foo"))
63        .send()
64        .await?;
65
66    println!("{:?}", result);
67
68    Ok(())
69}
examples/pizza.rs (line 79)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
Source§

impl ClientBuilder<WithAddress>

Source

pub fn with_oauth( self, client_id: String, client_secret: String, auth_url: String, audience: String, auth_timeout: Duration, auth_type: Option<AuthType>, ) -> Self

Configures OAuth authentication for the client.

§Arguments
  • client_id - The client ID for OAuth authentication.
  • client_secret - The client secret for OAuth authentication.
  • auth_url - The URL for the OAuth authentication server.
  • audience - The audience for the OAuth token.
  • auth_timeout - The timeout duration for the OAuth authentication process.
  • auth_type - Whether the OAuth credentials are sent in the request body or via basic auth. Defaults to request body.
§Returns

A ClientBuilder<WithAddress> instance with OAuth configuration set.

Examples found in repository?
examples/oauth.rs (lines 14-23)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12    let client = zeebe_rs::Client::builder()
13        .with_address("http://localhost", 26500)
14        .with_oauth(
15            String::from("zeebe"),
16            String::from("zecret"),
17            String::from(
18                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19            ),
20            String::from("zeebe-api"),
21            Duration::from_secs(30),
22            None,
23        )
24        .build()
25        .await?;
26
27    let _ = client.auth_initialized().await;
28    let topology = client.topology().send().await;
29    println!("{:?}", topology);
30
31    Ok(())
32}
More examples
Hide additional examples
examples/custom_auto_handler.rs (lines 50-59)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46    // Default configuration using Camunda's docker compose
47    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48    let client = Client::builder()
49        .with_address("http://localhost", 26500)
50        .with_oauth(
51            String::from("zeebe"),
52            String::from("zecret"),
53            String::from(
54                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55            ),
56            String::from("zeebe-api"),
57            Duration::from_secs(30),
58            None,
59        )
60        .build()
61        .await?;
62
63    // Wait until first OAuth token has been retrieved
64    client.auth_initialized().await;
65
66    let worker = client
67        .worker()
68        .with_request_timeout(Duration::from_secs(1))
69        .with_job_timeout(Duration::from_secs(1))
70        .with_max_jobs_to_activate(1)
71        .with_concurrency_limit(1)
72        .with_job_type(String::from("placeholder"))
73        .with_handler(always_fail)
74        .build();
75
76    tokio::join!(worker.run());
77
78    Ok(())
79}
examples/deploy_resource.rs (lines 22-31)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20    let client = zeebe_rs::Client::builder()
21        .with_address("http://localhost", 26500)
22        .with_oauth(
23            String::from("zeebe"),
24            String::from("zecret"),
25            String::from(
26                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27            ),
28            String::from("zeebe-api"),
29            Duration::from_secs(30),
30            None,
31        )
32        .build()
33        .await?;
34
35    let _ = client.auth_initialized().await;
36    let result = client
37        .deploy_resource()
38        .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39        .read_resource_files()?
40        .send()
41        .await?;
42
43    println!("{:?}", result);
44
45    let result = client
46        .publish_message()
47        .with_name(String::from("hello_world"))
48        .without_correlation_key()
49        .with_variables(HelloWorld {
50            hello: String::from("foo"),
51        })?
52        .send()
53        .await?;
54
55    println!("{:?}", result);
56
57    sleep(Duration::from_secs(1)).await;
58
59    let result = client
60        .publish_message()
61        .with_name(String::from("hello_message"))
62        .with_correlation_key(String::from("foo"))
63        .send()
64        .await?;
65
66    println!("{:?}", result);
67
68    Ok(())
69}
examples/pizza.rs (lines 80-89)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
Source

pub fn with_cloud_tls(self) -> Self

Configures TLS for the client using the root certificates required for Camunda Cloud.

§Returns

A ClientBuilder<WithAddress> instance with TLS configuration set.

Source

pub fn with_tls(self, pem: &Path) -> Result<Self, ClientBuilderError>

Configures TLS for the client using a PEM file.

§Arguments
  • pem - The path to the PEM file containing the TLS certificate.
§Returns

A Result containing either a ClientBuilder<WithAddress> instance with TLS configuration set, or a ClientBuilderError if reading the PEM file fails.

Source

pub async fn build(self) -> Result<Client, ClientBuilderError>

Builds the client with the configured settings.

§Returns

A Result containing either a Client instance or a ClientBuilderError if the client could not be built.

Examples found in repository?
examples/hello_client.rs (line 5)
2async fn main() -> Result<(), Box<dyn std::error::Error>> {
3    let client = zeebe_rs::Client::builder()
4        .with_address("http://localhost", 26500)
5        .build()
6        .await?;
7
8    let topology = client.topology().send().await;
9    println!("{:?}", topology);
10
11    Ok(())
12}
More examples
Hide additional examples
examples/oauth.rs (line 24)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12    let client = zeebe_rs::Client::builder()
13        .with_address("http://localhost", 26500)
14        .with_oauth(
15            String::from("zeebe"),
16            String::from("zecret"),
17            String::from(
18                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19            ),
20            String::from("zeebe-api"),
21            Duration::from_secs(30),
22            None,
23        )
24        .build()
25        .await?;
26
27    let _ = client.auth_initialized().await;
28    let topology = client.topology().send().await;
29    println!("{:?}", topology);
30
31    Ok(())
32}
examples/custom_auto_handler.rs (line 60)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46    // Default configuration using Camunda's docker compose
47    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48    let client = Client::builder()
49        .with_address("http://localhost", 26500)
50        .with_oauth(
51            String::from("zeebe"),
52            String::from("zecret"),
53            String::from(
54                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55            ),
56            String::from("zeebe-api"),
57            Duration::from_secs(30),
58            None,
59        )
60        .build()
61        .await?;
62
63    // Wait until first OAuth token has been retrieved
64    client.auth_initialized().await;
65
66    let worker = client
67        .worker()
68        .with_request_timeout(Duration::from_secs(1))
69        .with_job_timeout(Duration::from_secs(1))
70        .with_max_jobs_to_activate(1)
71        .with_concurrency_limit(1)
72        .with_job_type(String::from("placeholder"))
73        .with_handler(always_fail)
74        .build();
75
76    tokio::join!(worker.run());
77
78    Ok(())
79}
examples/deploy_resource.rs (line 32)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20    let client = zeebe_rs::Client::builder()
21        .with_address("http://localhost", 26500)
22        .with_oauth(
23            String::from("zeebe"),
24            String::from("zecret"),
25            String::from(
26                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27            ),
28            String::from("zeebe-api"),
29            Duration::from_secs(30),
30            None,
31        )
32        .build()
33        .await?;
34
35    let _ = client.auth_initialized().await;
36    let result = client
37        .deploy_resource()
38        .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39        .read_resource_files()?
40        .send()
41        .await?;
42
43    println!("{:?}", result);
44
45    let result = client
46        .publish_message()
47        .with_name(String::from("hello_world"))
48        .without_correlation_key()
49        .with_variables(HelloWorld {
50            hello: String::from("foo"),
51        })?
52        .send()
53        .await?;
54
55    println!("{:?}", result);
56
57    sleep(Duration::from_secs(1)).await;
58
59    let result = client
60        .publish_message()
61        .with_name(String::from("hello_message"))
62        .with_correlation_key(String::from("foo"))
63        .send()
64        .await?;
65
66    println!("{:?}", result);
67
68    Ok(())
69}
examples/pizza.rs (line 90)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}
Source

pub fn with_timeout(self, timeout: Duration) -> Self

Sets the timeout duration for the client.

§Arguments
  • timeout - The timeout duration.
§Returns

A ClientBuilder<WithAddress> instance with the timeout configuration set.

Source

pub fn with_keep_alive(self, keep_alive: Duration) -> Self

Sets the keep-alive duration for the client.

§Arguments
  • keep_alive - The keep-alive duration.
§Returns

A ClientBuilder<WithAddress> instance with the keep-alive configuration set.

Trait Implementations§

Source§

impl<S: Clone + ClientBuilderState> Clone for ClientBuilder<S>

Source§

fn clone(&self) -> ClientBuilder<S>

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<S: Debug + ClientBuilderState> Debug for ClientBuilder<S>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<S: ClientBuilderState + Default> Default for ClientBuilder<S>

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl<S> Freeze for ClientBuilder<S>

§

impl<S> RefUnwindSafe for ClientBuilder<S>
where S: RefUnwindSafe,

§

impl<S> Send for ClientBuilder<S>
where S: Send,

§

impl<S> Sync for ClientBuilder<S>
where S: Sync,

§

impl<S> Unpin for ClientBuilder<S>
where S: Unpin,

§

impl<S> UnwindSafe for ClientBuilder<S>
where S: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FromRef<T> for T
where T: Clone,

Source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wraps the input message T in a tonic::Request.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,