use crate::oauth::AuthInterceptor;
use crate::{
error::{Error, Result},
job::{CompleteJobBuilder, FailJobBuilder, ThrowErrorBuilder, UpdateJobRetriesBuilder},
oauth::OAuthConfig,
process::{
CancelProcessInstanceBuilder, CreateProcessInstanceBuilder,
CreateProcessInstanceWithResultBuilder, DeployProcessBuilder, SetVariablesBuilder,
},
proto::gateway_client::GatewayClient,
topology::TopologyBuilder,
util::{PublishMessageBuilder, ResolveIncidentBuilder},
worker::{auto_handler::Extensions, JobWorkerBuilder},
};
use std::env;
use std::fmt::Debug;
use std::fs;
use std::rc::Rc;
use std::time::Duration;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
/// Timeout handed to `timeout(..)` on every gateway endpoint built by `Client::build_channel`.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Duration handed to `keep_alive_timeout(..)` on every gateway endpoint built by `Client::build_channel`.
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(45);
/// Environment variable naming a PEM CA certificate file; when set, `ClientConfig::from_env` configures TLS with it.
const CA_CERTIFICATE_PATH: &str = "ZEEBE_CA_CERTIFICATE_PATH";
/// Environment variable holding a complete gateway address; only consulted when neither `HOST` nor `PORT` is set.
const ADDRESS: &str = "ZEEBE_ADDRESS";
/// Environment variable holding the gateway host part of the address (used verbatim, so it must carry any scheme itself).
const HOST: &str = "ZEEBE_HOST";
/// Host part (including the `http://` scheme) used when `HOST` is not set.
const DEFAULT_ADDRESS_HOST: &str = "http://127.0.0.1";
/// Environment variable holding the gateway port part of the address.
const PORT: &str = "ZEEBE_PORT";
/// Port used when `PORT` is not set.
const DEFAULT_ADDRESS_PORT: &str = "26500";
/// Handle for talking to the gateway over gRPC.
///
/// `Clone` is derived, and every request-builder method on `Client` hands a
/// clone of the client to the builder it returns.
#[derive(Clone, Debug)]
pub struct Client {
/// Generated gateway client whose channel is wrapped with the `AuthInterceptor`.
pub(crate) gateway_client: GatewayClient<InterceptedService<Channel, AuthInterceptor>>,
/// The interceptor itself; `Client::from_config` installs a clone of it on `gateway_client`.
pub(crate) auth_interceptor: AuthInterceptor,
/// Always `None` when the client is built by `Client::from_config`.
/// NOTE(review): presumably the key of the job a worker is currently handling — confirm in the worker module.
pub(crate) current_job_key: Option<i64>,
/// Always `None` when the client is built by `Client::from_config`.
/// NOTE(review): presumably per-job handler extensions set by the worker — confirm in the worker module.
/// NOTE(review): `Rc` is not `Send`, so this field appears to keep `Client` from crossing threads — confirm this is intended.
pub(crate) current_job_extensions: Option<Rc<Extensions>>,
}
impl Default for Client {
    /// Builds a client from [`ClientConfig::default`].
    ///
    /// # Panics
    ///
    /// Panics if [`Client::from_config`] returns an error for the default
    /// configuration.
    fn default() -> Self {
        let config = ClientConfig::default();
        Self::from_config(config).unwrap()
    }
}
impl Client {
pub fn new() -> Self {
Client::default()
}
pub fn from_env() -> Result<Self> {
Client::from_config(ClientConfig::from_env()?)
}
pub fn from_config(config: ClientConfig) -> Result<Self> {
let channel = Self::build_channel(config.endpoints, config.tls)?;
let auth_interceptor = if let Some(auth_config) = config.auth {
AuthInterceptor::init(auth_config)?
} else {
AuthInterceptor::default()
};
let gateway_client = GatewayClient::with_interceptor(channel, auth_interceptor.clone());
Ok(Client {
gateway_client,
auth_interceptor,
current_job_key: None,
current_job_extensions: None,
})
}
pub async fn auth_initialized(&self) -> Result<()> {
self.auth_interceptor.auth_initialized().await
}
pub fn topology(&self) -> TopologyBuilder {
TopologyBuilder::new(self.clone())
}
pub fn deploy_process(&self) -> DeployProcessBuilder {
DeployProcessBuilder::new(self.clone())
}
pub fn create_process_instance(&self) -> CreateProcessInstanceBuilder {
CreateProcessInstanceBuilder::new(self.clone())
}
pub fn create_process_instance_with_result(&self) -> CreateProcessInstanceWithResultBuilder {
CreateProcessInstanceWithResultBuilder::new(self.clone())
}
pub fn cancel_process_instance(&self) -> CancelProcessInstanceBuilder {
CancelProcessInstanceBuilder::new(self.clone())
}
pub fn set_variables(&self) -> SetVariablesBuilder {
SetVariablesBuilder::new(self.clone())
}
pub fn job_worker(&self) -> JobWorkerBuilder {
JobWorkerBuilder::new(self.clone())
}
pub fn complete_job(&self) -> CompleteJobBuilder {
CompleteJobBuilder::new(self.clone())
}
pub fn fail_job(&self) -> FailJobBuilder {
FailJobBuilder::new(self.clone())
}
pub fn update_job_retries(&self) -> UpdateJobRetriesBuilder {
UpdateJobRetriesBuilder::new(self.clone())
}
pub fn throw_error(&self) -> ThrowErrorBuilder {
ThrowErrorBuilder::new(self.clone())
}
pub fn publish_message(&self) -> PublishMessageBuilder {
PublishMessageBuilder::new(self.clone())
}
pub fn resolve_incident(&self) -> ResolveIncidentBuilder {
ResolveIncidentBuilder::new(self.clone())
}
fn build_channel(endpoints: Vec<String>, tls: Option<ClientTlsConfig>) -> Result<Channel> {
let endpoints = endpoints
.into_iter()
.map(|uri| {
Channel::from_shared(uri.clone())
.map_err(|err| Error::InvalidGatewayUri {
uri,
message: err.to_string(),
})
.map(|channel| {
channel
.timeout(DEFAULT_REQUEST_TIMEOUT)
.keep_alive_timeout(DEFAULT_KEEP_ALIVE)
})
})
.map(|c| {
c.and_then(|c| match &tls {
Some(tls) => c.tls_config(tls.to_owned()).map_err(From::from),
None => Ok(c),
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Channel::balance_list(endpoints.into_iter()))
}
}
/// Connection settings consumed by `Client::from_config`.
#[derive(Debug)]
pub struct ClientConfig {
/// Gateway URIs; each one becomes an endpoint and the resulting channel is balanced across all of them.
pub endpoints: Vec<String>,
/// TLS configuration applied to every endpoint when present; with `None` no TLS configuration is applied.
pub tls: Option<ClientTlsConfig>,
/// OAuth settings used to initialise the auth interceptor; with `None` the default interceptor is used.
pub auth: Option<OAuthConfig>,
}
impl ClientConfig {
pub fn from_env() -> Result<Self> {
let tls = if let Ok(ca_path) = env::var(CA_CERTIFICATE_PATH) {
let pem = fs::read_to_string(ca_path).map_err(|err| Error::Auth(err.to_string()))?;
let cert = Certificate::from_pem(pem);
Some(ClientTlsConfig::new().ca_certificate(cert))
} else {
None
};
let address = if let Ok(gateway_host) = env::var(HOST) {
if let Ok(gateway_port) = env::var(PORT) {
format!("{}:{}", gateway_host, gateway_port)
} else {
format!("{}:{}", gateway_host, DEFAULT_ADDRESS_PORT)
}
} else if let Ok(gateway_port) = env::var(PORT) {
format!("{}:{}", DEFAULT_ADDRESS_HOST, gateway_port)
} else if let Ok(gateway_address) = env::var(ADDRESS) {
gateway_address
} else {
format!("{}:{}", DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT)
};
let auth = if OAuthConfig::should_use_env_config() {
Some(OAuthConfig::from_env()?)
} else {
None
};
Ok(ClientConfig {
endpoints: vec![address],
tls,
auth,
})
}
pub fn with_endpoints(endpoints: Vec<String>) -> Self {
ClientConfig {
endpoints,
tls: None,
auth: None,
}
}
}
impl Default for ClientConfig {
    /// Returns a configuration with a single endpoint made of the default
    /// host and port, and neither TLS nor OAuth configured.
    fn default() -> Self {
        let address = [DEFAULT_ADDRESS_HOST, DEFAULT_ADDRESS_PORT].join(":");
        Self::with_endpoints(vec![address])
    }
}