pub struct ClientBuilder<S: ClientBuilderState> { /* private fields */ }Expand description
A builder for configuring and creating a Client.
The ClientBuilder allows you to configure various aspects of the client,
such as the endpoint, TLS settings, timeouts, and OAuth configuration.
Implementations§
Source§impl ClientBuilder<Initial>
impl ClientBuilder<Initial>
Sourcepub fn with_address(
self,
zeebe_address: &str,
port: u16,
) -> ClientBuilder<WithAddress>
pub fn with_address( self, zeebe_address: &str, port: u16, ) -> ClientBuilder<WithAddress>
Sets the endpoint for the Zeebe client.
§Arguments
zeebe_address- A string slice that holds the address of the Zeebe broker.port- A 16-bit unsigned integer that holds the port number of the Zeebe broker.
§Returns
A ClientBuilder<WithAddress> instance with the Zeebe endpoint set.
Examples found in repository?
More examples
examples/oauth.rs (line 13)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12 let client = zeebe_rs::Client::builder()
13 .with_address("http://localhost", 26500)
14 .with_oauth(
15 String::from("zeebe"),
16 String::from("zecret"),
17 String::from(
18 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19 ),
20 String::from("zeebe-api"),
21 Duration::from_secs(30),
22 None,
23 )
24 .build()
25 .await?;
26
27 let _ = client.auth_initialized().await;
28 let topology = client.topology().send().await;
29 println!("{:?}", topology);
30
31 Ok(())
32}examples/custom_auto_handler.rs (line 49)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46 // Default configuration using Camunda's docker compose
47 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48 let client = Client::builder()
49 .with_address("http://localhost", 26500)
50 .with_oauth(
51 String::from("zeebe"),
52 String::from("zecret"),
53 String::from(
54 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55 ),
56 String::from("zeebe-api"),
57 Duration::from_secs(30),
58 None,
59 )
60 .build()
61 .await?;
62
63 // Wait until first OAuth token has been retrieved
64 client.auth_initialized().await;
65
66 let worker = client
67 .worker()
68 .with_request_timeout(Duration::from_secs(1))
69 .with_job_timeout(Duration::from_secs(1))
70 .with_max_jobs_to_activate(1)
71 .with_concurrency_limit(1)
72 .with_job_type(String::from("placeholder"))
73 .with_handler(always_fail)
74 .build();
75
76 tokio::join!(worker.run());
77
78 Ok(())
79}examples/deploy_resource.rs (line 21)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20 let client = zeebe_rs::Client::builder()
21 .with_address("http://localhost", 26500)
22 .with_oauth(
23 String::from("zeebe"),
24 String::from("zecret"),
25 String::from(
26 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27 ),
28 String::from("zeebe-api"),
29 Duration::from_secs(30),
30 None,
31 )
32 .build()
33 .await?;
34
35 let _ = client.auth_initialized().await;
36 let result = client
37 .deploy_resource()
38 .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39 .read_resource_files()?
40 .send()
41 .await?;
42
43 println!("{:?}", result);
44
45 let result = client
46 .publish_message()
47 .with_name(String::from("hello_world"))
48 .without_correlation_key()
49 .with_variables(HelloWorld {
50 hello: String::from("foo"),
51 })?
52 .send()
53 .await?;
54
55 println!("{:?}", result);
56
57 sleep(Duration::from_secs(1)).await;
58
59 let result = client
60 .publish_message()
61 .with_name(String::from("hello_message"))
62 .with_correlation_key(String::from("foo"))
63 .send()
64 .await?;
65
66 println!("{:?}", result);
67
68 Ok(())
69}examples/pizza.rs (line 79)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}Source§impl ClientBuilder<WithAddress>
impl ClientBuilder<WithAddress>
Sourcepub fn with_oauth(
self,
client_id: String,
client_secret: String,
auth_url: String,
audience: String,
auth_timeout: Duration,
auth_type: Option<AuthType>,
) -> Self
pub fn with_oauth( self, client_id: String, client_secret: String, auth_url: String, audience: String, auth_timeout: Duration, auth_type: Option<AuthType>, ) -> Self
Configures OAuth authentication for the client.
§Arguments
client_id- The client ID for OAuth authentication.client_secret- The client secret for OAuth authentication.auth_url- The URL for the OAuth authentication server.audience- The audience for the OAuth token.auth_timeout- The timeout duration for the OAuth authentication process.auth_type- If OAuth credentials should use request body or basic auth, defaults to request body
§Returns
A ClientBuilder<WithAddress> instance with OAuth configuration set.
Examples found in repository?
examples/oauth.rs (lines 14-23)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12 let client = zeebe_rs::Client::builder()
13 .with_address("http://localhost", 26500)
14 .with_oauth(
15 String::from("zeebe"),
16 String::from("zecret"),
17 String::from(
18 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19 ),
20 String::from("zeebe-api"),
21 Duration::from_secs(30),
22 None,
23 )
24 .build()
25 .await?;
26
27 let _ = client.auth_initialized().await;
28 let topology = client.topology().send().await;
29 println!("{:?}", topology);
30
31 Ok(())
32}More examples
examples/custom_auto_handler.rs (lines 50-59)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46 // Default configuration using Camunda's docker compose
47 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48 let client = Client::builder()
49 .with_address("http://localhost", 26500)
50 .with_oauth(
51 String::from("zeebe"),
52 String::from("zecret"),
53 String::from(
54 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55 ),
56 String::from("zeebe-api"),
57 Duration::from_secs(30),
58 None,
59 )
60 .build()
61 .await?;
62
63 // Wait until first OAuth token has been retrieved
64 client.auth_initialized().await;
65
66 let worker = client
67 .worker()
68 .with_request_timeout(Duration::from_secs(1))
69 .with_job_timeout(Duration::from_secs(1))
70 .with_max_jobs_to_activate(1)
71 .with_concurrency_limit(1)
72 .with_job_type(String::from("placeholder"))
73 .with_handler(always_fail)
74 .build();
75
76 tokio::join!(worker.run());
77
78 Ok(())
79}examples/deploy_resource.rs (lines 22-31)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20 let client = zeebe_rs::Client::builder()
21 .with_address("http://localhost", 26500)
22 .with_oauth(
23 String::from("zeebe"),
24 String::from("zecret"),
25 String::from(
26 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27 ),
28 String::from("zeebe-api"),
29 Duration::from_secs(30),
30 None,
31 )
32 .build()
33 .await?;
34
35 let _ = client.auth_initialized().await;
36 let result = client
37 .deploy_resource()
38 .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39 .read_resource_files()?
40 .send()
41 .await?;
42
43 println!("{:?}", result);
44
45 let result = client
46 .publish_message()
47 .with_name(String::from("hello_world"))
48 .without_correlation_key()
49 .with_variables(HelloWorld {
50 hello: String::from("foo"),
51 })?
52 .send()
53 .await?;
54
55 println!("{:?}", result);
56
57 sleep(Duration::from_secs(1)).await;
58
59 let result = client
60 .publish_message()
61 .with_name(String::from("hello_message"))
62 .with_correlation_key(String::from("foo"))
63 .send()
64 .await?;
65
66 println!("{:?}", result);
67
68 Ok(())
69}examples/pizza.rs (lines 80-89)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}Sourcepub fn with_cloud_tls(self) -> Self
pub fn with_cloud_tls(self) -> Self
Configures TLS for the client root certificates required for Camunda Cloud
§Returns
A ClientBuilder<WithAddress> instance with tls configuration set.
Sourcepub fn with_tls(self, pem: &Path) -> Result<Self, ClientBuilderError>
pub fn with_tls(self, pem: &Path) -> Result<Self, ClientBuilderError>
Sourcepub async fn build(self) -> Result<Client, ClientBuilderError>
pub async fn build(self) -> Result<Client, ClientBuilderError>
Builds the client with the configured settings.
§Returns
A Result containing either a Client instance or a ClientBuilderError if the client
could not be built.
Examples found in repository?
More examples
examples/oauth.rs (line 24)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
12 let client = zeebe_rs::Client::builder()
13 .with_address("http://localhost", 26500)
14 .with_oauth(
15 String::from("zeebe"),
16 String::from("zecret"),
17 String::from(
18 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
19 ),
20 String::from("zeebe-api"),
21 Duration::from_secs(30),
22 None,
23 )
24 .build()
25 .await?;
26
27 let _ = client.auth_initialized().await;
28 let topology = client.topology().send().await;
29 println!("{:?}", topology);
30
31 Ok(())
32}examples/custom_auto_handler.rs (line 60)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46 // Default configuration using Camunda's docker compose
47 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48 let client = Client::builder()
49 .with_address("http://localhost", 26500)
50 .with_oauth(
51 String::from("zeebe"),
52 String::from("zecret"),
53 String::from(
54 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55 ),
56 String::from("zeebe-api"),
57 Duration::from_secs(30),
58 None,
59 )
60 .build()
61 .await?;
62
63 // Wait until first OAuth token has been retrieved
64 client.auth_initialized().await;
65
66 let worker = client
67 .worker()
68 .with_request_timeout(Duration::from_secs(1))
69 .with_job_timeout(Duration::from_secs(1))
70 .with_max_jobs_to_activate(1)
71 .with_concurrency_limit(1)
72 .with_job_type(String::from("placeholder"))
73 .with_handler(always_fail)
74 .build();
75
76 tokio::join!(worker.run());
77
78 Ok(())
79}examples/deploy_resource.rs (line 32)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
19
20 let client = zeebe_rs::Client::builder()
21 .with_address("http://localhost", 26500)
22 .with_oauth(
23 String::from("zeebe"),
24 String::from("zecret"),
25 String::from(
26 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
27 ),
28 String::from("zeebe-api"),
29 Duration::from_secs(30),
30 None,
31 )
32 .build()
33 .await?;
34
35 let _ = client.auth_initialized().await;
36 let result = client
37 .deploy_resource()
38 .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
39 .read_resource_files()?
40 .send()
41 .await?;
42
43 println!("{:?}", result);
44
45 let result = client
46 .publish_message()
47 .with_name(String::from("hello_world"))
48 .without_correlation_key()
49 .with_variables(HelloWorld {
50 hello: String::from("foo"),
51 })?
52 .send()
53 .await?;
54
55 println!("{:?}", result);
56
57 sleep(Duration::from_secs(1)).await;
58
59 let result = client
60 .publish_message()
61 .with_name(String::from("hello_message"))
62 .with_correlation_key(String::from("foo"))
63 .send()
64 .await?;
65
66 println!("{:?}", result);
67
68 Ok(())
69}examples/pizza.rs (line 90)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}Sourcepub fn with_timeout(self, timeout: Duration) -> Self
pub fn with_timeout(self, timeout: Duration) -> Self
Sourcepub fn with_keep_alive(self, keep_alive: Duration) -> Self
pub fn with_keep_alive(self, keep_alive: Duration) -> Self
Trait Implementations§
Source§impl<S: Clone + ClientBuilderState> Clone for ClientBuilder<S>
impl<S: Clone + ClientBuilderState> Clone for ClientBuilder<S>
Source§fn clone(&self) -> ClientBuilder<S>
fn clone(&self) -> ClientBuilder<S>
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreSource§impl<S: Debug + ClientBuilderState> Debug for ClientBuilder<S>
impl<S: Debug + ClientBuilderState> Debug for ClientBuilder<S>
Auto Trait Implementations§
impl<S> Freeze for ClientBuilder<S>
impl<S> RefUnwindSafe for ClientBuilder<S>where
S: RefUnwindSafe,
impl<S> Send for ClientBuilder<S>where
S: Send,
impl<S> Sync for ClientBuilder<S>where
S: Sync,
impl<S> Unpin for ClientBuilder<S>where
S: Unpin,
impl<S> UnwindSafe for ClientBuilder<S>where
S: UnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message
T in a tonic::Request