zeebe/
client.rs

1use crate::oauth::AuthInterceptor;
2use crate::{
3    error::{Error, Result},
4    job::{CompleteJobBuilder, FailJobBuilder, ThrowErrorBuilder, UpdateJobRetriesBuilder},
5    oauth::OAuthConfig,
6    process::{
7        CancelProcessInstanceBuilder, CreateProcessInstanceBuilder,
8        CreateProcessInstanceWithResultBuilder, DeployProcessBuilder, SetVariablesBuilder,
9    },
10    proto::gateway_client::GatewayClient,
11    topology::TopologyBuilder,
12    util::{PublishMessageBuilder, ResolveIncidentBuilder},
13    worker::{auto_handler::Extensions, JobWorkerBuilder},
14};
15use std::env;
16use std::fmt::Debug;
17use std::fs;
18use std::rc::Rc;
19use std::time::Duration;
20use tonic::codegen::InterceptedService;
21use tonic::transport::{Certificate, Channel, ClientTlsConfig};
22
// Per-endpoint transport settings applied when the gateway channel is built.

/// Request timeout configured on every gateway endpoint.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Keep-alive timeout configured on every gateway endpoint.
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(45);

// Names of the environment variables consulted by `ClientConfig::from_env`.

/// Environment variable holding the path of a PEM encoded CA certificate.
const CA_CERTIFICATE_PATH: &str = "ZEEBE_CA_CERTIFICATE_PATH";
/// Environment variable holding a complete gateway address.
const ADDRESS: &str = "ZEEBE_ADDRESS";
/// Environment variable holding the gateway host.
const HOST: &str = "ZEEBE_HOST";
/// Environment variable holding the gateway port.
const PORT: &str = "ZEEBE_PORT";

// Fallbacks used when the corresponding environment variable is absent.

