#[cfg(not(target_arch = "wasm32"))]
use crate::blocking;
use crate::client::ClientState;
use crate::config::{ProxyConfig, SecurityConfig, ServiceConfig};
use crate::weak_cache::Cached;
use crate::{Client, HostMetricsRegistry, UserAgent};
use arc_swap::ArcSwap;
use conjure_error::Error;
use conjure_http::client::ConjureRuntime;
use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::runtime::Handle;
use url::Url;
use witchcraft_metrics::MetricRegistry;
const MESH_PREFIX: &str = "mesh-";
pub struct Builder<T = Complete>(T);
pub struct ServiceStage(());
pub struct UserAgentStage {
service: String,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) struct CachedConfig {
service: String,
user_agent: UserAgent,
uris: Vec<Url>,
security: SecurityConfig,
proxy: ProxyConfig,
connect_timeout: Duration,
read_timeout: Duration,
write_timeout: Duration,
backoff_slot_size: Duration,
max_num_retries: u32,
client_qos: ClientQos,
server_qos: ServerQos,
service_error: ServiceError,
idempotency: Idempotency,
node_selection_strategy: NodeSelectionStrategy,
override_host_index: Option<usize>,
}
#[derive(Clone)]
pub(crate) struct UncachedConfig {
pub(crate) metrics: Option<Arc<MetricRegistry>>,
pub(crate) host_metrics: Option<Arc<HostMetricsRegistry>>,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) blocking_handle: Option<Handle>,
pub(crate) conjure_runtime: Arc<ConjureRuntime>,
}
pub struct Complete {
cached: CachedConfig,
uncached: UncachedConfig,
}
impl Default for Builder<ServiceStage> {
#[inline]
fn default() -> Self {
Builder::new()
}
}
impl Builder<ServiceStage> {
#[inline]
pub fn new() -> Self {
Builder(ServiceStage(()))
}
#[inline]
pub fn service(self, service: &str) -> Builder<UserAgentStage> {
Builder(UserAgentStage {
service: service.to_string(),
})
}
}
impl Builder<UserAgentStage> {
#[inline]
pub fn user_agent(self, user_agent: UserAgent) -> Builder {
Builder(Complete {
cached: CachedConfig {
service: self.0.service,
user_agent,
uris: vec![],
security: SecurityConfig::builder().build(),
proxy: ProxyConfig::Direct,
connect_timeout: Duration::from_secs(10),
read_timeout: Duration::from_secs(5 * 60),
write_timeout: Duration::from_secs(5 * 60),
backoff_slot_size: Duration::from_millis(250),
max_num_retries: 4,
client_qos: ClientQos::Enabled,
server_qos: ServerQos::AutomaticRetry,
service_error: ServiceError::WrapInNewError,
idempotency: Idempotency::ByMethod,
node_selection_strategy: NodeSelectionStrategy::PinUntilError,
override_host_index: None,
},
uncached: UncachedConfig {
metrics: None,
host_metrics: None,
#[cfg(not(target_arch = "wasm32"))]
blocking_handle: None,
conjure_runtime: Arc::new(ConjureRuntime::new()),
},
})
}
}
#[cfg(test)]
impl Builder {
pub(crate) fn for_test() -> Self {
use crate::Agent;
Builder::new()
.service("test")
.user_agent(UserAgent::new(Agent::new("test", "0.0.0")))
}
}
impl Builder<Complete> {
pub(crate) fn cached_config(&self) -> &CachedConfig {
&self.0.cached
}
#[inline]
pub fn from_config(mut self, config: &ServiceConfig) -> Self {
self = self.uris(config.uris().to_vec());
if let Some(security) = config.security() {
self = self.security(security.clone());
}
if let Some(proxy) = config.proxy() {
self = self.proxy(proxy.clone());
}
if let Some(connect_timeout) = config.connect_timeout() {
self = self.connect_timeout(connect_timeout);
}
if let Some(read_timeout) = config.read_timeout() {
self = self.read_timeout(read_timeout);
}
if let Some(write_timeout) = config.write_timeout() {
self = self.write_timeout(write_timeout);
}
if let Some(backoff_slot_size) = config.backoff_slot_size() {
self = self.backoff_slot_size(backoff_slot_size);
}
if let Some(max_num_retries) = config.max_num_retries() {
self = self.max_num_retries(max_num_retries);
}
self
}
#[inline]
pub fn get_service(&self) -> &str {
&self.0.cached.service
}
#[inline]
pub fn get_user_agent(&self) -> &UserAgent {
&self.0.cached.user_agent
}
#[inline]
pub fn uri(mut self, uri: Url) -> Self {
self.0.cached.uris.push(uri);
self
}
#[inline]
pub fn uris(mut self, uris: Vec<Url>) -> Self {
self.0.cached.uris = uris;
self
}
#[inline]
pub fn get_uris(&self) -> &[Url] {
&self.0.cached.uris
}
#[inline]
pub fn security(mut self, security: SecurityConfig) -> Self {
self.0.cached.security = security;
self
}
#[inline]
pub fn get_security(&self) -> &SecurityConfig {
&self.0.cached.security
}
#[inline]
pub fn proxy(mut self, proxy: ProxyConfig) -> Self {
self.0.cached.proxy = proxy;
self
}
#[inline]
pub fn get_proxy(&self) -> &ProxyConfig {
&self.0.cached.proxy
}
#[inline]
pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
self.0.cached.connect_timeout = connect_timeout;
self
}
#[inline]
pub fn get_connect_timeout(&self) -> Duration {
self.0.cached.connect_timeout
}
#[inline]
pub fn read_timeout(mut self, read_timeout: Duration) -> Self {
self.0.cached.read_timeout = read_timeout;
self
}
#[inline]
pub fn get_read_timeout(&self) -> Duration {
self.0.cached.read_timeout
}
#[inline]
pub fn write_timeout(mut self, write_timeout: Duration) -> Self {
self.0.cached.write_timeout = write_timeout;
self
}
#[inline]
pub fn get_write_timeout(&self) -> Duration {
self.0.cached.write_timeout
}
#[inline]
pub fn backoff_slot_size(mut self, backoff_slot_size: Duration) -> Self {
self.0.cached.backoff_slot_size = backoff_slot_size;
self
}
#[inline]
pub fn get_backoff_slot_size(&self) -> Duration {
self.0.cached.backoff_slot_size
}
#[inline]
pub fn max_num_retries(mut self, max_num_retries: u32) -> Self {
self.0.cached.max_num_retries = max_num_retries;
self
}
#[inline]
pub fn get_max_num_retries(&self) -> u32 {
self.0.cached.max_num_retries
}
#[inline]
pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
self.0.cached.client_qos = client_qos;
self
}
#[inline]
pub fn get_client_qos(&self) -> ClientQos {
self.0.cached.client_qos
}
#[inline]
pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
self.0.cached.server_qos = server_qos;
self
}
#[inline]
pub fn get_server_qos(&self) -> ServerQos {
self.0.cached.server_qos
}
#[inline]
pub fn service_error(mut self, service_error: ServiceError) -> Self {
self.0.cached.service_error = service_error;
self
}
#[inline]
pub fn get_service_error(&self) -> ServiceError {
self.0.cached.service_error
}
#[inline]
pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
self.0.cached.idempotency = idempotency;
self
}
#[inline]
pub fn get_idempotency(&self) -> Idempotency {
self.0.cached.idempotency
}
#[inline]
pub fn node_selection_strategy(
mut self,
node_selection_strategy: NodeSelectionStrategy,
) -> Self {
self.0.cached.node_selection_strategy = node_selection_strategy;
self
}
#[inline]
pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
self.0.cached.node_selection_strategy
}
#[inline]
pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
self.0.uncached.metrics = Some(metrics);
self
}
#[inline]
pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
self.0.uncached.metrics.as_ref()
}
#[inline]
pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
self.0.uncached.host_metrics = Some(host_metrics);
self
}
#[inline]
pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
self.0.uncached.host_metrics.as_ref()
}
#[inline]
pub fn conjure_runtime(mut self, conjure_runtime: Arc<ConjureRuntime>) -> Self {
self.0.uncached.conjure_runtime = conjure_runtime;
self
}
pub fn get_conjure_runtime(&self) -> &Arc<ConjureRuntime> {
&self.0.uncached.conjure_runtime
}
#[inline]
pub fn override_host_index(mut self, override_host_index: usize) -> Self {
self.0.cached.override_host_index = Some(override_host_index);
self
}
#[inline]
pub fn get_override_host_index(&self) -> Option<usize> {
self.0.cached.override_host_index
}
pub(crate) fn mesh_mode(&self) -> bool {
self.0
.cached
.uris
.iter()
.any(|uri| uri.scheme().starts_with(MESH_PREFIX))
}
pub(crate) fn postprocessed_uris(&self) -> Result<Cow<'_, [Url]>, Error> {
if self.mesh_mode() {
if self.0.cached.uris.len() != 1 {
return Err(Error::internal_safe("mesh mode expects exactly one URI")
.with_safe_param("uris", &self.0.cached.uris));
}
let uri = self.0.cached.uris[0]
.as_str()
.strip_prefix(MESH_PREFIX)
.unwrap()
.parse()
.unwrap();
Ok(Cow::Owned(vec![uri]))
} else {
Ok(Cow::Borrowed(&self.0.cached.uris))
}
}
pub fn build(&self) -> Result<Client, Error> {
let state = ClientState::new(self)?;
Ok(Client::new(
Arc::new(ArcSwap::new(Arc::new(Cached::uncached(state)))),
None,
))
}
}
#[cfg(not(target_arch = "wasm32"))]
impl Builder<Complete> {
#[inline]
pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
self.0.uncached.blocking_handle = Some(blocking_handle);
self
}
#[inline]
pub fn get_blocking_handle(&self) -> Option<&Handle> {
self.0.uncached.blocking_handle.as_ref()
}
pub fn build_blocking(&self) -> Result<blocking::Client, Error> {
self.build().map(|client| blocking::Client {
client,
handle: self.0.uncached.blocking_handle.clone(),
})
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ClientQos {
Enabled,
DangerousDisableSympatheticClientQos,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ServerQos {
AutomaticRetry,
Propagate429And503ToCaller,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ServiceError {
WrapInNewError,
PropagateToCaller,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Idempotency {
Always,
ByMethod,
Never,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum NodeSelectionStrategy {
PinUntilError,
PinUntilErrorWithoutReshuffle,
Balanced,
}