conjure_runtime/
client_factory.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! The client factory.
15use crate::builder::{CachedConfig, UncachedConfig};
16use crate::config::{ServiceConfig, ServicesConfig};
17use crate::raw::{DefaultRawClient, DefaultRawClientBuilder};
18use crate::weak_cache::{Cached, WeakCache};
19use crate::{blocking, Builder, ClientState, Host, PerHostClients};
20use crate::{
21    Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos,
22    ServiceError, UserAgent,
23};
24use arc_swap::ArcSwap;
25use conjure_error::Error;
26use conjure_http::client::{AsyncService, Service};
27use conjure_runtime_config::service_config;
28use refreshable::{RefreshHandle, Refreshable};
29use std::borrow::Borrow;
30use std::collections::HashMap;
31use std::sync::Arc;
32use tokio::runtime::Handle;
33use witchcraft_log::warn;
34use witchcraft_metrics::MetricRegistry;
35
36const STATE_CACHE_CAPACITY: usize = 10_000;
37
38/// A factory type which can create clients that will live-reload in response to configuration updates.
39#[derive(Clone)]
40pub struct ClientFactory<T = Complete>(T);
41
42/// The config builder stage.
43pub struct ConfigStage(());
44
45/// The user agent builder stage.
46pub struct UserAgentStage {
47    config: Arc<Refreshable<ServicesConfig, Error>>,
48}
49
50#[derive(Clone)]
51struct CacheManager {
52    uncached_inner: UncachedConfig<DefaultRawClientBuilder>,
53    cache: WeakCache<CachedConfig, ClientState<DefaultRawClient>>,
54}
55
56impl CacheManager {
57    fn uncached(&self) -> &UncachedConfig<DefaultRawClientBuilder> {
58        &self.uncached_inner
59    }
60
61    fn uncached_mut(&mut self) -> &mut UncachedConfig<DefaultRawClientBuilder> {
62        self.cache = WeakCache::new(STATE_CACHE_CAPACITY);
63        &mut self.uncached_inner
64    }
65}
66
67/// The complete builder stage.
68#[derive(Clone)]
69pub struct Complete {
70    config: Arc<Refreshable<ServicesConfig, Error>>,
71    user_agent: UserAgent,
72    client_qos: ClientQos,
73    server_qos: ServerQos,
74    service_error: ServiceError,
75    idempotency: Idempotency,
76    node_selection_strategy: NodeSelectionStrategy,
77    cache_manager: CacheManager,
78}
79
80impl Default for ClientFactory<ConfigStage> {
81    #[inline]
82    fn default() -> Self {
83        ClientFactory::builder()
84    }
85}
86
87impl ClientFactory<ConfigStage> {
88    /// Creates a new builder to construct a client factory.
89    #[inline]
90    pub fn builder() -> Self {
91        ClientFactory(ConfigStage(()))
92    }
93
94    /// Sets the refreshable configuration used for service clients.
95    #[inline]
96    pub fn config(
97        self,
98        config: Refreshable<ServicesConfig, Error>,
99    ) -> ClientFactory<UserAgentStage> {
100        ClientFactory(UserAgentStage {
101            config: Arc::new(config),
102        })
103    }
104}
105
106impl ClientFactory<UserAgentStage> {
107    /// Sets the user agent sent by clients.
108    #[inline]
109    pub fn user_agent(self, user_agent: UserAgent) -> ClientFactory {
110        ClientFactory(Complete {
111            config: self.0.config,
112            user_agent,
113            client_qos: ClientQos::Enabled,
114            server_qos: ServerQos::AutomaticRetry,
115            service_error: ServiceError::WrapInNewError,
116            idempotency: Idempotency::ByMethod,
117            node_selection_strategy: NodeSelectionStrategy::PinUntilError,
118            cache_manager: CacheManager {
119                uncached_inner: UncachedConfig {
120                    metrics: None,
121                    host_metrics: None,
122                    blocking_handle: None,
123                    raw_client_builder: DefaultRawClientBuilder,
124                },
125                cache: WeakCache::new(STATE_CACHE_CAPACITY),
126            },
127        })
128    }
129}
130
131impl ClientFactory {
132    /// Sets the user agent sent by clients.
133    #[inline]
134    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
135        self.0.user_agent = user_agent;
136        self
137    }
138
139    /// Returns the configured user agent.
140    #[inline]
141    pub fn get_user_agent(&self) -> &UserAgent {
142        &self.0.user_agent
143    }
144
145    /// Sets clients' rate limiting behavior.
146    ///
147    /// Defaults to `ClientQos::Enabled`.
148    #[inline]
149    pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
150        self.0.client_qos = client_qos;
151        self
152    }
153
154    /// Returns the configured rate limiting behavior
155    #[inline]
156    pub fn get_client_qos(&self) -> ClientQos {
157        self.0.client_qos
158    }
159
160    /// Sets clients' behavior in response to a QoS error from the server.
161    ///
162    /// Defaults to `ServerQos::AutomaticRetry`.
163    #[inline]
164    pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
165        self.0.server_qos = server_qos;
166        self
167    }
168
169    /// Returns the configured QoS behavior.
170    #[inline]
171    pub fn get_server_qos(&self) -> ServerQos {
172        self.0.server_qos
173    }
174
175    /// Sets clients' behavior in response to a service error from the server.
176    ///
177    /// Defaults to `ServiceError::WrapInNewError`.
178    #[inline]
179    pub fn service_error(mut self, service_error: ServiceError) -> Self {
180        self.0.service_error = service_error;
181        self
182    }
183
184    /// Returns the configured service error behavior.
185    #[inline]
186    pub fn get_service_error(&self) -> ServiceError {
187        self.0.service_error
188    }
189
190    /// Sets clients' behavior to determine if a request is idempotent or not.
191    ///
192    /// Only idempotent requests will be retried.
193    ///
194    /// Defaults to `Idempotency::ByMethod`.
195    #[inline]
196    pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
197        self.0.idempotency = idempotency;
198        self
199    }
200
201    /// Returns the configured idempotency behavior.
202    #[inline]
203    pub fn get_idempotency(&self) -> Idempotency {
204        self.0.idempotency
205    }
206
207    /// Sets the clients' strategy for selecting a node for a request.
208    ///
209    /// Defaults to `NodeSelectionStrategy::PinUntilError`.
210    #[inline]
211    pub fn node_selection_strategy(
212        mut self,
213        node_selection_strategy: NodeSelectionStrategy,
214    ) -> Self {
215        self.0.node_selection_strategy = node_selection_strategy;
216        self
217    }
218
219    /// Returns the configured node selection strategy.
220    #[inline]
221    pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
222        self.0.node_selection_strategy
223    }
224
225    /// Sets the metric registry used to register client metrics.
226    ///
227    /// Defaults to no registry.
228    #[inline]
229    pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
230        self.0.cache_manager.uncached_mut().metrics = Some(metrics);
231        self
232    }
233
234    /// Returns the configured metrics registry.
235    #[inline]
236    pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
237        self.0.cache_manager.uncached().metrics.as_ref()
238    }
239
240    /// Sets the host metrics registry used to track host performance.
241    ///
242    /// Defaults to no registry.
243    #[inline]
244    pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
245        self.0.cache_manager.uncached_mut().host_metrics = Some(host_metrics);
246        self
247    }
248
249    /// Returns the configured host metrics registry.
250    #[inline]
251    pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
252        self.0.cache_manager.uncached().host_metrics.as_ref()
253    }
254
255    /// Returns the `Handle` to the tokio `Runtime` to be used by blocking clients.
256    ///
257    /// This has no effect on async clients.
258    ///
259    /// Defaults to a `conjure-runtime` internal `Runtime`.
260    #[inline]
261    pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
262        self.0.cache_manager.uncached_mut().blocking_handle = Some(blocking_handle);
263        self
264    }
265
266    /// Returns the configured blocking handle.
267    #[inline]
268    pub fn get_blocking_handle(&self) -> Option<&Handle> {
269        self.0.cache_manager.uncached().blocking_handle.as_ref()
270    }
271
272    fn state_builder(&self, service: &str) -> StateBuilder {
273        StateBuilder {
274            service: service.to_string(),
275            user_agent: self.0.user_agent.clone(),
276            client_qos: self.0.client_qos,
277            server_qos: self.0.server_qos,
278            service_error: self.0.service_error,
279            idempotency: self.0.idempotency,
280            node_selection_strategy: self.0.node_selection_strategy,
281            cache_manager: self.0.cache_manager.clone(),
282        }
283    }
284
285    /// Creates a new client for the specified service.
286    ///
287    /// The client's configuration will automatically refresh to track changes in the factory's [`ServicesConfig`].
288    ///
289    /// If no configuration is present for the specified service in the [`ServicesConfig`], the client will
290    /// immediately return an error for all requests.
291    ///
292    /// The method can return any type implementing the `conjure-http` [`AsyncService`] trait. This notably includes all
293    /// Conjure-generated client types as well as the `conjure-runtime` [`Client`] itself.
294    pub fn client<T>(&self, service: &str) -> Result<T, Error>
295    where
296        T: AsyncService<Client>,
297    {
298        self.client_inner(service).map(T::new)
299    }
300
301    fn client_inner(&self, service: &str) -> Result<Client, Error> {
302        let service_config = self.0.config.map({
303            let service = service.to_string();
304            move |c| c.merged_service(&service).unwrap_or_default()
305        });
306        let state_builder = self.state_builder(service);
307        Self::raw_client(state_builder, service_config, None)
308    }
309
310    fn raw_client<T>(
311        state_builder: T,
312        service_config: Refreshable<ServiceConfig, Error>,
313        override_host_index: Option<usize>,
314    ) -> Result<Client, Error>
315    where
316        T: Borrow<StateBuilder> + 'static + Sync + Send,
317    {
318        let state = state_builder
319            .borrow()
320            .build(&service_config.get(), override_host_index)?;
321        let state = Arc::new(ArcSwap::new(state));
322
323        let subscription = service_config.try_subscribe({
324            let state = state.clone();
325            move |config| {
326                let new_state = state_builder.borrow().build(config, override_host_index)?;
327                state.store(new_state);
328                Ok(())
329            }
330        })?;
331
332        Ok(Client::new(state, Some(subscription)))
333    }
334
335    /// Creates a new blocking client for the specified service.
336    ///
337    /// The client's configuration will automatically refresh to track changes in the factory's [`ServicesConfig`].
338    ///
339    /// If no configuration is present for the specified service in the [`ServicesConfig`], the client will
340    /// immediately return an error for all requests.
341    ///
342    /// The method can return any type implementing the `conjure-http` [`Service`] trait. This notably includes all
343    /// Conjure-generated client types as well as the `conjure-runtime` [`blocking::Client`] itself.
344    pub fn blocking_client<T>(&self, service: &str) -> Result<T, Error>
345    where
346        T: Service<blocking::Client>,
347    {
348        self.blocking_client_inner(service).map(T::new)
349    }
350
351    fn blocking_client_inner(&self, service: &str) -> Result<blocking::Client, Error> {
352        self.client_inner(service).map(|client| blocking::Client {
353            client,
354            handle: self.0.cache_manager.uncached().blocking_handle.clone(),
355        })
356    }
357
358    /// Creates a refreshable collection of clients, each corresponding to a separate replica of the
359    /// service.
360    ///
361    /// # Note
362    ///
363    /// The client type `T` is assumed to be stateless - each instance will be recreated on every
364    /// refresh.
365    pub fn per_host_clients<T>(
366        &self,
367        service: &str,
368    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
369    where
370        T: AsyncService<Client> + 'static + Sync + Send,
371    {
372        self.per_host_clients_inner(service, T::new)
373    }
374
375    /// Creates a refreshable collection of blocking clients, each corresponding to a separate
376    /// replica of the service.
377    ///
378    /// # Note
379    ///
380    /// The client type `T` is assumed to be stateless - each instance will be recreated on every
381    /// refresh.
382    pub fn blocking_per_host_clients<T>(
383        &self,
384        service: &str,
385    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
386    where
387        T: Service<blocking::Client> + 'static + Sync + Send,
388    {
389        self.per_host_clients_inner(service, {
390            let handle = self.0.cache_manager.uncached().blocking_handle.clone();
391            move |client| {
392                T::new(blocking::Client {
393                    client,
394                    handle: handle.clone(),
395                })
396            }
397        })
398    }
399
400    fn per_host_clients_inner<T>(
401        &self,
402        service: &str,
403        make_client: impl Fn(Client) -> T + 'static + Sync + Send,
404    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
405    where
406        T: 'static + Sync + Send,
407    {
408        let state_builder = Arc::new(self.state_builder(service));
409
410        self.0
411            .config
412            .map({
413                let service = service.to_string();
414                move |c| c.merged_service(&service).unwrap_or_default()
415            })
416            .try_map({
417                let mut client_handles = HashMap::<Host, ClientHandle>::new();
418                move |c| {
419                    let mut new_client_handles = HashMap::new();
420                    let mut clients = HashMap::new();
421                    let mut errors = vec![];
422
423                    for (i, uri) in c.uris().iter().enumerate() {
424                        let host = Host { uri: uri.clone() };
425                        let host_config = service_config::Builder::from(c.clone())
426                            .uris([uri.clone()])
427                            .build();
428
429                        let client_handle = match client_handles.remove(&host) {
430                            Some(mut client_handle) => {
431                                if let Err(es) = client_handle.handle.refresh(host_config) {
432                                    errors.extend(es);
433                                }
434                                client_handle
435                            }
436                            None => {
437                                let (host_config, handle) = Refreshable::new(host_config);
438                                let client = match Self::raw_client(
439                                    state_builder.clone(),
440                                    host_config,
441                                    Some(i),
442                                ) {
443                                    Ok(state) => state,
444                                    Err(e) => {
445                                        errors.push(e);
446                                        continue;
447                                    }
448                                };
449                                ClientHandle { client, handle }
450                            }
451                        };
452
453                        clients.insert(host.clone(), make_client(client_handle.client.clone()));
454                        new_client_handles.insert(host, client_handle);
455                    }
456
457                    client_handles = new_client_handles;
458                    match errors.pop() {
459                        Some(e) => {
460                            for e2 in errors {
461                                warn!("error reloading per-host clients", error: e2);
462                            }
463                            Err(e)
464                        }
465                        None => Ok(PerHostClients { clients }),
466                    }
467                }
468            })
469    }
470}
471
472struct ClientHandle {
473    client: Client,
474    handle: RefreshHandle<ServiceConfig, Error>,
475}
476
477struct StateBuilder {
478    service: String,
479    user_agent: UserAgent,
480    client_qos: ClientQos,
481    server_qos: ServerQos,
482    service_error: ServiceError,
483    idempotency: Idempotency,
484    node_selection_strategy: NodeSelectionStrategy,
485    cache_manager: CacheManager,
486}
487
488impl StateBuilder {
489    fn build(
490        &self,
491        config: &ServiceConfig,
492        override_host_index: Option<usize>,
493    ) -> Result<Arc<Cached<CachedConfig, ClientState<DefaultRawClient>>>, Error> {
494        let mut builder = Client::builder()
495            .service(&self.service)
496            .user_agent(self.user_agent.clone())
497            .from_config(config)
498            .client_qos(self.client_qos)
499            .server_qos(self.server_qos)
500            .service_error(self.service_error)
501            .idempotency(self.idempotency)
502            .node_selection_strategy(self.node_selection_strategy);
503
504        if let Some(metrics) = self.cache_manager.uncached().metrics.clone() {
505            builder = builder.metrics(metrics);
506        }
507
508        if let Some(host_metrics) = self.cache_manager.uncached().host_metrics.clone() {
509            builder = builder.host_metrics(host_metrics);
510        }
511
512        if let Some(override_host_index) = override_host_index {
513            builder = builder.override_host_index(override_host_index);
514        }
515
516        self.cache_manager
517            .cache
518            .get(&builder, Builder::cached_config, ClientState::new)
519    }
520}