conjure_runtime/
builder.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! The client builder.
15#[cfg(not(target_arch = "wasm32"))]
16use crate::blocking;
17use crate::client::ClientState;
18use crate::config::{ProxyConfig, SecurityConfig, ServiceConfig};
19use crate::weak_cache::Cached;
20use crate::{Client, HostMetricsRegistry, UserAgent};
21use arc_swap::ArcSwap;
22use conjure_error::Error;
23use conjure_http::client::ConjureRuntime;
24use std::borrow::Cow;
25use std::sync::Arc;
26use std::time::Duration;
27#[cfg(not(target_arch = "wasm32"))]
28use tokio::runtime::Handle;
29use url::Url;
30use witchcraft_metrics::MetricRegistry;
31
/// Scheme prefix that marks a URI as a mesh-mode URI.
///
/// `Builder::mesh_mode` checks configured URI schemes for this prefix and `Builder::postprocessed_uris` strips it
/// from the URI before the URI is used.
const MESH_PREFIX: &str = "mesh-";
33
/// A builder to construct [`Client`]s and [`blocking::Client`]s.
///
/// The type parameter tracks the stage of the builder. A builder starts in [`ServiceStage`], moves to
/// [`UserAgentStage`] once the service name has been set, and reaches [`Complete`] (the default type parameter) once
/// the user agent has been set. All optional settings and the `build` methods are only available on the
/// [`Complete`] stage.
pub struct Builder<T = Complete>(T);
36
/// The service builder stage.
///
/// This is the initial stage of a [`Builder`]; the only thing that can be done with it is to set the service name.
/// The private unit field prevents the type from being constructed outside of this module.
pub struct ServiceStage(());
39
/// The user agent builder stage.
///
/// A [`Builder`] in this stage has a service name but no user agent yet.
pub struct UserAgentStage {
    /// The service name provided to `Builder::service`.
    service: String,
}
44
/// The portion of a complete builder's configuration that supports equality comparison and hashing.
///
/// NOTE(review): given the name and the `Eq`/`Hash` derives this is presumably used as a key when caching client
/// state - confirm against the users of `Builder::cached_config`.
#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) struct CachedConfig {
    /// Name of the service the client talks to.
    service: String,
    /// User agent sent by the client.
    user_agent: UserAgent,
    /// Configured URIs, exactly as provided (any `mesh-` scheme prefix is still present).
    uris: Vec<Url>,
    /// Security configuration.
    security: SecurityConfig,
    /// Proxy configuration.
    proxy: ProxyConfig,
    /// Connect timeout.
    connect_timeout: Duration,
    /// Read timeout.
    read_timeout: Duration,
    /// Write timeout.
    write_timeout: Duration,
    /// Backoff slot size used when retrying.
    backoff_slot_size: Duration,
    /// Maximum number of retries of a request.
    max_num_retries: u32,
    /// Client-side rate limiting behavior.
    client_qos: ClientQos,
    /// Behavior in response to QoS errors from the server.
    server_qos: ServerQos,
    /// Behavior in response to service errors from the server.
    service_error: ServiceError,
    /// How request idempotency is determined.
    idempotency: Idempotency,
    /// Strategy used to pick a node for a request.
    node_selection_strategy: NodeSelectionStrategy,
    /// Optional override of the `hostIndex` field included in metrics.
    override_host_index: Option<usize>,
}
64
/// The portion of a complete builder's configuration that is kept separate from [`CachedConfig`].
///
/// Unlike [`CachedConfig`], this type only derives `Clone`; it does not support comparison or hashing.
#[derive(Clone)]
pub(crate) struct UncachedConfig {
    /// Registry used to register client metrics, if one was configured.
    pub(crate) metrics: Option<Arc<MetricRegistry>>,
    /// Registry used to track host performance, if one was configured.
    pub(crate) host_metrics: Option<Arc<HostMetricsRegistry>>,
    /// Handle to the tokio runtime used by blocking clients, if one was configured.
    ///
    /// Not available when compiling for `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) blocking_handle: Option<Handle>,
    /// The Conjure runtime shared with clients; initialized to `ConjureRuntime::new()` by `Builder::user_agent`.
    pub(crate) conjure_runtime: Arc<ConjureRuntime>,
}
73
/// The complete builder stage.
///
/// A [`Builder`] in this stage has both a service name and a user agent, and exposes every optional setting as well
/// as the `build` methods.
pub struct Complete {
    /// Settings that support equality comparison and hashing.
    cached: CachedConfig,
    /// Settings that do not (metric registries, runtime handles).
    uncached: UncachedConfig,
}
79
80impl Default for Builder<ServiceStage> {
81    #[inline]
82    fn default() -> Self {
83        Builder::new()
84    }
85}
86
87impl Builder<ServiceStage> {
88    /// Creates a new builder with default settings.
89    #[inline]
90    pub fn new() -> Self {
91        Builder(ServiceStage(()))
92    }
93
94    /// Sets the name of the service this client will communicate with.
95    ///
96    /// This is used in logging and metrics to allow differentiation between different clients.
97    #[inline]
98    pub fn service(self, service: &str) -> Builder<UserAgentStage> {
99        Builder(UserAgentStage {
100            service: service.to_string(),
101        })
102    }
103}
104
105impl Builder<UserAgentStage> {
106    /// Sets the user agent sent by this client.
107    #[inline]
108    pub fn user_agent(self, user_agent: UserAgent) -> Builder {
109        Builder(Complete {
110            cached: CachedConfig {
111                service: self.0.service,
112                user_agent,
113                uris: vec![],
114                security: SecurityConfig::builder().build(),
115                proxy: ProxyConfig::Direct,
116                connect_timeout: Duration::from_secs(10),
117                read_timeout: Duration::from_secs(5 * 60),
118                write_timeout: Duration::from_secs(5 * 60),
119                backoff_slot_size: Duration::from_millis(250),
120                max_num_retries: 4,
121                client_qos: ClientQos::Enabled,
122                server_qos: ServerQos::AutomaticRetry,
123                service_error: ServiceError::WrapInNewError,
124                idempotency: Idempotency::ByMethod,
125                node_selection_strategy: NodeSelectionStrategy::PinUntilError,
126                override_host_index: None,
127            },
128            uncached: UncachedConfig {
129                metrics: None,
130                host_metrics: None,
131                #[cfg(not(target_arch = "wasm32"))]
132                blocking_handle: None,
133                conjure_runtime: Arc::new(ConjureRuntime::new()),
134            },
135        })
136    }
137}
138
#[cfg(test)]
impl Builder {
    /// Creates a complete builder for use in tests.
    ///
    /// The service name is `test` and the user agent is built from an agent named `test` with version `0.0.0`.
    pub(crate) fn for_test() -> Self {
        let stage = Builder::new().service("test");
        let agent = crate::Agent::new("test", "0.0.0");
        stage.user_agent(UserAgent::new(agent))
    }
}
149
150impl Builder<Complete> {
151    pub(crate) fn cached_config(&self) -> &CachedConfig {
152        &self.0.cached
153    }
154
155    /// Applies configuration settings from a `ServiceConfig` to the builder.
156    #[inline]
157    pub fn from_config(mut self, config: &ServiceConfig) -> Self {
158        self = self.uris(config.uris().to_vec());
159
160        if let Some(security) = config.security() {
161            self = self.security(security.clone());
162        }
163
164        if let Some(proxy) = config.proxy() {
165            self = self.proxy(proxy.clone());
166        }
167
168        if let Some(connect_timeout) = config.connect_timeout() {
169            self = self.connect_timeout(connect_timeout);
170        }
171
172        if let Some(read_timeout) = config.read_timeout() {
173            self = self.read_timeout(read_timeout);
174        }
175
176        if let Some(write_timeout) = config.write_timeout() {
177            self = self.write_timeout(write_timeout);
178        }
179
180        if let Some(backoff_slot_size) = config.backoff_slot_size() {
181            self = self.backoff_slot_size(backoff_slot_size);
182        }
183
184        if let Some(max_num_retries) = config.max_num_retries() {
185            self = self.max_num_retries(max_num_retries);
186        }
187
188        self
189    }
190
191    /// Returns the builder's configured service name.
192    #[inline]
193    pub fn get_service(&self) -> &str {
194        &self.0.cached.service
195    }
196
197    /// Returns the builder's configured user agent.
198    #[inline]
199    pub fn get_user_agent(&self) -> &UserAgent {
200        &self.0.cached.user_agent
201    }
202
203    /// Appends a URI to the URIs list.
204    ///
205    /// Defaults to an empty list.
206    #[inline]
207    pub fn uri(mut self, uri: Url) -> Self {
208        self.0.cached.uris.push(uri);
209        self
210    }
211
212    /// Sets the URIs list.
213    ///
214    /// Defaults to an empty list.
215    #[inline]
216    pub fn uris(mut self, uris: Vec<Url>) -> Self {
217        self.0.cached.uris = uris;
218        self
219    }
220
221    /// Returns the builder's configured URIs list.
222    #[inline]
223    pub fn get_uris(&self) -> &[Url] {
224        &self.0.cached.uris
225    }
226
227    /// Sets the security configuration.
228    ///
229    /// Defaults to an empty configuration.
230    #[inline]
231    pub fn security(mut self, security: SecurityConfig) -> Self {
232        self.0.cached.security = security;
233        self
234    }
235
236    /// Returns the builder's configured security configuration.
237    #[inline]
238    pub fn get_security(&self) -> &SecurityConfig {
239        &self.0.cached.security
240    }
241
242    /// Sets the proxy configuration.
243    ///
244    /// Defaults to `ProxyConfig::Direct` (i.e. no proxy).
245    #[inline]
246    pub fn proxy(mut self, proxy: ProxyConfig) -> Self {
247        self.0.cached.proxy = proxy;
248        self
249    }
250
251    /// Returns the builder's configured proxy configuration.
252    #[inline]
253    pub fn get_proxy(&self) -> &ProxyConfig {
254        &self.0.cached.proxy
255    }
256
257    /// Sets the connect timeout.
258    ///
259    /// Defaults to 10 seconds.
260    #[inline]
261    pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
262        self.0.cached.connect_timeout = connect_timeout;
263        self
264    }
265
266    /// Returns the builder's configured connect timeout.
267    #[inline]
268    pub fn get_connect_timeout(&self) -> Duration {
269        self.0.cached.connect_timeout
270    }
271
272    /// Sets the read timeout.
273    ///
274    /// This timeout applies to socket-level read attempts.
275    ///
276    /// Defaults to 5 minutes.
277    #[inline]
278    pub fn read_timeout(mut self, read_timeout: Duration) -> Self {
279        self.0.cached.read_timeout = read_timeout;
280        self
281    }
282
283    /// Returns the builder's configured read timeout.
284    #[inline]
285    pub fn get_read_timeout(&self) -> Duration {
286        self.0.cached.read_timeout
287    }
288
289    /// Sets the write timeout.
290    ///
291    /// This timeout applies to socket-level write attempts.
292    ///
293    /// Defaults to 5 minutes.
294    #[inline]
295    pub fn write_timeout(mut self, write_timeout: Duration) -> Self {
296        self.0.cached.write_timeout = write_timeout;
297        self
298    }
299
300    /// Returns the builder's configured write timeout.
301    #[inline]
302    pub fn get_write_timeout(&self) -> Duration {
303        self.0.cached.write_timeout
304    }
305
306    /// Sets the backoff slot size.
307    ///
308    /// This is the upper bound on the initial delay before retrying a request. It grows exponentially as additional
309    /// attempts are made for a given request.
310    ///
311    /// Defaults to 250 milliseconds.
312    #[inline]
313    pub fn backoff_slot_size(mut self, backoff_slot_size: Duration) -> Self {
314        self.0.cached.backoff_slot_size = backoff_slot_size;
315        self
316    }
317
318    /// Returns the builder's configured backoff slot size.
319    #[inline]
320    pub fn get_backoff_slot_size(&self) -> Duration {
321        self.0.cached.backoff_slot_size
322    }
323
324    /// Sets the maximum number of times a request attempt will be retried before giving up.
325    ///
326    /// Defaults to 4.
327    #[inline]
328    pub fn max_num_retries(mut self, max_num_retries: u32) -> Self {
329        self.0.cached.max_num_retries = max_num_retries;
330        self
331    }
332
333    /// Returns the builder's configured maximum number of retries.
334    #[inline]
335    pub fn get_max_num_retries(&self) -> u32 {
336        self.0.cached.max_num_retries
337    }
338
339    /// Sets the client's internal rate limiting behavior.
340    ///
341    /// Defaults to `ClientQos::Enabled`.
342    #[inline]
343    pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
344        self.0.cached.client_qos = client_qos;
345        self
346    }
347
348    /// Returns the builder's configured internal rate limiting behavior.
349    #[inline]
350    pub fn get_client_qos(&self) -> ClientQos {
351        self.0.cached.client_qos
352    }
353
354    /// Sets the client's behavior in response to a QoS error from the server.
355    ///
356    /// Defaults to `ServerQos::AutomaticRetry`.
357    #[inline]
358    pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
359        self.0.cached.server_qos = server_qos;
360        self
361    }
362
363    /// Returns the builder's configured server QoS behavior.
364    #[inline]
365    pub fn get_server_qos(&self) -> ServerQos {
366        self.0.cached.server_qos
367    }
368
369    /// Sets the client's behavior in response to a service error from the server.
370    ///
371    /// Defaults to `ServiceError::WrapInNewError`.
372    #[inline]
373    pub fn service_error(mut self, service_error: ServiceError) -> Self {
374        self.0.cached.service_error = service_error;
375        self
376    }
377
378    /// Returns the builder's configured service error handling behavior.
379    #[inline]
380    pub fn get_service_error(&self) -> ServiceError {
381        self.0.cached.service_error
382    }
383
384    /// Sets the client's behavior to determine if a request is idempotent or not.
385    ///
386    /// Only idempotent requests will be retried.
387    ///
388    /// Defaults to `Idempotency::ByMethod`.
389    #[inline]
390    pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
391        self.0.cached.idempotency = idempotency;
392        self
393    }
394
395    /// Returns the builder's configured idempotency handling behavior.
396    #[inline]
397    pub fn get_idempotency(&self) -> Idempotency {
398        self.0.cached.idempotency
399    }
400
401    /// Sets the client's strategy for selecting a node for a request.
402    ///
403    /// Defaults to `NodeSelectionStrategy::PinUntilError`.
404    #[inline]
405    pub fn node_selection_strategy(
406        mut self,
407        node_selection_strategy: NodeSelectionStrategy,
408    ) -> Self {
409        self.0.cached.node_selection_strategy = node_selection_strategy;
410        self
411    }
412
413    /// Returns the builder's configured node selection strategy.
414    #[inline]
415    pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
416        self.0.cached.node_selection_strategy
417    }
418
419    /// Sets the metric registry used to register client metrics.
420    ///
421    /// Defaults to no registry.
422    #[inline]
423    pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
424        self.0.uncached.metrics = Some(metrics);
425        self
426    }
427
428    /// Returns the builder's configured metric registry.
429    #[inline]
430    pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
431        self.0.uncached.metrics.as_ref()
432    }
433
434    /// Sets the host metrics registry used to track host performance.
435    ///
436    /// Defaults to no registry.
437    #[inline]
438    pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
439        self.0.uncached.host_metrics = Some(host_metrics);
440        self
441    }
442
443    /// Returns the builder's configured host metrics registry.
444    #[inline]
445    pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
446        self.0.uncached.host_metrics.as_ref()
447    }
448
449    /// Overrides the `hostIndex` field included in metrics.
450    #[inline]
451    pub fn override_host_index(mut self, override_host_index: usize) -> Self {
452        self.0.cached.override_host_index = Some(override_host_index);
453        self
454    }
455
456    /// Returns the builder's `hostIndex` override.
457    #[inline]
458    pub fn get_override_host_index(&self) -> Option<usize> {
459        self.0.cached.override_host_index
460    }
461
462    pub(crate) fn mesh_mode(&self) -> bool {
463        self.0
464            .cached
465            .uris
466            .iter()
467            .any(|uri| uri.scheme().starts_with(MESH_PREFIX))
468    }
469
470    pub(crate) fn postprocessed_uris(&self) -> Result<Cow<'_, [Url]>, Error> {
471        if self.mesh_mode() {
472            if self.0.cached.uris.len() != 1 {
473                return Err(Error::internal_safe("mesh mode expects exactly one URI")
474                    .with_safe_param("uris", &self.0.cached.uris));
475            }
476
477            let uri = self.0.cached.uris[0]
478                .as_str()
479                .strip_prefix(MESH_PREFIX)
480                .unwrap()
481                .parse()
482                .unwrap();
483
484            Ok(Cow::Owned(vec![uri]))
485        } else {
486            Ok(Cow::Borrowed(&self.0.cached.uris))
487        }
488    }
489
490    /// Creates a new `Client`.
491    pub fn build(&self) -> Result<Client, Error> {
492        let state = ClientState::new(self)?;
493        Ok(Client::new(
494            Arc::new(ArcSwap::new(Arc::new(Cached::uncached(state)))),
495            None,
496        ))
497    }
498}
499
500#[cfg(not(target_arch = "wasm32"))]
501impl Builder<Complete> {
502    /// Returns the `Handle` to the tokio `Runtime` to be used by blocking clients.
503    ///
504    /// This has no effect on async clients.
505    ///
506    /// Defaults to a `conjure-runtime` internal `Runtime`.
507    #[inline]
508    pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
509        self.0.uncached.blocking_handle = Some(blocking_handle);
510        self
511    }
512
513    /// Returns the builder's configured blocking handle.
514    #[inline]
515    pub fn get_blocking_handle(&self) -> Option<&Handle> {
516        self.0.uncached.blocking_handle.as_ref()
517    }
518
519    /// Creates a new `blocking::Client`.
520    pub fn build_blocking(&self) -> Result<blocking::Client, Error> {
521        self.build().map(|client| blocking::Client {
522            client,
523            handle: self.0.uncached.blocking_handle.clone(),
524        })
525    }
526}
527
/// Specifies the behavior of client-side sympathetic rate limiting.
///
/// Configured via `Builder::client_qos`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ClientQos {
    /// Enables client-side rate limiting.
    ///
    /// This is the default behavior.
    Enabled,

    /// Disables client-side rate limiting.
    ///
    /// This should only be used when there are known issues with the interaction between a service's rate limiting
    /// implementation and the client's.
    DangerousDisableSympatheticClientQos,
}
543
/// Specifies the behavior of a client in response to a `QoS` error from a server.
///
/// QoS errors have status codes 429 or 503.
///
/// Configured via `Builder::server_qos`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ServerQos {
    /// The client will automatically retry the request when possible in response to a QoS error.
    ///
    /// This is the default behavior.
    AutomaticRetry,

    /// The client will transparently propagate the QoS error without retrying.
    ///
    /// This is designed for use when an upstream service has better context on how to handle a QoS error. Propagating
    /// the error upstream to that service without retrying allows it to handle retry logic internally.
    Propagate429And503ToCaller,
}
561
/// Specifies the behavior of the client in response to a service error from a server.
///
/// Service errors are encoded as responses with a 4xx or 5xx response code and a body containing a `SerializableError`.
///
/// Configured via `Builder::service_error`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ServiceError {
    /// The service error will be propagated as a new internal service error.
    ///
    /// The error's cause will contain the information about the received service error, but the error constructed by
    /// the client will have a different error instance ID, type, etc.
    ///
    /// This is the default behavior.
    WrapInNewError,

    /// The service error will be transparently propagated without change.
    ///
    /// This is designed for use when proxying a request to another node, commonly of the same service. By preserving
    /// the original error's instance ID, type, etc, the upstream service will be able to process the error properly.
    PropagateToCaller,
}
582
/// Specifies the manner in which the client decides if a request is idempotent or not.
///
/// Configured via `Builder::idempotency`. Only idempotent requests will be retried.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Idempotency {
    /// All requests are assumed to be idempotent.
    Always,

    /// Only requests with HTTP methods defined as idempotent (GET, HEAD, OPTIONS, TRACE, PUT, and DELETE) are assumed
    /// to be idempotent.
    ///
    /// This is the default behavior.
    ByMethod,

    /// No requests are assumed to be idempotent.
    Never,
}
599
/// Specifies the strategy used to select a node of a service to use for a request attempt.
///
/// Configured via `Builder::node_selection_strategy`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum NodeSelectionStrategy {
    /// Pin to a single host as long as it continues to successfully respond to requests.
    ///
    /// If the pinned node fails to successfully respond, the client will rotate through the other nodes until it finds
    /// one that can successfully respond and then pin to that new node. The pinned node will also be randomly rotated
    /// periodically to help spread load across the cluster.
    ///
    /// This is the default behavior.
    PinUntilError,

    /// Like `PinUntilError` except that the pinned node is never randomly shuffled.
    PinUntilErrorWithoutReshuffle,

    /// For each new request, select the "next" node (in some unspecified order).
    Balanced,
}
618}