1use crate::blocking;
16use crate::client::ClientState;
17use crate::config::{ProxyConfig, SecurityConfig, ServiceConfig};
18use crate::raw::{BuildRawClient, DefaultRawClientBuilder};
19use crate::weak_cache::Cached;
20use crate::{Client, HostMetricsRegistry, UserAgent};
21use arc_swap::ArcSwap;
22use conjure_error::Error;
23use std::borrow::Cow;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::runtime::Handle;
27use url::Url;
28use witchcraft_metrics::MetricRegistry;
29
30const MESH_PREFIX: &str = "mesh-";
31
32pub struct Builder<T = Complete>(T);
34
35pub struct ServiceStage(());
37
38pub struct UserAgentStage {
40 service: String,
41}
42
43#[derive(Clone, PartialEq, Eq, Hash)]
44pub(crate) struct CachedConfig {
45 service: String,
46 user_agent: UserAgent,
47 uris: Vec<Url>,
48 security: SecurityConfig,
49 proxy: ProxyConfig,
50 connect_timeout: Duration,
51 read_timeout: Duration,
52 write_timeout: Duration,
53 backoff_slot_size: Duration,
54 max_num_retries: u32,
55 client_qos: ClientQos,
56 server_qos: ServerQos,
57 service_error: ServiceError,
58 idempotency: Idempotency,
59 node_selection_strategy: NodeSelectionStrategy,
60 rng_seed: Option<u64>,
61 override_host_index: Option<usize>,
62}
63
64#[derive(Clone)]
65pub(crate) struct UncachedConfig<T> {
66 pub(crate) metrics: Option<Arc<MetricRegistry>>,
67 pub(crate) host_metrics: Option<Arc<HostMetricsRegistry>>,
68 pub(crate) blocking_handle: Option<Handle>,
69 pub(crate) raw_client_builder: T,
70}
71
72pub struct Complete<T = DefaultRawClientBuilder> {
74 cached: CachedConfig,
75 uncached: UncachedConfig<T>,
76}
77
78impl Default for Builder<ServiceStage> {
79 #[inline]
80 fn default() -> Self {
81 Builder::new()
82 }
83}
84
85impl Builder<ServiceStage> {
86 #[inline]
88 pub fn new() -> Self {
89 Builder(ServiceStage(()))
90 }
91
92 #[inline]
96 pub fn service(self, service: &str) -> Builder<UserAgentStage> {
97 Builder(UserAgentStage {
98 service: service.to_string(),
99 })
100 }
101}
102
103impl Builder<UserAgentStage> {
104 #[inline]
106 pub fn user_agent(self, user_agent: UserAgent) -> Builder {
107 Builder(Complete {
108 cached: CachedConfig {
109 service: self.0.service,
110 user_agent,
111 uris: vec![],
112 security: SecurityConfig::builder().build(),
113 proxy: ProxyConfig::Direct,
114 connect_timeout: Duration::from_secs(10),
115 read_timeout: Duration::from_secs(5 * 60),
116 write_timeout: Duration::from_secs(5 * 60),
117 backoff_slot_size: Duration::from_millis(250),
118 max_num_retries: 4,
119 client_qos: ClientQos::Enabled,
120 server_qos: ServerQos::AutomaticRetry,
121 service_error: ServiceError::WrapInNewError,
122 idempotency: Idempotency::ByMethod,
123 node_selection_strategy: NodeSelectionStrategy::PinUntilError,
124 rng_seed: None,
125 override_host_index: None,
126 },
127 uncached: UncachedConfig {
128 metrics: None,
129 host_metrics: None,
130 blocking_handle: None,
131 raw_client_builder: DefaultRawClientBuilder,
132 },
133 })
134 }
135}
136
137#[cfg(test)]
138impl Builder {
139 pub(crate) fn for_test() -> Self {
140 use crate::Agent;
141
142 Builder::new()
143 .service("test")
144 .user_agent(UserAgent::new(Agent::new("test", "0.0.0")))
145 }
146}
147
148impl<T> Builder<Complete<T>> {
149 pub(crate) fn cached_config(&self) -> &CachedConfig {
150 &self.0.cached
151 }
152
153 #[inline]
155 pub fn from_config(mut self, config: &ServiceConfig) -> Self {
156 self = self.uris(config.uris().to_vec());
157
158 if let Some(security) = config.security() {
159 self = self.security(security.clone());
160 }
161
162 if let Some(proxy) = config.proxy() {
163 self = self.proxy(proxy.clone());
164 }
165
166 if let Some(connect_timeout) = config.connect_timeout() {
167 self = self.connect_timeout(connect_timeout);
168 }
169
170 if let Some(read_timeout) = config.read_timeout() {
171 self = self.read_timeout(read_timeout);
172 }
173
174 if let Some(write_timeout) = config.write_timeout() {
175 self = self.write_timeout(write_timeout);
176 }
177
178 if let Some(backoff_slot_size) = config.backoff_slot_size() {
179 self = self.backoff_slot_size(backoff_slot_size);
180 }
181
182 if let Some(max_num_retries) = config.max_num_retries() {
183 self = self.max_num_retries(max_num_retries);
184 }
185
186 self
187 }
188
189 #[inline]
191 pub fn get_service(&self) -> &str {
192 &self.0.cached.service
193 }
194
195 #[inline]
197 pub fn get_user_agent(&self) -> &UserAgent {
198 &self.0.cached.user_agent
199 }
200
201 #[inline]
205 pub fn uri(mut self, uri: Url) -> Self {
206 self.0.cached.uris.push(uri);
207 self
208 }
209
210 #[inline]
214 pub fn uris(mut self, uris: Vec<Url>) -> Self {
215 self.0.cached.uris = uris;
216 self
217 }
218
219 #[inline]
221 pub fn get_uris(&self) -> &[Url] {
222 &self.0.cached.uris
223 }
224
225 #[inline]
229 pub fn security(mut self, security: SecurityConfig) -> Self {
230 self.0.cached.security = security;
231 self
232 }
233
234 #[inline]
236 pub fn get_security(&self) -> &SecurityConfig {
237 &self.0.cached.security
238 }
239
240 #[inline]
244 pub fn proxy(mut self, proxy: ProxyConfig) -> Self {
245 self.0.cached.proxy = proxy;
246 self
247 }
248
249 #[inline]
251 pub fn get_proxy(&self) -> &ProxyConfig {
252 &self.0.cached.proxy
253 }
254
255 #[inline]
259 pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
260 self.0.cached.connect_timeout = connect_timeout;
261 self
262 }
263
264 #[inline]
266 pub fn get_connect_timeout(&self) -> Duration {
267 self.0.cached.connect_timeout
268 }
269
270 #[inline]
276 pub fn read_timeout(mut self, read_timeout: Duration) -> Self {
277 self.0.cached.read_timeout = read_timeout;
278 self
279 }
280
281 #[inline]
283 pub fn get_read_timeout(&self) -> Duration {
284 self.0.cached.read_timeout
285 }
286
287 #[inline]
293 pub fn write_timeout(mut self, write_timeout: Duration) -> Self {
294 self.0.cached.write_timeout = write_timeout;
295 self
296 }
297
298 #[inline]
300 pub fn get_write_timeout(&self) -> Duration {
301 self.0.cached.write_timeout
302 }
303
304 #[inline]
311 pub fn backoff_slot_size(mut self, backoff_slot_size: Duration) -> Self {
312 self.0.cached.backoff_slot_size = backoff_slot_size;
313 self
314 }
315
316 #[inline]
318 pub fn get_backoff_slot_size(&self) -> Duration {
319 self.0.cached.backoff_slot_size
320 }
321
322 #[inline]
326 pub fn max_num_retries(mut self, max_num_retries: u32) -> Self {
327 self.0.cached.max_num_retries = max_num_retries;
328 self
329 }
330
331 #[inline]
333 pub fn get_max_num_retries(&self) -> u32 {
334 self.0.cached.max_num_retries
335 }
336
337 #[inline]
341 pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
342 self.0.cached.client_qos = client_qos;
343 self
344 }
345
346 #[inline]
348 pub fn get_client_qos(&self) -> ClientQos {
349 self.0.cached.client_qos
350 }
351
352 #[inline]
356 pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
357 self.0.cached.server_qos = server_qos;
358 self
359 }
360
361 #[inline]
363 pub fn get_server_qos(&self) -> ServerQos {
364 self.0.cached.server_qos
365 }
366
367 #[inline]
371 pub fn service_error(mut self, service_error: ServiceError) -> Self {
372 self.0.cached.service_error = service_error;
373 self
374 }
375
376 #[inline]
378 pub fn get_service_error(&self) -> ServiceError {
379 self.0.cached.service_error
380 }
381
382 #[inline]
388 pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
389 self.0.cached.idempotency = idempotency;
390 self
391 }
392
393 #[inline]
395 pub fn get_idempotency(&self) -> Idempotency {
396 self.0.cached.idempotency
397 }
398
399 #[inline]
403 pub fn node_selection_strategy(
404 mut self,
405 node_selection_strategy: NodeSelectionStrategy,
406 ) -> Self {
407 self.0.cached.node_selection_strategy = node_selection_strategy;
408 self
409 }
410
411 #[inline]
413 pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
414 self.0.cached.node_selection_strategy
415 }
416
417 #[inline]
421 pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
422 self.0.uncached.metrics = Some(metrics);
423 self
424 }
425
426 #[inline]
428 pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
429 self.0.uncached.metrics.as_ref()
430 }
431
432 #[inline]
436 pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
437 self.0.uncached.host_metrics = Some(host_metrics);
438 self
439 }
440
441 #[inline]
443 pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
444 self.0.uncached.host_metrics.as_ref()
445 }
446
447 #[inline]
455 pub fn rng_seed(mut self, rng_seed: u64) -> Self {
456 self.0.cached.rng_seed = Some(rng_seed);
457 self
458 }
459
460 #[inline]
462 pub fn get_rng_seed(&self) -> Option<u64> {
463 self.0.cached.rng_seed
464 }
465
466 #[inline]
472 pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
473 self.0.uncached.blocking_handle = Some(blocking_handle);
474 self
475 }
476
477 #[inline]
479 pub fn get_blocking_handle(&self) -> Option<&Handle> {
480 self.0.uncached.blocking_handle.as_ref()
481 }
482
483 #[inline]
485 pub fn override_host_index(mut self, override_host_index: usize) -> Self {
486 self.0.cached.override_host_index = Some(override_host_index);
487 self
488 }
489
490 #[inline]
492 pub fn get_override_host_index(&self) -> Option<usize> {
493 self.0.cached.override_host_index
494 }
495
496 #[inline]
500 pub fn raw_client_builder<U>(self, raw_client_builder: U) -> Builder<Complete<U>> {
501 Builder(Complete {
502 cached: self.0.cached,
503 uncached: UncachedConfig {
504 metrics: self.0.uncached.metrics,
505 host_metrics: self.0.uncached.host_metrics,
506 blocking_handle: self.0.uncached.blocking_handle,
507 raw_client_builder,
508 },
509 })
510 }
511
512 #[inline]
514 pub fn get_raw_client_builder(&self) -> &T {
515 &self.0.uncached.raw_client_builder
516 }
517
518 pub(crate) fn mesh_mode(&self) -> bool {
519 self.0
520 .cached
521 .uris
522 .iter()
523 .any(|uri| uri.scheme().starts_with(MESH_PREFIX))
524 }
525
526 pub(crate) fn postprocessed_uris(&self) -> Result<Cow<'_, [Url]>, Error> {
527 if self.mesh_mode() {
528 if self.0.cached.uris.len() != 1 {
529 return Err(Error::internal_safe("mesh mode expects exactly one URI")
530 .with_safe_param("uris", &self.0.cached.uris));
531 }
532
533 let uri = self.0.cached.uris[0]
534 .as_str()
535 .strip_prefix(MESH_PREFIX)
536 .unwrap()
537 .parse()
538 .unwrap();
539
540 Ok(Cow::Owned(vec![uri]))
541 } else {
542 Ok(Cow::Borrowed(&self.0.cached.uris))
543 }
544 }
545}
546
547impl<T> Builder<Complete<T>>
548where
549 T: BuildRawClient,
550{
551 pub fn build(&self) -> Result<Client<T::RawClient>, Error> {
553 let state = ClientState::new(self)?;
554 Ok(Client::new(
555 Arc::new(ArcSwap::new(Arc::new(Cached::uncached(state)))),
556 None,
557 ))
558 }
559
560 pub fn build_blocking(&self) -> Result<blocking::Client<T::RawClient>, Error> {
562 self.build().map(|client| blocking::Client {
563 client,
564 handle: self.0.uncached.blocking_handle.clone(),
565 })
566 }
567}
568
569#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
571#[non_exhaustive]
572pub enum ClientQos {
573 Enabled,
577
578 DangerousDisableSympatheticClientQos,
583}
584
585#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
589#[non_exhaustive]
590pub enum ServerQos {
591 AutomaticRetry,
595
596 Propagate429And503ToCaller,
601}
602
603#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
607#[non_exhaustive]
608pub enum ServiceError {
609 WrapInNewError,
616
617 PropagateToCaller,
622}
623
624#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
626#[non_exhaustive]
627pub enum Idempotency {
628 Always,
630
631 ByMethod,
636
637 Never,
639}
640
641#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
643#[non_exhaustive]
644pub enum NodeSelectionStrategy {
645 PinUntilError,
653
654 PinUntilErrorWithoutReshuffle,
656
657 Balanced,
659}