Skip to main content

deltalake_catalog_unity/client/
mod.rs

//! Generic utilities for reqwest-based Catalog implementations
2
3pub mod backoff;
4#[allow(unused)]
5pub mod pagination;
6pub mod retry;
7pub mod token;
8
9use crate::UnityCatalogError;
10use crate::client::retry::RetryConfig;
11use deltalake_core::data_catalog::DataCatalogResult;
12use reqwest::header::{HeaderMap, HeaderValue};
13use reqwest::{ClientBuilder, Proxy};
14use reqwest_middleware::ClientWithMiddleware;
15use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
16use std::time::Duration;
17use typed_builder::TypedBuilder;
18
19fn map_client_error(e: reqwest::Error) -> super::DataCatalogError {
20    super::DataCatalogError::Generic {
21        catalog: "HTTP client",
22        source: Box::new(e),
23    }
24}
25
26static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
27
28/// HTTP client configuration for remote catalogs
29#[derive(Debug, Clone, Default, TypedBuilder)]
30#[builder(doc)]
31pub struct ClientOptions {
32    /// User-Agent header to use for requests
33    #[builder(default, setter(strip_option))]
34    user_agent: Option<HeaderValue>,
35    /// Default headers for every request
36    #[builder(default, setter(strip_option))]
37    default_headers: Option<HeaderMap>,
38    /// HTTP proxy URL
39    #[builder(default, setter(strip_option, into))]
40    proxy_url: Option<String>,
41    /// Allow HTTP connections (default: false)
42    #[builder(default)]
43    pub(crate) allow_http: bool,
44    /// Allow invalid SSL certificates (default: false)
45    #[builder(default)]
46    allow_insecure: bool,
47    /// Request timeout
48    #[builder(default, setter(strip_option))]
49    timeout: Option<Duration>,
50    /// Connect timeout
51    #[builder(default, setter(strip_option))]
52    connect_timeout: Option<Duration>,
53    /// Pool idle timeout
54    #[builder(default, setter(strip_option))]
55    pool_idle_timeout: Option<Duration>,
56    /// Maximum number of idle connections per host
57    #[builder(default, setter(strip_option))]
58    pool_max_idle_per_host: Option<usize>,
59    /// HTTP2 keep alive interval
60    #[builder(default, setter(strip_option))]
61    http2_keep_alive_interval: Option<Duration>,
62    /// HTTP2 keep alive timeout
63    #[builder(default, setter(strip_option))]
64    http2_keep_alive_timeout: Option<Duration>,
65    /// Enable HTTP2 keep alive while idle
66    #[builder(default)]
67    http2_keep_alive_while_idle: bool,
68    /// Only use HTTP1
69    #[builder(default)]
70    http1_only: bool,
71    /// Only use HTTP2
72    #[builder(default)]
73    http2_only: bool,
74    /// Retry configuration
75    #[builder(default, setter(strip_option))]
76    retry_config: Option<RetryConfig>,
77}
78
79impl ClientOptions {
80    pub(crate) fn client(&self) -> DataCatalogResult<ClientWithMiddleware> {
81        let mut builder = ClientBuilder::new();
82
83        match &self.user_agent {
84            Some(user_agent) => builder = builder.user_agent(user_agent),
85            None => builder = builder.user_agent(DEFAULT_USER_AGENT),
86        }
87
88        if let Some(headers) = &self.default_headers {
89            builder = builder.default_headers(headers.clone())
90        }
91
92        if let Some(proxy) = &self.proxy_url {
93            let proxy = Proxy::all(proxy).map_err(map_client_error)?;
94            builder = builder.proxy(proxy);
95        }
96
97        if let Some(timeout) = self.timeout {
98            builder = builder.timeout(timeout)
99        }
100
101        if let Some(timeout) = self.connect_timeout {
102            builder = builder.connect_timeout(timeout)
103        }
104
105        if let Some(timeout) = self.pool_idle_timeout {
106            builder = builder.pool_idle_timeout(timeout)
107        }
108
109        if let Some(max) = self.pool_max_idle_per_host {
110            builder = builder.pool_max_idle_per_host(max)
111        }
112
113        if let Some(interval) = self.http2_keep_alive_interval {
114            builder = builder.http2_keep_alive_interval(interval)
115        }
116
117        if let Some(interval) = self.http2_keep_alive_timeout {
118            builder = builder.http2_keep_alive_timeout(interval)
119        }
120
121        if self.http2_keep_alive_while_idle {
122            builder = builder.http2_keep_alive_while_idle(true)
123        }
124
125        if self.http1_only {
126            builder = builder.http1_only()
127        }
128
129        if self.http2_only {
130            builder = builder.http2_prior_knowledge()
131        }
132
133        if self.allow_insecure {
134            builder = builder.danger_accept_invalid_certs(self.allow_insecure)
135        }
136
137        let inner_client = builder
138            .https_only(!self.allow_http)
139            .build()
140            .map_err(UnityCatalogError::from)?;
141        let retry_policy = self
142            .retry_config
143            .as_ref()
144            .map(|retry| retry.into())
145            .unwrap_or(ExponentialBackoff::builder().build_with_max_retries(3));
146
147        let middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
148        Ok(reqwest_middleware::ClientBuilder::new(inner_client)
149            .with(middleware)
150            .build())
151    }
152}