zeebe/client.rs
1use crate::oauth::AuthInterceptor;
2use crate::{
3 error::{Error, Result},
4 job::{CompleteJobBuilder, FailJobBuilder, ThrowErrorBuilder, UpdateJobRetriesBuilder},
5 oauth::OAuthConfig,
6 process::{
7 CancelProcessInstanceBuilder, CreateProcessInstanceBuilder,
8 CreateProcessInstanceWithResultBuilder, DeployProcessBuilder, SetVariablesBuilder,
9 },
10 proto::gateway_client::GatewayClient,
11 topology::TopologyBuilder,
12 util::{PublishMessageBuilder, ResolveIncidentBuilder},
13 worker::{auto_handler::Extensions, JobWorkerBuilder},
14};
15use std::env;
16use std::fmt::Debug;
17use std::fs;
18use std::rc::Rc;
19use std::time::Duration;
20use tonic::codegen::InterceptedService;
21use tonic::transport::{Certificate, Channel, ClientTlsConfig};
22
// Timeouts applied to every gateway endpoint.

/// Per-request timeout set on each gateway endpoint.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Keep-alive timeout set on each gateway endpoint.
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(45);

// Names of the environment variables consulted when configuring from the environment.

/// Environment variable holding the path to a PEM encoded CA certificate.
const CA_CERTIFICATE_PATH: &str = "ZEEBE_CA_CERTIFICATE_PATH";
/// Environment variable holding the full gateway address.
const ADDRESS: &str = "ZEEBE_ADDRESS";
/// Environment variable holding the gateway host.
const HOST: &str = "ZEEBE_HOST";
/// Environment variable holding the gateway port.
const PORT: &str = "ZEEBE_PORT";

// Fallbacks used when the corresponding environment variable is not set.