/// Host (including scheme) used when no host is configured.
const DEFAULT_ADDRESS_HOST: &str = "http://127.0.0.1";
/// Port used when no port is configured.
const DEFAULT_ADDRESS_PORT: &str = "26500";
31
32/// Client used to communicate with Zeebe.
33#[derive(Clone, Debug)]
34pub struct Client {
35    pub(crate) gateway_client: GatewayClient<InterceptedService<Channel, AuthInterceptor>>,
36    pub(crate) auth_interceptor: AuthInterceptor,
37    pub(crate) current_job_key: Option<i64>,
38    pub(crate) current_job_extensions: Option<Rc<Extensions>>,
39}
40
41impl Default for Client {
42    fn default() -> Self {
43        Client::from_config(ClientConfig::default()).unwrap()
44    }
45}
46
47impl Client {
48    /// Create a new client with default config.
49    pub fn new() -> Self {
50        Client::default()
51    }
52
53    /// Create a new client from environment variables
54    pub fn from_env() -> Result<Self> {
55        Client::from_config(ClientConfig::from_env()?)
56    }
57
58    /// Build a new Zeebe client from a given configuration.
59    ///
60    /// # Examples
61    ///
62    /// ```
63    /// use zeebe::{Client, ClientConfig};
64    /// # #[tokio::main]
65    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
66    /// let endpoints = vec!["http://0.0.0.0:26500".to_string()];
67    ///
68    /// let client = Client::from_config(ClientConfig::with_endpoints(endpoints));
69    /// # Ok(())
70    /// # }
71    /// ```
72    ///
73    /// with TLS (see [the ClientTlsConfig docs] for configuration):
74    ///
75    /// [the ClientTlsConfig docs]: tonic::transport::ClientTlsConfig
76    ///
77    /// ```
78    /// use zeebe::{Client, ClientConfig};
79    /// use tonic::transport::ClientTlsConfig;
80    ///
81    /// # #[tokio::main]
82    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
83    /// let endpoints = vec!["http://0.0.0.0:26500".to_string()];
84    /// let tls = ClientTlsConfig::new();
85    ///
86    /// let client = Client::from_config(ClientConfig {
87    ///     endpoints,
88    ///     tls: Some(tls),
89    ///     auth: None,
90    /// })?;
91    /// # Ok(())
92    /// # }
93    /// ```
94    pub fn from_config(config: ClientConfig) -> Result<Self> {
95        let channel = Self::build_channel(config.endpoints, config.tls)?;
96
97        let auth_interceptor = if let Some(auth_config) = config.auth {
98            AuthInterceptor::init(auth_config)?
99        } else {
100            AuthInterceptor::default()
101        };
102
103        let gateway_client = GatewayClient::with_interceptor(channel, auth_interceptor.clone());
104
105        Ok(Client {
106            gateway_client,
107            auth_interceptor,
108            current_job_key: None,
109            current_job_extensions: None,
110        })
111    }
112
113    /// Future that resolves when the auth interceptor is initialized.
114    pub async fn auth_initialized(&self) -> Result<()> {
115        self.auth_interceptor.auth_initialized().await
116    }
117
118    /// Obtains the current topology of the cluster the gateway is part of.
119    ///
120    /// # Examples
121    ///
122    /// ```no_run
123    /// # #[tokio::main]
124    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
125    /// let client = zeebe::Client::new();
126    ///
127    /// let topology = client.topology().send().await?;
128    /// # Ok(())
129    /// # }
130    /// ```
131    pub fn topology(&self) -> TopologyBuilder {
132        TopologyBuilder::new(self.clone())
133    }
134
135    /// Deploys one or more processes to Zeebe. Note that this is an atomic call,
136    /// i.e. either all processes are deployed, or none of them are.
137    ///
138    /// # Examples
139    ///
140    /// ```no_run
141    /// # #[tokio::main]
142    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
143    /// let client = zeebe::Client::new();
144    ///
145    /// let process = client
146    ///     .deploy_process()
147    ///     .with_resource_file("path/to/process.bpmn")
148    ///     .send()
149    ///     .await?;
150    /// # Ok(())
151    /// # }
152    pub fn deploy_process(&self) -> DeployProcessBuilder {
153        DeployProcessBuilder::new(self.clone())
154    }
155
156    /// Creates and starts an instance of the specified process.
157    ///
158    /// The process definition to use to create the instance can be specified
159    /// either using its unique key (as returned by [`deploy_process`]), or using the
160    /// BPMN process ID and a version. Pass -1 as the version to use the latest
161    /// deployed version.
162    ///
163    /// Note that only processes with none start events can be started through this
164    /// command.
165    ///
166    /// [`deploy_process`]: Client::deploy_process
167    ///
168    ///  # Examples
169    ///
170    /// ```no_run
171    /// use serde_json::json;
172    ///
173    /// # #[tokio::main]
174    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
175    /// let client = zeebe::Client::new();
176    ///
177    /// let process_instance = client
178    ///     .create_process_instance()
179    ///     .with_bpmn_process_id("example-process")
180    ///     .with_latest_version()
181    ///     .with_variables(json!({"myData": 31243}))
182    ///     .send()
183    ///     .await?;
184    /// # Ok(())
185    /// # }
186    pub fn create_process_instance(&self) -> CreateProcessInstanceBuilder {
187        CreateProcessInstanceBuilder::new(self.clone())
188    }
189
190    /// Similar to [`create_process_instance`], creates and starts an instance of
191    /// the specified process_
192    ///
193    /// Unlike [`create_process_instance`], the response is returned when the
194    /// process_is completed.
195    ///
196    /// Note that only processes with none start events can be started through this
197    /// command.
198    ///
199    /// [`create_process_instance`]: Client::create_process_instance
200    ///
201    /// # Examples
202    ///
203    /// ```no_run
204    /// use serde_json::json;
205    ///
206    /// # #[tokio::main]
207    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
208    /// let client = zeebe::Client::new();
209    ///
210    /// let process_instance_with_result = client
211    ///     .create_process_instance_with_result()
212    ///     .with_bpmn_process_id("example-process")
213    ///     .with_latest_version()
214    ///     .with_variables(json!({"myData": 31243}))
215    ///     .send()
216    ///     .await?;
217    /// # Ok(())
218    /// # }
219    pub fn create_process_instance_with_result(&self) -> CreateProcessInstanceWithResultBuilder {
220        CreateProcessInstanceWithResultBuilder::new(self.clone())
221    }
222
223    /// Cancels a running process instance.
224    ///
225    /// # Examples
226    ///
227    /// ```no_run
228    /// # #[tokio::main]
229    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
230    /// let client = zeebe::Client::new();
231    ///
232    /// // process instance key, e.g. from a `CreateProcessInstanceResponse`.
233    /// let process_instance_key = 2251799813687287;
234    ///
235    /// let canceled = client
236    ///     .cancel_process_instance()
237    ///     .with_process_instance_key(process_instance_key)
238    ///     .send()
239    ///     .await?;
240    /// # Ok(())
241    /// # }
242    pub fn cancel_process_instance(&self) -> CancelProcessInstanceBuilder {
243        CancelProcessInstanceBuilder::new(self.clone())
244    }
245
246    /// Updates all the variables of a particular scope (e.g. process instance,
247    /// flow element instance) from the given JSON document.
248    ///
249    /// # Examples
250    ///
251    /// ```no_run
252    /// use serde_json::json;
253    ///
254    /// # #[tokio::main]
255    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
256    /// let client = zeebe::Client::new();
257    ///
258    /// // process instance key, e.g. from a `CreateProcessInstanceResponse`.
259    /// let element_instance_key = 2251799813687287;
260    ///
261    /// let set_variables = client
262    ///     .set_variables()
263    ///     .with_element_instance_key(element_instance_key)
264    ///     .with_variables(json!({"myNewKey": "myValue"}))
265    ///     .send()
266    ///     .await?;
267    /// # Ok(())
268    /// # }
269    pub fn set_variables(&self) -> SetVariablesBuilder {
270        SetVariablesBuilder::new(self.clone())
271    }
272
273    /// Create a new job worker builder.
274    ///
275    /// # Examples
276    ///
277    /// ```no_run
278    /// use zeebe::{Client, Job};
279    /// # #[tokio::main]
280    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
281    /// let client = Client::new();
282    ///
283    /// client
284    ///     .job_worker()
285    ///     .with_job_type("my-service")
286    ///     .with_handler(handle_job)
287    ///     .run()
288    ///     .await?;
289    ///
290    /// // job handler function
291    /// async fn handle_job(client: Client, job: Job) {
292    ///     // processing work...
293    ///
294    ///     let _ = client.complete_job().with_job_key(job.key()).send().await;
295    /// }
296    /// # Ok(())
297    /// # }
298    /// ```
299    pub fn job_worker(&self) -> JobWorkerBuilder {
300        JobWorkerBuilder::new(self.clone())
301    }
302
303    /// Completes a job with the given payload, which allows completing the
304    /// associated service task.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// # #[tokio::main]
310    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
311    /// let client = zeebe::Client::new();
312    ///
313    /// // typically obtained from `job.key()`;
314    /// let job_key = 2251799813687287;
315    ///
316    /// let completed_job = client
317    ///     .complete_job()
318    ///     .with_job_key(job_key)
319    ///     .send()
320    ///     .await?;
321    /// # Ok(())
322    /// # }
323    pub fn complete_job(&self) -> CompleteJobBuilder {
324        CompleteJobBuilder::new(self.clone())
325    }
326
327    /// Marks the job as failed.
328    ///
329    /// If the `retries` argument is positive, then the job will be immediately
330    /// activatable again, and a worker could try again to process it. If it is zero
331    /// or negative however, an incident will be raised, tagged with the given
332    /// `error_message`, and the job will not be activatable until the incident is
333    /// resolved.
334    ///
335    /// # Examples
336    ///
337    /// ```no_run
338    /// # #[tokio::main]
339    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
340    /// let client = zeebe::Client::new();
341    ///
342    /// // typically obtained from `job.key()`;
343    /// let job_key = 2251799813687287;
344    ///
345    /// let failed_job = client
346    ///     .fail_job()
347    ///     .with_job_key(job_key)
348    ///     .with_error_message("something went wrong.")
349    ///     .send()
350    ///     .await?;
351    /// # Ok(())
352    /// # }
353    pub fn fail_job(&self) -> FailJobBuilder {
354        FailJobBuilder::new(self.clone())
355    }
356
357    /// Updates the number of retries a job has left.
358    ///
359    /// This is mostly useful for jobs that have run out of retries, should the
360    /// underlying problem be solved.
361    ///
362    /// # Examples
363    ///
364    /// ```no_run
365    /// # #[tokio::main]
366    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
367    /// let client = zeebe::Client::new();
368    ///
369    /// // typically obtained from `job.key()`;
370    /// let job_key = 2251799813687287;
371    ///
372    /// let updated = client
373    ///     .update_job_retries()
374    ///     .with_job_key(job_key)
375    ///     .with_retries(2)
376    ///     .send()
377    ///     .await?;
378    /// # Ok(())
379    /// # }
380    pub fn update_job_retries(&self) -> UpdateJobRetriesBuilder {
381        UpdateJobRetriesBuilder::new(self.clone())
382    }
383
384    /// Throw an error to indicate that a business error has occurred while
385    /// processing the job.
386    ///
387    /// The error is identified by an error code and is handled by an error catch
388    /// event in the process with the same error code.
389    ///
390    /// # Examples
391    ///
392    /// ```no_run
393    /// # #[tokio::main]
394    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
395    /// let client = zeebe::Client::new();
396    ///
397    /// // typically obtained from `job.key()`;
398    /// let job_key = 2251799813687287;
399    ///
400    /// let error = client
401    ///     .throw_error()
402    ///     .with_job_key(job_key)
403    ///     .with_error_message("something went wrong")
404    ///     .with_error_code("E2505")
405    ///     .send()
406    ///     .await?;
407    /// # Ok(())
408    /// # }
409    pub fn throw_error(&self) -> ThrowErrorBuilder {
410        ThrowErrorBuilder::new(self.clone())
411    }
412
413    /// Publishes a single message. Messages are published to specific partitions
414    /// computed from their correlation keys.
415    ///
416    /// # Examples
417    ///
418    /// ```no_run
419    /// use serde_json::json;
420    ///
421    /// # #[tokio::main]
422    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
423    /// let client = zeebe::Client::new();
424    ///
425    /// let message = client
426    ///     .publish_message()
427    ///     .with_name("myEvent")
428    ///     .with_variables(json!({"someKey": "someValue"}))
429    ///     .send()
430    ///     .await?;
431    /// # Ok(())
432    /// # }
433    pub fn publish_message(&self) -> PublishMessageBuilder {
434        PublishMessageBuilder::new(self.clone())
435    }
436
437    /// Resolves a given incident.
438    ///
439    /// This simply marks the incident as resolved; most likely a call to
440    /// [`update_job_retries`] will be necessary to actually resolve the problem,
441    /// followed by this call.
442    ///
443    /// [`update_job_retries`]: Client::update_job_retries
444    ///
445    /// # Examples
446    ///
447    /// ```no_run
448    /// # #[tokio::main]
449    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
450    /// let client = zeebe::Client::new();
451    ///
452    /// let incident_key = 2251799813687287;
453    ///
454    /// let resolved = client
455    ///     .resolve_incident()
456    ///     .with_incident_key(incident_key)
457    ///     .send()
458    ///     .await?;
459    /// # Ok(())
460    /// # }
461    pub fn resolve_incident(&self) -> ResolveIncidentBuilder {
462        ResolveIncidentBuilder::new(self.clone())
463    }
464
465    fn build_channel(endpoints: Vec<String>, tls: Option<ClientTlsConfig>) -> Result<Channel> {
466        let endpoints = endpoints
467            .into_iter()
468            .map(|uri| {
469                Channel::from_shared(uri.clone())
470                    .map_err(|err| Error::InvalidGatewayUri {
471                        uri,
472                        message: err.to_string(),
473                    })
474                    .map(|channel| {
475                        channel
476                            .timeout(DEFAULT_REQUEST_TIMEOUT)
477                            .keep_alive_timeout(DEFAULT_KEEP_ALIVE)
478                    })
479            })
480            .map(|c| {
481                c.and_then(|c| match &tls {
482                    Some(tls) => c.tls_config(tls.to_owned()).map_err(From::from),
483                    None => Ok(c),
484                })
485            })
486            .collect::<Result<Vec<_>>>()?;
487
488        Ok(Channel::balance_list(endpoints.into_iter()))
489    }
490}
491
492/// Config for establishing zeebe client.
493///
494/// See [the ClientTlsConfig docs] for tls configuration.
495///
496/// [the ClientTlsConfig docs]: tonic::transport::ClientTlsConfig
497///
498/// # Examples
499///
500/// ```
501/// let endpoints = vec!["http://127.0.0.1:26500".to_string()];
502///
503/// let config = zeebe::ClientConfig {
504///     endpoints,
505///     tls: None,
506///     auth: None,
507/// };
508/// ```
509#[derive(Debug)]
510pub struct ClientConfig {
511    /// The endpoints the client should connect to
512    pub endpoints: Vec<String>,
513    /// TLS configuration
514    pub tls: Option<ClientTlsConfig>,
515    /// OAuth config
516    pub auth: Option<OAuthConfig>,
517}
518
519impl ClientConfig {
520    /// Get client config from environment
521    pub fn from_env() -> Result<Self> {
522        let tls = if let Ok(ca_path) = env::var(CA_CERTIFICATE_PATH) {
523            let pem = fs::read_to_string(ca_path).map_err(|err| Error::Auth(err.to_string()))?;
524            let cert = Certificate::from_pem(pem);
525
526            Some(ClientTlsConfig::new().ca_certificate(cert))
527        } else {
528            None
529        };
530
531        let address = if let Ok(gateway_host) = env::var(HOST) {
532            if let Ok(gateway_port) = env::var(PORT) {
533                format!("{}:{}", gateway_host, gateway_port)
534            } else {
535                format!("{}:{}", gateway_host, DEFAULT_ADDRESS_PORT)
536            }
537        } else if let Ok(gateway_port) = env::var(PORT) {
538            format!("{}:{}", DEFAULT_ADDRESS_HOST, gateway_port)
539        } else if let Ok(gateway_address) = env::var(ADDRESS) {
540            gateway_address
541        } else {
542            format!("{}:{}", DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT)
543        };
544
545        let auth = if OAuthConfig::should_use_env_config() {
546            Some(OAuthConfig::from_env()?)
547        } else {
548            None
549        };
550
551        Ok(ClientConfig {
552            endpoints: vec![address],
553            tls,
554            auth,
555        })
556    }
557
558    /// Set the grpc endpoints the client should connect to.
559    pub fn with_endpoints(endpoints: Vec<String>) -> Self {
560        ClientConfig {
561            endpoints,
562            tls: None,
563            auth: None,
564        }
565    }
566}
567
568impl Default for ClientConfig {
569    fn default() -> Self {
570        let default_address = format!("{}:{}", DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT);
571        ClientConfig::with_endpoints(vec![default_address])
572    }
573}