zeebe 0.4.2

A rust client for defining, orchestrating, and monitoring business processes across microservices using Zeebe.
Documentation
use crate::oauth::AuthInterceptor;
use crate::{
    error::{Error, Result},
    job::{CompleteJobBuilder, FailJobBuilder, ThrowErrorBuilder, UpdateJobRetriesBuilder},
    oauth::OAuthConfig,
    process::{
        CancelProcessInstanceBuilder, CreateProcessInstanceBuilder,
        CreateProcessInstanceWithResultBuilder, DeployProcessBuilder, SetVariablesBuilder,
    },
    proto::gateway_client::GatewayClient,
    topology::TopologyBuilder,
    util::{PublishMessageBuilder, ResolveIncidentBuilder},
    worker::{auto_handler::Extensions, JobWorkerBuilder},
};
use std::env;
use std::fmt::Debug;
use std::fs;
use std::rc::Rc;
use std::time::Duration;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(45);
const CA_CERTIFICATE_PATH: &str = "ZEEBE_CA_CERTIFICATE_PATH";
const ADDRESS: &str = "ZEEBE_ADDRESS";
const HOST: &str = "ZEEBE_HOST";
const DEFAULT_ADDRESS_HOST: &str = "http://127.0.0.1";
const PORT: &str = "ZEEBE_PORT";
const DEFAULT_ADDRESS_PORT: &str = "26500";

/// Client used to communicate with Zeebe.
#[derive(Clone, Debug)]
pub struct Client {
    pub(crate) gateway_client: GatewayClient<InterceptedService<Channel, AuthInterceptor>>,
    pub(crate) auth_interceptor: AuthInterceptor,
    pub(crate) current_job_key: Option<i64>,
    pub(crate) current_job_extensions: Option<Rc<Extensions>>,
}

impl Default for Client {
    fn default() -> Self {
        Client::from_config(ClientConfig::default()).unwrap()
    }
}

impl Client {
    /// Create a new client with default config.
    pub fn new() -> Self {
        Client::default()
    }

    /// Create a new client from environment variables
    pub fn from_env() -> Result<Self> {
        Client::from_config(ClientConfig::from_env()?)
    }

    /// Build a new Zeebe client from a given configuration.
    ///
    /// # Examples
    ///
    /// ```
    /// use zeebe::{Client, ClientConfig};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let endpoints = vec!["http://0.0.0.0:26500".to_string()];
    ///
    /// let client = Client::from_config(ClientConfig::with_endpoints(endpoints));
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// with TLS (see [the ClientTlsConfig docs] for configuration):
    ///
    /// [the ClientTlsConfig docs]: tonic::transport::ClientTlsConfig
    ///
    /// ```
    /// use zeebe::{Client, ClientConfig};
    /// use tonic::transport::ClientTlsConfig;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let endpoints = vec!["http://0.0.0.0:26500".to_string()];
    /// let tls = ClientTlsConfig::new();
    ///
    /// let client = Client::from_config(ClientConfig {
    ///     endpoints,
    ///     tls: Some(tls),
    ///     auth: None,
    /// })?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn from_config(config: ClientConfig) -> Result<Self> {
        let channel = Self::build_channel(config.endpoints, config.tls)?;

        let auth_interceptor = if let Some(auth_config) = config.auth {
            AuthInterceptor::init(auth_config)?
        } else {
            AuthInterceptor::default()
        };

        let gateway_client = GatewayClient::with_interceptor(channel, auth_interceptor.clone());

