pub mod backoff;
#[allow(unused)]
pub mod pagination;
pub mod retry;
pub mod token;
use crate::client::retry::RetryConfig;
use crate::UnityCatalogError;
use deltalake_core::data_catalog::DataCatalogResult;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{ClientBuilder, Proxy};
use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use std::time::Duration;
fn map_client_error(e: reqwest::Error) -> super::DataCatalogError {
super::DataCatalogError::Generic {
catalog: "HTTP client",
source: Box::new(e),
}
}
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
#[derive(Debug, Clone, Default)]
pub struct ClientOptions {
user_agent: Option<HeaderValue>,
default_headers: Option<HeaderMap>,
proxy_url: Option<String>,
allow_http: bool,
allow_insecure: bool,
timeout: Option<Duration>,
connect_timeout: Option<Duration>,
pool_idle_timeout: Option<Duration>,
pool_max_idle_per_host: Option<usize>,
http2_keep_alive_interval: Option<Duration>,
http2_keep_alive_timeout: Option<Duration>,
http2_keep_alive_while_idle: bool,
http1_only: bool,
http2_only: bool,
retry_config: Option<RetryConfig>,
}
impl ClientOptions {
pub fn new() -> Self {
Default::default()
}
pub fn with_user_agent(mut self, agent: HeaderValue) -> Self {
self.user_agent = Some(agent);
self
}
pub fn with_default_headers(mut self, headers: HeaderMap) -> Self {
self.default_headers = Some(headers);
self
}
pub fn with_allow_http(mut self, allow_http: bool) -> Self {
self.allow_http = allow_http;
self
}
pub fn with_allow_invalid_certificates(mut self, allow_insecure: bool) -> Self {
self.allow_insecure = allow_insecure;
self
}
pub fn with_http1_only(mut self) -> Self {
self.http1_only = true;
self
}
pub fn with_http2_only(mut self) -> Self {
self.http2_only = true;
self
}
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(timeout);
self
}
pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
self.pool_idle_timeout = Some(timeout);
self
}
pub fn with_pool_max_idle_per_host(mut self, max: usize) -> Self {
self.pool_max_idle_per_host = Some(max);
self
}
pub fn with_http2_keep_alive_interval(mut self, interval: Duration) -> Self {
self.http2_keep_alive_interval = Some(interval);
self
}
pub fn with_http2_keep_alive_timeout(mut self, interval: Duration) -> Self {
self.http2_keep_alive_timeout = Some(interval);
self
}
pub fn with_http2_keep_alive_while_idle(mut self) -> Self {
self.http2_keep_alive_while_idle = true;
self
}
pub fn with_retry_config(mut self, cfg: RetryConfig) -> Self {
self.retry_config = Some(cfg);
self
}
pub(crate) fn client(&self) -> DataCatalogResult<ClientWithMiddleware> {
let mut builder = ClientBuilder::new();
match &self.user_agent {
Some(user_agent) => builder = builder.user_agent(user_agent),
None => builder = builder.user_agent(DEFAULT_USER_AGENT),
}
if let Some(headers) = &self.default_headers {
builder = builder.default_headers(headers.clone())
}
if let Some(proxy) = &self.proxy_url {
let proxy = Proxy::all(proxy).map_err(map_client_error)?;
builder = builder.proxy(proxy);
}
if let Some(timeout) = self.timeout {
builder = builder.timeout(timeout)
}
if let Some(timeout) = self.connect_timeout {
builder = builder.connect_timeout(timeout)
}
if let Some(timeout) = self.pool_idle_timeout {
builder = builder.pool_idle_timeout(timeout)
}
if let Some(max) = self.pool_max_idle_per_host {
builder = builder.pool_max_idle_per_host(max)
}
if let Some(interval) = self.http2_keep_alive_interval {
builder = builder.http2_keep_alive_interval(interval)
}
if let Some(interval) = self.http2_keep_alive_timeout {
builder = builder.http2_keep_alive_timeout(interval)
}
if self.http2_keep_alive_while_idle {
builder = builder.http2_keep_alive_while_idle(true)
}
if self.http1_only {
builder = builder.http1_only()
}
if self.http2_only {
builder = builder.http2_prior_knowledge()
}
if self.allow_insecure {
builder = builder.danger_accept_invalid_certs(self.allow_insecure)
}
let inner_client = builder
.https_only(!self.allow_http)
.build()
.map_err(UnityCatalogError::from)?;
let retry_policy = self
.retry_config
.as_ref()
.map(|retry| retry.into())
.unwrap_or(ExponentialBackoff::builder().build_with_max_retries(3));
let middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
Ok(reqwest_middleware::ClientBuilder::new(inner_client)
.with(middleware)
.build())
}
}