/// Gateway host (including scheme) used when no host is configured.
const DEFAULT_ADDRESS_HOST: &str = "http://127.0.0.1";
/// Gateway port used when no port is configured.
const DEFAULT_ADDRESS_PORT: &str = "26500";
31
32/// Client used to communicate with Zeebe.
33#[derive(Clone, Debug)]
34pub struct Client {
35 pub(crate) gateway_client: GatewayClient<InterceptedService<Channel, AuthInterceptor>>,
36 pub(crate) auth_interceptor: AuthInterceptor,
37 pub(crate) current_job_key: Option<i64>,
38 pub(crate) current_job_extensions: Option<Rc<Extensions>>,
39}
40
41impl Default for Client {
42 fn default() -> Self {
43 Client::from_config(ClientConfig::default()).unwrap()
44 }
45}
46
47impl Client {
48 /// Create a new client with default config.
49 pub fn new() -> Self {
50 Client::default()
51 }
52
53 /// Create a new client from environment variables
54 pub fn from_env() -> Result<Self> {
55 Client::from_config(ClientConfig::from_env()?)
56 }
57
58 /// Build a new Zeebe client from a given configuration.
59 ///
60 /// # Examples
61 ///
62 /// ```
63 /// use zeebe::{Client, ClientConfig};
64 /// # #[tokio::main]
65 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
66 /// let endpoints = vec!["http://0.0.0.0:26500".to_string()];
67 ///
68 /// let client = Client::from_config(ClientConfig::with_endpoints(endpoints));
69 /// # Ok(())
70 /// # }
71 /// ```
72 ///
73 /// with TLS (see [the ClientTlsConfig docs] for configuration):
74 ///
75 /// [the ClientTlsConfig docs]: tonic::transport::ClientTlsConfig
76 ///
77 /// ```
78 /// use zeebe::{Client, ClientConfig};
79 /// use tonic::transport::ClientTlsConfig;
80 ///
81 /// # #[tokio::main]
82 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
83 /// let endpoints = vec!["http://0.0.0.0:26500".to_string()];
84 /// let tls = ClientTlsConfig::new();
85 ///
86 /// let client = Client::from_config(ClientConfig {
87 /// endpoints,
88 /// tls: Some(tls),
89 /// auth: None,
90 /// })?;
91 /// # Ok(())
92 /// # }
93 /// ```
94 pub fn from_config(config: ClientConfig) -> Result<Self> {
95 let channel = Self::build_channel(config.endpoints, config.tls)?;
96
97 let auth_interceptor = if let Some(auth_config) = config.auth {
98 AuthInterceptor::init(auth_config)?
99 } else {
100 AuthInterceptor::default()
101 };
102
103 let gateway_client = GatewayClient::with_interceptor(channel, auth_interceptor.clone());
104
105 Ok(Client {
106 gateway_client,
107 auth_interceptor,
108 current_job_key: None,
109 current_job_extensions: None,
110 })
111 }
112
113 /// Future that resolves when the auth interceptor is initialized.
114 pub async fn auth_initialized(&self) -> Result<()> {
115 self.auth_interceptor.auth_initialized().await
116 }
117
118 /// Obtains the current topology of the cluster the gateway is part of.
119 ///
120 /// # Examples
121 ///
122 /// ```no_run
123 /// # #[tokio::main]
124 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
125 /// let client = zeebe::Client::new();
126 ///
127 /// let topology = client.topology().send().await?;
128 /// # Ok(())
129 /// # }
130 /// ```
131 pub fn topology(&self) -> TopologyBuilder {
132 TopologyBuilder::new(self.clone())
133 }
134
135 /// Deploys one or more processes to Zeebe. Note that this is an atomic call,
136 /// i.e. either all processes are deployed, or none of them are.
137 ///
138 /// # Examples
139 ///
140 /// ```no_run
141 /// # #[tokio::main]
142 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
143 /// let client = zeebe::Client::new();
144 ///
145 /// let process = client
146 /// .deploy_process()
147 /// .with_resource_file("path/to/process.bpmn")
148 /// .send()
149 /// .await?;
150 /// # Ok(())
151 /// # }
152 pub fn deploy_process(&self) -> DeployProcessBuilder {
153 DeployProcessBuilder::new(self.clone())
154 }
155
156 /// Creates and starts an instance of the specified process.
157 ///
158 /// The process definition to use to create the instance can be specified
159 /// either using its unique key (as returned by [`deploy_process`]), or using the
160 /// BPMN process ID and a version. Pass -1 as the version to use the latest
161 /// deployed version.
162 ///
163 /// Note that only processes with none start events can be started through this
164 /// command.
165 ///
166 /// [`deploy_process`]: Client::deploy_process
167 ///
168 /// # Examples
169 ///
170 /// ```no_run
171 /// use serde_json::json;
172 ///
173 /// # #[tokio::main]
174 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
175 /// let client = zeebe::Client::new();
176 ///
177 /// let process_instance = client
178 /// .create_process_instance()
179 /// .with_bpmn_process_id("example-process")
180 /// .with_latest_version()
181 /// .with_variables(json!({"myData": 31243}))
182 /// .send()
183 /// .await?;
184 /// # Ok(())
185 /// # }
186 pub fn create_process_instance(&self) -> CreateProcessInstanceBuilder {
187 CreateProcessInstanceBuilder::new(self.clone())
188 }
189
190 /// Similar to [`create_process_instance`], creates and starts an instance of
191 /// the specified process_
192 ///
193 /// Unlike [`create_process_instance`], the response is returned when the
194 /// process_is completed.
195 ///
196 /// Note that only processes with none start events can be started through this
197 /// command.
198 ///
199 /// [`create_process_instance`]: Client::create_process_instance
200 ///
201 /// # Examples
202 ///
203 /// ```no_run
204 /// use serde_json::json;
205 ///
206 /// # #[tokio::main]
207 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
208 /// let client = zeebe::Client::new();
209 ///
210 /// let process_instance_with_result = client
211 /// .create_process_instance_with_result()
212 /// .with_bpmn_process_id("example-process")
213 /// .with_latest_version()
214 /// .with_variables(json!({"myData": 31243}))
215 /// .send()
216 /// .await?;
217 /// # Ok(())
218 /// # }
219 pub fn create_process_instance_with_result(&self) -> CreateProcessInstanceWithResultBuilder {
220 CreateProcessInstanceWithResultBuilder::new(self.clone())
221 }
222
223 /// Cancels a running process instance.
224 ///
225 /// # Examples
226 ///
227 /// ```no_run
228 /// # #[tokio::main]
229 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
230 /// let client = zeebe::Client::new();
231 ///
232 /// // process instance key, e.g. from a `CreateProcessInstanceResponse`.
233 /// let process_instance_key = 2251799813687287;
234 ///
235 /// let canceled = client
236 /// .cancel_process_instance()
237 /// .with_process_instance_key(process_instance_key)
238 /// .send()
239 /// .await?;
240 /// # Ok(())
241 /// # }
242 pub fn cancel_process_instance(&self) -> CancelProcessInstanceBuilder {
243 CancelProcessInstanceBuilder::new(self.clone())
244 }
245
246 /// Updates all the variables of a particular scope (e.g. process instance,
247 /// flow element instance) from the given JSON document.
248 ///
249 /// # Examples
250 ///
251 /// ```no_run
252 /// use serde_json::json;
253 ///
254 /// # #[tokio::main]
255 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
256 /// let client = zeebe::Client::new();
257 ///
258 /// // process instance key, e.g. from a `CreateProcessInstanceResponse`.
259 /// let element_instance_key = 2251799813687287;
260 ///
261 /// let set_variables = client
262 /// .set_variables()
263 /// .with_element_instance_key(element_instance_key)
264 /// .with_variables(json!({"myNewKey": "myValue"}))
265 /// .send()
266 /// .await?;
267 /// # Ok(())
268 /// # }
269 pub fn set_variables(&self) -> SetVariablesBuilder {
270 SetVariablesBuilder::new(self.clone())
271 }
272
273 /// Create a new job worker builder.
274 ///
275 /// # Examples
276 ///
277 /// ```no_run
278 /// use zeebe::{Client, Job};
279 /// # #[tokio::main]
280 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
281 /// let client = Client::new();
282 ///
283 /// client
284 /// .job_worker()
285 /// .with_job_type("my-service")
286 /// .with_handler(handle_job)
287 /// .run()
288 /// .await?;
289 ///
290 /// // job handler function
291 /// async fn handle_job(client: Client, job: Job) {
292 /// // processing work...
293 ///
294 /// let _ = client.complete_job().with_job_key(job.key()).send().await;
295 /// }
296 /// # Ok(())
297 /// # }
298 /// ```
299 pub fn job_worker(&self) -> JobWorkerBuilder {
300 JobWorkerBuilder::new(self.clone())
301 }
302
303 /// Completes a job with the given payload, which allows completing the
304 /// associated service task.
305 ///
306 /// # Examples
307 ///
308 /// ```no_run
309 /// # #[tokio::main]
310 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
311 /// let client = zeebe::Client::new();
312 ///
313 /// // typically obtained from `job.key()`;
314 /// let job_key = 2251799813687287;
315 ///
316 /// let completed_job = client
317 /// .complete_job()
318 /// .with_job_key(job_key)
319 /// .send()
320 /// .await?;
321 /// # Ok(())
322 /// # }
323 pub fn complete_job(&self) -> CompleteJobBuilder {
324 CompleteJobBuilder::new(self.clone())
325 }
326
327 /// Marks the job as failed.
328 ///
329 /// If the `retries` argument is positive, then the job will be immediately
330 /// activatable again, and a worker could try again to process it. If it is zero
331 /// or negative however, an incident will be raised, tagged with the given
332 /// `error_message`, and the job will not be activatable until the incident is
333 /// resolved.
334 ///
335 /// # Examples
336 ///
337 /// ```no_run
338 /// # #[tokio::main]
339 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
340 /// let client = zeebe::Client::new();
341 ///
342 /// // typically obtained from `job.key()`;
343 /// let job_key = 2251799813687287;
344 ///
345 /// let failed_job = client
346 /// .fail_job()
347 /// .with_job_key(job_key)
348 /// .with_error_message("something went wrong.")
349 /// .send()
350 /// .await?;
351 /// # Ok(())
352 /// # }
353 pub fn fail_job(&self) -> FailJobBuilder {
354 FailJobBuilder::new(self.clone())
355 }
356
357 /// Updates the number of retries a job has left.
358 ///
359 /// This is mostly useful for jobs that have run out of retries, should the
360 /// underlying problem be solved.
361 ///
362 /// # Examples
363 ///
364 /// ```no_run
365 /// # #[tokio::main]
366 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
367 /// let client = zeebe::Client::new();
368 ///
369 /// // typically obtained from `job.key()`;
370 /// let job_key = 2251799813687287;
371 ///
372 /// let updated = client
373 /// .update_job_retries()
374 /// .with_job_key(job_key)
375 /// .with_retries(2)
376 /// .send()
377 /// .await?;
378 /// # Ok(())
379 /// # }
380 pub fn update_job_retries(&self) -> UpdateJobRetriesBuilder {
381 UpdateJobRetriesBuilder::new(self.clone())
382 }
383
384 /// Throw an error to indicate that a business error has occurred while
385 /// processing the job.
386 ///
387 /// The error is identified by an error code and is handled by an error catch
388 /// event in the process with the same error code.
389 ///
390 /// # Examples
391 ///
392 /// ```no_run
393 /// # #[tokio::main]
394 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
395 /// let client = zeebe::Client::new();
396 ///
397 /// // typically obtained from `job.key()`;
398 /// let job_key = 2251799813687287;
399 ///
400 /// let error = client
401 /// .throw_error()
402 /// .with_job_key(job_key)
403 /// .with_error_message("something went wrong")
404 /// .with_error_code("E2505")
405 /// .send()
406 /// .await?;
407 /// # Ok(())
408 /// # }
409 pub fn throw_error(&self) -> ThrowErrorBuilder {
410 ThrowErrorBuilder::new(self.clone())
411 }
412
413 /// Publishes a single message. Messages are published to specific partitions
414 /// computed from their correlation keys.
415 ///
416 /// # Examples
417 ///
418 /// ```no_run
419 /// use serde_json::json;
420 ///
421 /// # #[tokio::main]
422 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
423 /// let client = zeebe::Client::new();
424 ///
425 /// let message = client
426 /// .publish_message()
427 /// .with_name("myEvent")
428 /// .with_variables(json!({"someKey": "someValue"}))
429 /// .send()
430 /// .await?;
431 /// # Ok(())
432 /// # }
433 pub fn publish_message(&self) -> PublishMessageBuilder {
434 PublishMessageBuilder::new(self.clone())
435 }
436
437 /// Resolves a given incident.
438 ///
439 /// This simply marks the incident as resolved; most likely a call to
440 /// [`update_job_retries`] will be necessary to actually resolve the problem,
441 /// followed by this call.
442 ///
443 /// [`update_job_retries`]: Client::update_job_retries
444 ///
445 /// # Examples
446 ///
447 /// ```no_run
448 /// # #[tokio::main]
449 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
450 /// let client = zeebe::Client::new();
451 ///
452 /// let incident_key = 2251799813687287;
453 ///
454 /// let resolved = client
455 /// .resolve_incident()
456 /// .with_incident_key(incident_key)
457 /// .send()
458 /// .await?;
459 /// # Ok(())
460 /// # }
461 pub fn resolve_incident(&self) -> ResolveIncidentBuilder {
462 ResolveIncidentBuilder::new(self.clone())
463 }
464
465 fn build_channel(endpoints: Vec<String>, tls: Option<ClientTlsConfig>) -> Result<Channel> {
466 let endpoints = endpoints
467 .into_iter()
468 .map(|uri| {
469 Channel::from_shared(uri.clone())
470 .map_err(|err| Error::InvalidGatewayUri {
471 uri,
472 message: err.to_string(),
473 })
474 .map(|channel| {
475 channel
476 .timeout(DEFAULT_REQUEST_TIMEOUT)
477 .keep_alive_timeout(DEFAULT_KEEP_ALIVE)
478 })
479 })
480 .map(|c| {
481 c.and_then(|c| match &tls {
482 Some(tls) => c.tls_config(tls.to_owned()).map_err(From::from),
483 None => Ok(c),
484 })
485 })
486 .collect::<Result<Vec<_>>>()?;
487
488 Ok(Channel::balance_list(endpoints.into_iter()))
489 }
490}
491
492/// Config for establishing zeebe client.
493///
494/// See [the ClientTlsConfig docs] for tls configuration.
495///
496/// [the ClientTlsConfig docs]: tonic::transport::ClientTlsConfig
497///
498/// # Examples
499///
500/// ```
501/// let endpoints = vec!["http://127.0.0.1:26500".to_string()];
502///
503/// let config = zeebe::ClientConfig {
504/// endpoints,
505/// tls: None,
506/// auth: None,
507/// };
508/// ```
509#[derive(Debug)]
510pub struct ClientConfig {
511 /// The endpoints the client should connect to
512 pub endpoints: Vec<String>,
513 /// TLS configuration
514 pub tls: Option<ClientTlsConfig>,
515 /// OAuth config
516 pub auth: Option<OAuthConfig>,
517}
518
519impl ClientConfig {
520 /// Get client config from environment
521 pub fn from_env() -> Result<Self> {
522 let tls = if let Ok(ca_path) = env::var(CA_CERTIFICATE_PATH) {
523 let pem = fs::read_to_string(ca_path).map_err(|err| Error::Auth(err.to_string()))?;
524 let cert = Certificate::from_pem(pem);
525
526 Some(ClientTlsConfig::new().ca_certificate(cert))
527 } else {
528 None
529 };
530
531 let address = if let Ok(gateway_host) = env::var(HOST) {
532 if let Ok(gateway_port) = env::var(PORT) {
533 format!("{}:{}", gateway_host, gateway_port)
534 } else {
535 format!("{}:{}", gateway_host, DEFAULT_ADDRESS_PORT)
536 }
537 } else if let Ok(gateway_port) = env::var(PORT) {
538 format!("{}:{}", DEFAULT_ADDRESS_HOST, gateway_port)
539 } else if let Ok(gateway_address) = env::var(ADDRESS) {
540 gateway_address
541 } else {
542 format!("{}:{}", DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT)
543 };
544
545 let auth = if OAuthConfig::should_use_env_config() {
546 Some(OAuthConfig::from_env()?)
547 } else {
548 None
549 };
550
551 Ok(ClientConfig {
552 endpoints: vec![address],
553 tls,
554 auth,
555 })
556 }
557
558 /// Set the grpc endpoints the client should connect to.
559 pub fn with_endpoints(endpoints: Vec<String>) -> Self {
560 ClientConfig {
561 endpoints,
562 tls: None,
563 auth: None,
564 }
565 }
566}
567
568impl Default for ClientConfig {
569 fn default() -> Self {
570 let default_address = format!("{}:{}", DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT);
571 ClientConfig::with_endpoints(vec![default_address])
572 }
573}