conjure_runtime/
builder.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! The client builder.
15use crate::blocking;
16use crate::client::ClientState;
17use crate::config::{ProxyConfig, SecurityConfig, ServiceConfig};
18use crate::raw::{BuildRawClient, DefaultRawClientBuilder};
19use crate::weak_cache::Cached;
20use crate::{Client, HostMetricsRegistry, UserAgent};
21use arc_swap::ArcSwap;
22use conjure_error::Error;
23use std::borrow::Cow;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::runtime::Handle;
27use url::Url;
28use witchcraft_metrics::MetricRegistry;
29
30const MESH_PREFIX: &str = "mesh-";
31
32/// A builder to construct [`Client`]s and [`blocking::Client`]s.
33pub struct Builder<T = Complete>(T);
34
35/// The service builder stage.
36pub struct ServiceStage(());
37
38/// The user agent builder stage.
39pub struct UserAgentStage {
40    service: String,
41}
42
43#[derive(Clone, PartialEq, Eq, Hash)]
44pub(crate) struct CachedConfig {
45    service: String,
46    user_agent: UserAgent,
47    uris: Vec<Url>,
48    security: SecurityConfig,
49    proxy: ProxyConfig,
50    connect_timeout: Duration,
51    read_timeout: Duration,
52    write_timeout: Duration,
53    backoff_slot_size: Duration,
54    max_num_retries: u32,
55    client_qos: ClientQos,
56    server_qos: ServerQos,
57    service_error: ServiceError,
58    idempotency: Idempotency,
59    node_selection_strategy: NodeSelectionStrategy,
60    rng_seed: Option<u64>,
61    override_host_index: Option<usize>,
62}
63
64#[derive(Clone)]
65pub(crate) struct UncachedConfig<T> {
66    pub(crate) metrics: Option<Arc<MetricRegistry>>,
67    pub(crate) host_metrics: Option<Arc<HostMetricsRegistry>>,
68    pub(crate) blocking_handle: Option<Handle>,
69    pub(crate) raw_client_builder: T,
70}
71
72/// The complete builder stage.
73pub struct Complete<T = DefaultRawClientBuilder> {
74    cached: CachedConfig,
75    uncached: UncachedConfig<T>,
76}
77
78impl Default for Builder<ServiceStage> {
79    #[inline]
80    fn default() -> Self {
81        Builder::new()
82    }
83}
84
85impl Builder<ServiceStage> {
86    /// Creates a new builder with default settings.
87    #[inline]
88    pub fn new() -> Self {
89        Builder(ServiceStage(()))
90    }
91
92    /// Sets the name of the service this client will communicate with.
93    ///
94    /// This is used in logging and metrics to allow differentiation between different clients.
95    #[inline]
96    pub fn service(self, service: &str) -> Builder<UserAgentStage> {
97        Builder(UserAgentStage {
98            service: service.to_string(),
99        })
100    }
101}
102
103impl Builder<UserAgentStage> {
104    /// Sets the user agent sent by this client.
105    #[inline]
106    pub fn user_agent(self, user_agent: UserAgent) -> Builder {
107        Builder(Complete {
108            cached: CachedConfig {
109                service: self.0.service,
110                user_agent,
111                uris: vec![],
112                security: SecurityConfig::builder().build(),
113                proxy: ProxyConfig::Direct,
114                connect_timeout: Duration::from_secs(10),
115                read_timeout: Duration::from_secs(5 * 60),
116                write_timeout: Duration::from_secs(5 * 60),
117                backoff_slot_size: Duration::from_millis(250),
118                max_num_retries: 4,
119                client_qos: ClientQos::Enabled,
120                server_qos: ServerQos::AutomaticRetry,
121                service_error: ServiceError::WrapInNewError,
122                idempotency: Idempotency::ByMethod,
123                node_selection_strategy: NodeSelectionStrategy::PinUntilError,
124                rng_seed: None,
125                override_host_index: None,
126            },
127            uncached: UncachedConfig {
128                metrics: None,
129                host_metrics: None,
130                blocking_handle: None,
131                raw_client_builder: DefaultRawClientBuilder,
132            },
133        })
134    }
135}
136
137#[cfg(test)]
138impl Builder {
139    pub(crate) fn for_test() -> Self {
140        use crate::Agent;
141
142        Builder::new()
143            .service("test")
144            .user_agent(UserAgent::new(Agent::new("test", "0.0.0")))
145    }
146}
147
148impl<T> Builder<Complete<T>> {
149    pub(crate) fn cached_config(&self) -> &CachedConfig {
150        &self.0.cached
151    }
152
153    /// Applies configuration settings from a `ServiceConfig` to the builder.
154    #[inline]
155    pub fn from_config(mut self, config: &ServiceConfig) -> Self {
156        self = self.uris(config.uris().to_vec());
157
158        if let Some(security) = config.security() {
159            self = self.security(security.clone());
160        }
161
162        if let Some(proxy) = config.proxy() {
163            self = self.proxy(proxy.clone());
164        }
165
166        if let Some(connect_timeout) = config.connect_timeout() {
167            self = self.connect_timeout(connect_timeout);
168        }
169
170        if let Some(read_timeout) = config.read_timeout() {
171            self = self.read_timeout(read_timeout);
172        }
173
174        if let Some(write_timeout) = config.write_timeout() {
175            self = self.write_timeout(write_timeout);
176        }
177
178        if let Some(backoff_slot_size) = config.backoff_slot_size() {
179            self = self.backoff_slot_size(backoff_slot_size);
180        }
181
182        if let Some(max_num_retries) = config.max_num_retries() {
183            self = self.max_num_retries(max_num_retries);
184        }
185
186        self
187    }
188
189    /// Returns the builder's configured service name.
190    #[inline]
191    pub fn get_service(&self) -> &str {
192        &self.0.cached.service
193    }
194
195    /// Returns the builder's configured user agent.
196    #[inline]
197    pub fn get_user_agent(&self) -> &UserAgent {
198        &self.0.cached.user_agent
199    }
200
201    /// Appends a URI to the URIs list.
202    ///
203    /// Defaults to an empty list.
204    #[inline]
205    pub fn uri(mut self, uri: Url) -> Self {
206        self.0.cached.uris.push(uri);
207        self
208    }
209
210    /// Sets the URIs list.
211    ///
212    /// Defaults to an empty list.
213    #[inline]
214    pub fn uris(mut self, uris: Vec<Url>) -> Self {
215        self.0.cached.uris = uris;
216        self
217    }
218
219    /// Returns the builder's configured URIs list.
220    #[inline]
221    pub fn get_uris(&self) -> &[Url] {
222        &self.0.cached.uris
223    }
224
225    /// Sets the security configuration.
226    ///
227    /// Defaults to an empty configuration.
228    #[inline]
229    pub fn security(mut self, security: SecurityConfig) -> Self {
230        self.0.cached.security = security;
231        self
232    }
233
234    /// Returns the builder's configured security configuration.
235    #[inline]
236    pub fn get_security(&self) -> &SecurityConfig {
237        &self.0.cached.security
238    }
239
240    /// Sets the proxy configuration.
241    ///
242    /// Defaults to `ProxyConfig::Direct` (i.e. no proxy).
243    #[inline]
244    pub fn proxy(mut self, proxy: ProxyConfig) -> Self {
245        self.0.cached.proxy = proxy;
246        self
247    }
248
249    /// Returns the builder's configured proxy configuration.
250    #[inline]
251    pub fn get_proxy(&self) -> &ProxyConfig {
252        &self.0.cached.proxy
253    }
254
255    /// Sets the connect timeout.
256    ///
257    /// Defaults to 10 seconds.
258    #[inline]
259    pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
260        self.0.cached.connect_timeout = connect_timeout;
261        self
262    }
263
264    /// Returns the builder's configured connect timeout.
265    #[inline]
266    pub fn get_connect_timeout(&self) -> Duration {
267        self.0.cached.connect_timeout
268    }
269
270    /// Sets the read timeout.
271    ///
272    /// This timeout applies to socket-level read attempts.
273    ///
274    /// Defaults to 5 minutes.
275    #[inline]
276    pub fn read_timeout(mut self, read_timeout: Duration) -> Self {
277        self.0.cached.read_timeout = read_timeout;
278        self
279    }
280
281    /// Returns the builder's configured read timeout.
282    #[inline]
283    pub fn get_read_timeout(&self) -> Duration {
284        self.0.cached.read_timeout
285    }
286
287    /// Sets the write timeout.
288    ///
289    /// This timeout applies to socket-level write attempts.
290    ///
291    /// Defaults to 5 minutes.
292    #[inline]
293    pub fn write_timeout(mut self, write_timeout: Duration) -> Self {
294        self.0.cached.write_timeout = write_timeout;
295        self
296    }
297
298    /// Returns the builder's configured write timeout.
299    #[inline]
300    pub fn get_write_timeout(&self) -> Duration {
301        self.0.cached.write_timeout
302    }
303
304    /// Sets the backoff slot size.
305    ///
306    /// This is the upper bound on the initial delay before retrying a request. It grows exponentially as additional
307    /// attempts are made for a given request.
308    ///
309    /// Defaults to 250 milliseconds.
310    #[inline]
311    pub fn backoff_slot_size(mut self, backoff_slot_size: Duration) -> Self {
312        self.0.cached.backoff_slot_size = backoff_slot_size;
313        self
314    }
315
316    /// Returns the builder's configured backoff slot size.
317    #[inline]
318    pub fn get_backoff_slot_size(&self) -> Duration {
319        self.0.cached.backoff_slot_size
320    }
321
322    /// Sets the maximum number of times a request attempt will be retried before giving up.
323    ///
324    /// Defaults to 4.
325    #[inline]
326    pub fn max_num_retries(mut self, max_num_retries: u32) -> Self {
327        self.0.cached.max_num_retries = max_num_retries;
328        self
329    }
330
331    /// Returns the builder's configured maximum number of retries.
332    #[inline]
333    pub fn get_max_num_retries(&self) -> u32 {
334        self.0.cached.max_num_retries
335    }
336
337    /// Sets the client's internal rate limiting behavior.
338    ///
339    /// Defaults to `ClientQos::Enabled`.
340    #[inline]
341    pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
342        self.0.cached.client_qos = client_qos;
343        self
344    }
345
346    /// Returns the builder's configured internal rate limiting behavior.
347    #[inline]
348    pub fn get_client_qos(&self) -> ClientQos {
349        self.0.cached.client_qos
350    }
351
352    /// Sets the client's behavior in response to a QoS error from the server.
353    ///
354    /// Defaults to `ServerQos::AutomaticRetry`.
355    #[inline]
356    pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
357        self.0.cached.server_qos = server_qos;
358        self
359    }
360
361    /// Returns the builder's configured server QoS behavior.
362    #[inline]
363    pub fn get_server_qos(&self) -> ServerQos {
364        self.0.cached.server_qos
365    }
366
367    /// Sets the client's behavior in response to a service error from the server.
368    ///
369    /// Defaults to `ServiceError::WrapInNewError`.
370    #[inline]
371    pub fn service_error(mut self, service_error: ServiceError) -> Self {
372        self.0.cached.service_error = service_error;
373        self
374    }
375
376    /// Returns the builder's configured service error handling behavior.
377    #[inline]
378    pub fn get_service_error(&self) -> ServiceError {
379        self.0.cached.service_error
380    }
381
382    /// Sets the client's behavior to determine if a request is idempotent or not.
383    ///
384    /// Only idempotent requests will be retried.
385    ///
386    /// Defaults to `Idempotency::ByMethod`.
387    #[inline]
388    pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
389        self.0.cached.idempotency = idempotency;
390        self
391    }
392
393    /// Returns the builder's configured idempotency handling behavior.
394    #[inline]
395    pub fn get_idempotency(&self) -> Idempotency {
396        self.0.cached.idempotency
397    }
398
399    /// Sets the client's strategy for selecting a node for a request.
400    ///
401    /// Defaults to `NodeSelectionStrategy::PinUntilError`.
402    #[inline]
403    pub fn node_selection_strategy(
404        mut self,
405        node_selection_strategy: NodeSelectionStrategy,
406    ) -> Self {
407        self.0.cached.node_selection_strategy = node_selection_strategy;
408        self
409    }
410
411    /// Returns the builder's configured node selection strategy.
412    #[inline]
413    pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
414        self.0.cached.node_selection_strategy
415    }
416
417    /// Sets the metric registry used to register client metrics.
418    ///
419    /// Defaults to no registry.
420    #[inline]
421    pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
422        self.0.uncached.metrics = Some(metrics);
423        self
424    }
425
426    /// Returns the builder's configured metric registry.
427    #[inline]
428    pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
429        self.0.uncached.metrics.as_ref()
430    }
431
432    /// Sets the host metrics registry used to track host performance.
433    ///
434    /// Defaults to no registry.
435    #[inline]
436    pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
437        self.0.uncached.host_metrics = Some(host_metrics);
438        self
439    }
440
441    /// Returns the builder's configured host metrics registry.
442    #[inline]
443    pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
444        self.0.uncached.host_metrics.as_ref()
445    }
446
447    /// Sets a seed used to initialize the client's random number generators.
448    ///
449    /// Several components of the client rely on entropy. If set, the client will use the seed to initialize its
450    /// internal random number generators such that clients created with the same configuration will produce the same
451    /// behavior.
452    ///
453    /// Defaults to no seed.
454    #[inline]
455    pub fn rng_seed(mut self, rng_seed: u64) -> Self {
456        self.0.cached.rng_seed = Some(rng_seed);
457        self
458    }
459
460    /// Returns the builder's configured RNG seed.
461    #[inline]
462    pub fn get_rng_seed(&self) -> Option<u64> {
463        self.0.cached.rng_seed
464    }
465
466    /// Returns the `Handle` to the tokio `Runtime` to be used by blocking clients.
467    ///
468    /// This has no effect on async clients.
469    ///
470    /// Defaults to a `conjure-runtime` internal `Runtime`.
471    #[inline]
472    pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
473        self.0.uncached.blocking_handle = Some(blocking_handle);
474        self
475    }
476
477    /// Returns the builder's configured blocking handle.
478    #[inline]
479    pub fn get_blocking_handle(&self) -> Option<&Handle> {
480        self.0.uncached.blocking_handle.as_ref()
481    }
482
483    /// Overrides the `hostIndex` field included in metrics.
484    #[inline]
485    pub fn override_host_index(mut self, override_host_index: usize) -> Self {
486        self.0.cached.override_host_index = Some(override_host_index);
487        self
488    }
489
490    /// Returns the builder's `hostIndex` override.
491    #[inline]
492    pub fn get_override_host_index(&self) -> Option<usize> {
493        self.0.cached.override_host_index
494    }
495
496    /// Sets the raw client builder.
497    ///
498    /// Defaults to `DefaultRawClientBuilder`.
499    #[inline]
500    pub fn raw_client_builder<U>(self, raw_client_builder: U) -> Builder<Complete<U>> {
501        Builder(Complete {
502            cached: self.0.cached,
503            uncached: UncachedConfig {
504                metrics: self.0.uncached.metrics,
505                host_metrics: self.0.uncached.host_metrics,
506                blocking_handle: self.0.uncached.blocking_handle,
507                raw_client_builder,
508            },
509        })
510    }
511
512    /// Returns the builder's configured raw client builder.
513    #[inline]
514    pub fn get_raw_client_builder(&self) -> &T {
515        &self.0.uncached.raw_client_builder
516    }
517
518    pub(crate) fn mesh_mode(&self) -> bool {
519        self.0
520            .cached
521            .uris
522            .iter()
523            .any(|uri| uri.scheme().starts_with(MESH_PREFIX))
524    }
525
526    pub(crate) fn postprocessed_uris(&self) -> Result<Cow<'_, [Url]>, Error> {
527        if self.mesh_mode() {
528            if self.0.cached.uris.len() != 1 {
529                return Err(Error::internal_safe("mesh mode expects exactly one URI")
530                    .with_safe_param("uris", &self.0.cached.uris));
531            }
532
533            let uri = self.0.cached.uris[0]
534                .as_str()
535                .strip_prefix(MESH_PREFIX)
536                .unwrap()
537                .parse()
538                .unwrap();
539
540            Ok(Cow::Owned(vec![uri]))
541        } else {
542            Ok(Cow::Borrowed(&self.0.cached.uris))
543        }
544    }
545}
546
547impl<T> Builder<Complete<T>>
548where
549    T: BuildRawClient,
550{
551    /// Creates a new `Client`.
552    pub fn build(&self) -> Result<Client<T::RawClient>, Error> {
553        let state = ClientState::new(self)?;
554        Ok(Client::new(
555            Arc::new(ArcSwap::new(Arc::new(Cached::uncached(state)))),
556            None,
557        ))
558    }
559
560    /// Creates a new `blocking::Client`.
561    pub fn build_blocking(&self) -> Result<blocking::Client<T::RawClient>, Error> {
562        self.build().map(|client| blocking::Client {
563            client,
564            handle: self.0.uncached.blocking_handle.clone(),
565        })
566    }
567}
568
569/// Specifies the beahavior of client-side sympathetic rate limiting.
570#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
571#[non_exhaustive]
572pub enum ClientQos {
573    /// Enable client side rate limiting.
574    ///
575    /// This is the default behavior.
576    Enabled,
577
578    /// Disables client-side rate limiting.
579    ///
580    /// This should only be used when there are known issues with the interaction between a service's rate limiting
581    /// implementation and the client's.
582    DangerousDisableSympatheticClientQos,
583}
584
585/// Specifies the behavior of a client in response to a `QoS` error from a server.
586///
587/// QoS errors have status codes 429 or 503.
588#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
589#[non_exhaustive]
590pub enum ServerQos {
591    /// The client will automatically retry the request when possible in response to a QoS error.
592    ///
593    /// This is the default behavior.
594    AutomaticRetry,
595
596    /// The client will transparently propagate the QoS error without retrying.
597    ///
598    /// This is designed for use when an upstream service has better context on how to handle a QoS error. Propagating
599    /// the error upstream to that service without retrying allows it to handle retry logic internally.
600    Propagate429And503ToCaller,
601}
602
603/// Specifies the behavior of the client in response to a service error from a server.
604///
605/// Service errors are encoded as responses with a 4xx or 5xx response code and a body containing a `SerializableError`.
606#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
607#[non_exhaustive]
608pub enum ServiceError {
609    /// The service error will be propagated as a new internal service error.
610    ///
611    /// The error's cause will contain the information about the received service error, but the error constructed by
612    /// the client will have a different error instance ID, type, etc.
613    ///
614    /// This is the default behavior.
615    WrapInNewError,
616
617    /// The service error will be transparently propagated without change.
618    ///
619    /// This is designed for use when proxying a request to another node, commonly of the same service. By preserving
620    /// the original error's instance ID, type, etc, the upstream service will be able to process the error properly.
621    PropagateToCaller,
622}
623
624/// Specifies the manner in which the client decides if a request is idempotent or not.
625#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
626#[non_exhaustive]
627pub enum Idempotency {
628    /// All requests are assumed to be idempotent.
629    Always,
630
631    /// Only requests with HTTP methods defined as idempotent (GET, HEAD, OPTIONS, TRACE, PUT, and DELETE) are assumed
632    /// to be idempotent.
633    ///
634    /// This is the default behavior.
635    ByMethod,
636
637    /// No requests are assumed to be idempotent.
638    Never,
639}
640
641/// Specifies the strategy used to select a node of a service to use for a request attempt.
642#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
643#[non_exhaustive]
644pub enum NodeSelectionStrategy {
645    /// Pin to a single host as long as it continues to successfully respond to requests.
646    ///
647    /// If the pinned node fails to successfully respond, the client will rotate through the other nodes until it finds
648    /// one that can successfully respond and then pin to that new node. The pinned node will also be randomly rotated
649    /// periodically to help spread load across the cluster.
650    ///
651    /// This is the default behavior.
652    PinUntilError,
653
654    /// Like `PinUntilError` except that the pinned node is never randomly shuffled.
655    PinUntilErrorWithoutReshuffle,
656
657    /// For each new request, select the "next" node (in some unspecified order).
658    Balanced,
659}