        Ok(Client {
            gateway_client,
            auth_interceptor,
            current_job_key: None,
            current_job_extensions: None,
        })
    }

    /// Future that resolves when the auth interceptor is initialized.
    pub async fn auth_initialized(&self) -> Result<()> {
        self.auth_interceptor.auth_initialized().await
    }

    /// Obtains the current topology of the cluster the gateway is part of.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// let topology = client.topology().send().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn topology(&self) -> TopologyBuilder {
        TopologyBuilder::new(self.clone())
    }

    /// Deploys one or more processes to Zeebe. Note that this is an atomic call,
    /// i.e. either all processes are deployed, or none of them are.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// let process = client
    ///     .deploy_process()
    ///     .with_resource_file("path/to/process.bpmn")
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn deploy_process(&self) -> DeployProcessBuilder {
        DeployProcessBuilder::new(self.clone())
    }

    /// Creates and starts an instance of the specified process.
    ///
    /// The process definition to use to create the instance can be specified
    /// either using its unique key (as returned by [`deploy_process`]), or using the
    /// BPMN process ID and a version. Pass -1 as the version to use the latest
    /// deployed version.
    ///
    /// Note that only processes with none start events can be started through this
    /// command.
    ///
    /// [`deploy_process`]: Client::deploy_process
    ///
    ///  # Examples
    ///
    /// ```no_run
    /// use serde_json::json;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// let process_instance = client
    ///     .create_process_instance()
    ///     .with_bpmn_process_id("example-process")
    ///     .with_latest_version()
    ///     .with_variables(json!({"myData": 31243}))
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn create_process_instance(&self) -> CreateProcessInstanceBuilder {
        CreateProcessInstanceBuilder::new(self.clone())
    }

    /// Similar to [`create_process_instance`], creates and starts an instance of
    /// the specified process_
    ///
    /// Unlike [`create_process_instance`], the response is returned when the
    /// process_is completed.
    ///
    /// Note that only processes with none start events can be started through this
    /// command.
    ///
    /// [`create_process_instance`]: Client::create_process_instance
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use serde_json::json;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// let process_instance_with_result = client
    ///     .create_process_instance_with_result()
    ///     .with_bpmn_process_id("example-process")
    ///     .with_latest_version()
    ///     .with_variables(json!({"myData": 31243}))
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn create_process_instance_with_result(&self) -> CreateProcessInstanceWithResultBuilder {
        CreateProcessInstanceWithResultBuilder::new(self.clone())
    }

    /// Cancels a running process instance.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// // process instance key, e.g. from a `CreateProcessInstanceResponse`.
    /// let process_instance_key = 2251799813687287;
    ///
    /// let canceled = client
    ///     .cancel_process_instance()
    ///     .with_process_instance_key(process_instance_key)
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn cancel_process_instance(&self) -> CancelProcessInstanceBuilder {
        CancelProcessInstanceBuilder::new(self.clone())
    }

    /// Updates all the variables of a particular scope (e.g. process instance,
    /// flow element instance) from the given JSON document.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use serde_json::json;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// // process instance key, e.g. from a `CreateProcessInstanceResponse`.
    /// let element_instance_key = 2251799813687287;
    ///
    /// let set_variables = client
    ///     .set_variables()
    ///     .with_element_instance_key(element_instance_key)
    ///     .with_variables(json!({"myNewKey": "myValue"}))
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn set_variables(&self) -> SetVariablesBuilder {
        SetVariablesBuilder::new(self.clone())
    }

    /// Create a new job worker builder.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use zeebe::{Client, Job};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new();
    ///
    /// client
    ///     .job_worker()
    ///     .with_job_type("my-service")
    ///     .with_handler(handle_job)
    ///     .run()
    ///     .await?;
    ///
    /// // job handler function
    /// async fn handle_job(client: Client, job: Job) {
    ///     // processing work...
    ///
    ///     let _ = client.complete_job().with_job_key(job.key()).send().await;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn job_worker(&self) -> JobWorkerBuilder {
        JobWorkerBuilder::new(self.clone())
    }

    /// Completes a job with the given payload, which allows completing the
    /// associated service task.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// // typically obtained from `job.key()`;
    /// let job_key = 2251799813687287;
    ///
    /// let completed_job = client
    ///     .complete_job()
    ///     .with_job_key(job_key)
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn complete_job(&self) -> CompleteJobBuilder {
        CompleteJobBuilder::new(self.clone())
    }

    /// Marks the job as failed.
    ///
    /// If the `retries` argument is positive, then the job will be immediately
    /// activatable again, and a worker could try again to process it. If it is zero
    /// or negative however, an incident will be raised, tagged with the given
    /// `error_message`, and the job will not be activatable until the incident is
    /// resolved.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// // typically obtained from `job.key()`;
    /// let job_key = 2251799813687287;
    ///
    /// let failed_job = client
    ///     .fail_job()
    ///     .with_job_key(job_key)
    ///     .with_error_message("something went wrong.")
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn fail_job(&self) -> FailJobBuilder {
        FailJobBuilder::new(self.clone())
    }

    /// Updates the number of retries a job has left.
    ///
    /// This is mostly useful for jobs that have run out of retries, should the
    /// underlying problem be solved.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// // typically obtained from `job.key()`;
    /// let job_key = 2251799813687287;
    ///
    /// let updated = client
    ///     .update_job_retries()
    ///     .with_job_key(job_key)
    ///     .with_retries(2)
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn update_job_retries(&self) -> UpdateJobRetriesBuilder {
        UpdateJobRetriesBuilder::new(self.clone())
    }

    /// Throw an error to indicate that a business error has occurred while
    /// processing the job.
    ///
    /// The error is identified by an error code and is handled by an error catch
    /// event in the process with the same error code.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// // typically obtained from `job.key()`;
    /// let job_key = 2251799813687287;
    ///
    /// let error = client
    ///     .throw_error()
    ///     .with_job_key(job_key)
    ///     .with_error_message("something went wrong")
    ///     .with_error_code("E2505")
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn throw_error(&self) -> ThrowErrorBuilder {
        ThrowErrorBuilder::new(self.clone())
    }

    /// Publishes a single message. Messages are published to specific partitions
    /// computed from their correlation keys.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use serde_json::json;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// let message = client
    ///     .publish_message()
    ///     .with_name("myEvent")
    ///     .with_variables(json!({"someKey": "someValue"}))
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn publish_message(&self) -> PublishMessageBuilder {
        PublishMessageBuilder::new(self.clone())
    }

    /// Resolves a given incident.
    ///
    /// This simply marks the incident as resolved; most likely a call to
    /// [`update_job_retries`] will be necessary to actually resolve the problem,
    /// followed by this call.
    ///
    /// [`update_job_retries`]: Client::update_job_retries
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = zeebe::Client::new();
    ///
    /// let incident_key = 2251799813687287;
    ///
    /// let resolved = client
    ///     .resolve_incident()
    ///     .with_incident_key(incident_key)
    ///     .send()
    ///     .await?;
    /// # Ok(())
    /// # }
    pub fn resolve_incident(&self) -> ResolveIncidentBuilder {
        ResolveIncidentBuilder::new(self.clone())
    }

    fn build_channel(endpoints: Vec<String>, tls: Option<ClientTlsConfig>) -> Result<Channel> {
        let endpoints = endpoints
            .into_iter()
            .map(|uri| {
                Channel::from_shared(uri.clone())
                    .map_err(|err| Error::InvalidGatewayUri {
                        uri,
                        message: err.to_string(),
                    })
                    .map(|channel| {
                        channel
                            .timeout(DEFAULT_REQUEST_TIMEOUT)
                            .keep_alive_timeout(DEFAULT_KEEP_ALIVE)
                    })
            })
            .map(|c| {
                c.and_then(|c| match &tls {
                    Some(tls) => c.tls_config(tls.to_owned()).map_err(From::from),
                    None => Ok(c),
                })
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Channel::balance_list(endpoints.into_iter()))
    }
}

/// Config for establishing zeebe client.
///
/// See [the ClientTlsConfig docs] for tls configuration.
///
/// [the ClientTlsConfig docs]: tonic::transport::ClientTlsConfig
///
/// # Examples
///
/// ```
/// let endpoints = vec!["http://127.0.0.1:26500".to_string()];
///
/// let config = zeebe::ClientConfig {
///     endpoints,
///     tls: None,
///     auth: None,
/// };
/// ```
#[derive(Debug)]
pub struct ClientConfig {
    /// The endpoints the client should connect to
    pub endpoints: Vec<String>,
    /// TLS configuration
    pub tls: Option<ClientTlsConfig>,
    /// OAuth config
    pub auth: Option<OAuthConfig>,
}

impl ClientConfig {
    /// Get client config from environment
    pub fn from_env() -> Result<Self> {
        let tls = if let Ok(ca_path) = env::var(CA_CERTIFICATE_PATH) {
            let pem = fs::read_to_string(ca_path).map_err(|err| Error::Auth(err.to_string()))?;
            let cert = Certificate::from_pem(pem);

            Some(ClientTlsConfig::new().ca_certificate(cert))
        } else {
            None
        };

        let address = if let Ok(gateway_host) = env::var(HOST) {
            if let Ok(gateway_port) = env::var(PORT) {
                format!("{}:{}", gateway_host, gateway_port)
            } else {
                format!("{}:{}", gateway_host, DEFAULT_ADDRESS_PORT)
            }
        } else if let Ok(gateway_port) = env::var(PORT) {
            format!("{}:{}", DEFAULT_ADDRESS_HOST, gateway_port)
        } else if let Ok(gateway_address) = env::var(ADDRESS) {
            gateway_address
        } else {
            format!("{}:{}", DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT)
        };

        let auth = if OAuthConfig::should_use_env_config() {
            Some(OAuthConfig::from_env()?)
        } else {
            None
        };

        Ok(ClientConfig {
            endpoints: vec![address],
            tls,
            auth,
        })
    }

    /// Set the grpc endpoints the client should connect to.
    pub fn with_endpoints(endpoints: Vec<String>) -> Self {
        ClientConfig {
            endpoints,
            tls: None,
            auth: None,
        }
    }
}

impl Default for ClientConfig {
    fn default() -> Self {
        let default_address = format!("{}:{}", DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT);
        ClientConfig::with_endpoints(vec![default_address])
    }
}