conjure_runtime/
client_factory.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! The client factory.
15#[cfg(not(target_arch = "wasm32"))]
16use crate::blocking;
17use crate::builder::{CachedConfig, UncachedConfig};
18use crate::config::{ServiceConfig, ServicesConfig};
19use crate::weak_cache::{Cached, WeakCache};
20use crate::{Builder, ClientState, Host, PerHostClients};
21use crate::{
22    Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos,
23    ServiceError, UserAgent,
24};
25use arc_swap::ArcSwap;
26use conjure_error::Error;
27#[cfg(not(target_arch = "wasm32"))]
28use conjure_http::client::Service;
29use conjure_http::client::{AsyncService, ConjureRuntime};
30use conjure_runtime_config::service_config;
31use refreshable::{RefreshHandle, Refreshable};
32use std::borrow::Borrow;
33use std::collections::HashMap;
34use std::sync::Arc;
35#[cfg(not(target_arch = "wasm32"))]
36use tokio::runtime::Handle;
37use witchcraft_log::warn;
38use witchcraft_metrics::MetricRegistry;
39
// Capacity handed to `WeakCache::new` every time a factory's client state cache is created
// (in `ClientFactory::user_agent`) or replaced (in `CacheManager::uncached_mut`).
const STATE_CACHE_CAPACITY: usize = 10_000;
41
/// A factory type which can create clients that will live-reload in response to configuration updates.
///
/// The type parameter `T` records the builder stage. A factory starts out in [`ConfigStage`], moves
/// to [`UserAgentStage`] once a configuration has been supplied, and reaches [`Complete`] (the
/// default for `T`) once a user agent has been supplied. Clients can only be created from a
/// `ClientFactory<Complete>`.
#[derive(Clone)]
pub struct ClientFactory<T = Complete>(T);
45
/// The config builder stage.
///
/// This is the stage produced by [`ClientFactory::builder`]; it carries no data because no
/// configuration has been provided yet.
pub struct ConfigStage(());
48
/// The user agent builder stage.
///
/// Reached after a configuration has been supplied; the user agent is still missing.
pub struct UserAgentStage {
    /// The refreshable services configuration supplied in the previous stage.
    config: Arc<Refreshable<ServicesConfig, Error>>,
}
53
// Couples the factory's uncached settings with the cache of client state.
//
// The two are kept together so that handing out mutable access to the uncached settings (see
// `uncached_mut`) can replace the cache at the same moment.
#[derive(Clone)]
struct CacheManager {
    // Settings stored outside of `CachedConfig`: metrics registries, the blocking handle and the
    // Conjure runtime.
    uncached_inner: UncachedConfig,
    // Client state keyed by `CachedConfig`.
    cache: WeakCache<CachedConfig, ClientState>,
}
59
60impl CacheManager {
61    fn uncached(&self) -> &UncachedConfig {
62        &self.uncached_inner
63    }
64
65    fn uncached_mut(&mut self) -> &mut UncachedConfig {
66        self.cache = WeakCache::new(STATE_CACHE_CAPACITY);
67        &mut self.uncached_inner
68    }
69}
70
/// The complete builder stage.
///
/// Holds every setting a factory consults when it creates a client.
#[derive(Clone)]
pub struct Complete {
    /// Refreshable configuration covering all services; per-service views are derived from it.
    config: Arc<Refreshable<ServicesConfig, Error>>,
    /// The user agent sent by clients.
    user_agent: UserAgent,
    /// Clients' rate limiting behavior (initially `ClientQos::Enabled`).
    client_qos: ClientQos,
    /// Clients' behavior in response to a QoS error from the server (initially
    /// `ServerQos::AutomaticRetry`).
    server_qos: ServerQos,
    /// Clients' behavior in response to a service error from the server (initially
    /// `ServiceError::WrapInNewError`).
    service_error: ServiceError,
    /// How clients decide whether a request is idempotent (initially `Idempotency::ByMethod`).
    idempotency: Idempotency,
    /// How clients pick a node for a request (initially `NodeSelectionStrategy::PinUntilError`).
    node_selection_strategy: NodeSelectionStrategy,
    /// The uncached settings together with the cache of client state.
    cache_manager: CacheManager,
}
83
84impl Default for ClientFactory<ConfigStage> {
85    #[inline]
86    fn default() -> Self {
87        ClientFactory::builder()
88    }
89}
90
91impl ClientFactory<ConfigStage> {
92    /// Creates a new builder to construct a client factory.
93    #[inline]
94    pub fn builder() -> Self {
95        ClientFactory(ConfigStage(()))
96    }
97
98    /// Sets the refreshable configuration used for service clients.
99    #[inline]
100    pub fn config(
101        self,
102        config: Refreshable<ServicesConfig, Error>,
103    ) -> ClientFactory<UserAgentStage> {
104        ClientFactory(UserAgentStage {
105            config: Arc::new(config),
106        })
107    }
108}
109
110impl ClientFactory<UserAgentStage> {
111    /// Sets the user agent sent by clients.
112    #[inline]
113    pub fn user_agent(self, user_agent: UserAgent) -> ClientFactory {
114        ClientFactory(Complete {
115            config: self.0.config,
116            user_agent,
117            client_qos: ClientQos::Enabled,
118            server_qos: ServerQos::AutomaticRetry,
119            service_error: ServiceError::WrapInNewError,
120            idempotency: Idempotency::ByMethod,
121            node_selection_strategy: NodeSelectionStrategy::PinUntilError,
122            cache_manager: CacheManager {
123                uncached_inner: UncachedConfig {
124                    metrics: None,
125                    host_metrics: None,
126                    #[cfg(not(target_arch = "wasm32"))]
127                    blocking_handle: None,
128                    conjure_runtime: Arc::new(ConjureRuntime::new()),
129                },
130                cache: WeakCache::new(STATE_CACHE_CAPACITY),
131            },
132        })
133    }
134}
135
136impl ClientFactory {
137    /// Sets the user agent sent by clients.
138    #[inline]
139    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
140        self.0.user_agent = user_agent;
141        self
142    }
143
144    /// Returns the configured user agent.
145    #[inline]
146    pub fn get_user_agent(&self) -> &UserAgent {
147        &self.0.user_agent
148    }
149
150    /// Sets clients' rate limiting behavior.
151    ///
152    /// Defaults to `ClientQos::Enabled`.
153    #[inline]
154    pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
155        self.0.client_qos = client_qos;
156        self
157    }
158
159    /// Returns the configured rate limiting behavior
160    #[inline]
161    pub fn get_client_qos(&self) -> ClientQos {
162        self.0.client_qos
163    }
164
165    /// Sets clients' behavior in response to a QoS error from the server.
166    ///
167    /// Defaults to `ServerQos::AutomaticRetry`.
168    #[inline]
169    pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
170        self.0.server_qos = server_qos;
171        self
172    }
173
174    /// Returns the configured QoS behavior.
175    #[inline]
176    pub fn get_server_qos(&self) -> ServerQos {
177        self.0.server_qos
178    }
179
180    /// Sets clients' behavior in response to a service error from the server.
181    ///
182    /// Defaults to `ServiceError::WrapInNewError`.
183    #[inline]
184    pub fn service_error(mut self, service_error: ServiceError) -> Self {
185        self.0.service_error = service_error;
186        self
187    }
188
189    /// Returns the configured service error behavior.
190    #[inline]
191    pub fn get_service_error(&self) -> ServiceError {
192        self.0.service_error
193    }
194
195    /// Sets clients' behavior to determine if a request is idempotent or not.
196    ///
197    /// Only idempotent requests will be retried.
198    ///
199    /// Defaults to `Idempotency::ByMethod`.
200    #[inline]
201    pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
202        self.0.idempotency = idempotency;
203        self
204    }
205
206    /// Returns the configured idempotency behavior.
207    #[inline]
208    pub fn get_idempotency(&self) -> Idempotency {
209        self.0.idempotency
210    }
211
212    /// Sets the clients' strategy for selecting a node for a request.
213    ///
214    /// Defaults to `NodeSelectionStrategy::PinUntilError`.
215    #[inline]
216    pub fn node_selection_strategy(
217        mut self,
218        node_selection_strategy: NodeSelectionStrategy,
219    ) -> Self {
220        self.0.node_selection_strategy = node_selection_strategy;
221        self
222    }
223
224    /// Returns the configured node selection strategy.
225    #[inline]
226    pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
227        self.0.node_selection_strategy
228    }
229
230    /// Sets the metric registry used to register client metrics.
231    ///
232    /// Defaults to no registry.
233    #[inline]
234    pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
235        self.0.cache_manager.uncached_mut().metrics = Some(metrics);
236        self
237    }
238
239    /// Returns the configured metrics registry.
240    #[inline]
241    pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
242        self.0.cache_manager.uncached().metrics.as_ref()
243    }
244
245    /// Sets the host metrics registry used to track host performance.
246    ///
247    /// Defaults to no registry.
248    #[inline]
249    pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
250        self.0.cache_manager.uncached_mut().host_metrics = Some(host_metrics);
251        self
252    }
253
254    /// Returns the configured host metrics registry.
255    #[inline]
256    pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
257        self.0.cache_manager.uncached().host_metrics.as_ref()
258    }
259
260    fn state_builder(&self, service: &str) -> StateBuilder {
261        StateBuilder {
262            service: service.to_string(),
263            user_agent: self.0.user_agent.clone(),
264            client_qos: self.0.client_qos,
265            server_qos: self.0.server_qos,
266            service_error: self.0.service_error,
267            idempotency: self.0.idempotency,
268            node_selection_strategy: self.0.node_selection_strategy,
269            cache_manager: self.0.cache_manager.clone(),
270        }
271    }
272
273    /// Creates a new client for the specified service.
274    ///
275    /// The client's configuration will automatically refresh to track changes in the factory's [`ServicesConfig`].
276    ///
277    /// If no configuration is present for the specified service in the [`ServicesConfig`], the client will
278    /// immediately return an error for all requests.
279    ///
280    /// The method can return any type implementing the `conjure-http` [`AsyncService`] trait. This notably includes all
281    /// Conjure-generated client types as well as the `conjure-runtime` [`Client`] itself.
282    pub fn client<T>(&self, service: &str) -> Result<T, Error>
283    where
284        T: AsyncService<Client>,
285    {
286        self.client_inner(service)
287            .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
288    }
289
290    fn client_inner(&self, service: &str) -> Result<Client, Error> {
291        let service_config = self.0.config.map({
292            let service = service.to_string();
293            move |c| c.merged_service(&service).unwrap_or_default()
294        });
295        let state_builder = self.state_builder(service);
296        Self::raw_client(state_builder, service_config, None)
297    }
298
299    fn raw_client<T>(
300        state_builder: T,
301        service_config: Refreshable<ServiceConfig, Error>,
302        override_host_index: Option<usize>,
303    ) -> Result<Client, Error>
304    where
305        T: Borrow<StateBuilder> + 'static + Sync + Send,
306    {
307        let state = state_builder
308            .borrow()
309            .build(&service_config.get(), override_host_index)?;
310        let state = Arc::new(ArcSwap::new(state));
311
312        let subscription = service_config.try_subscribe({
313            let state = state.clone();
314            move |config| {
315                let new_state = state_builder.borrow().build(config, override_host_index)?;
316                state.store(new_state);
317                Ok(())
318            }
319        })?;
320
321        Ok(Client::new(state, Some(subscription)))
322    }
323
324    /// Creates a refreshable collection of clients, each corresponding to a separate replica of the
325    /// service.
326    ///
327    /// # Note
328    ///
329    /// The client type `T` is assumed to be stateless - each instance will be recreated on every
330    /// refresh.
331    pub fn per_host_clients<T>(
332        &self,
333        service: &str,
334    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
335    where
336        T: AsyncService<Client> + 'static + Sync + Send,
337    {
338        self.per_host_clients_inner(service, T::new)
339    }
340
341    fn per_host_clients_inner<T>(
342        &self,
343        service: &str,
344        make_client: impl Fn(Client, &Arc<ConjureRuntime>) -> T + 'static + Sync + Send,
345    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
346    where
347        T: 'static + Sync + Send,
348    {
349        let state_builder = Arc::new(self.state_builder(service));
350
351        self.0
352            .config
353            .map({
354                let service = service.to_string();
355                move |c| c.merged_service(&service).unwrap_or_default()
356            })
357            .try_map({
358                let mut client_handles = HashMap::<Host, ClientHandle>::new();
359                move |c| {
360                    let mut new_client_handles = HashMap::new();
361                    let mut clients = HashMap::new();
362                    let mut errors = vec![];
363
364                    for (i, uri) in c.uris().iter().enumerate() {
365                        let host = Host { uri: uri.clone() };
366                        let host_config = service_config::Builder::from(c.clone())
367                            .uris([uri.clone()])
368                            .build();
369
370                        let client_handle = match client_handles.remove(&host) {
371                            Some(mut client_handle) => {
372                                if let Err(es) = client_handle.handle.refresh(host_config) {
373                                    errors.extend(es);
374                                }
375                                client_handle
376                            }
377                            None => {
378                                let (host_config, handle) = Refreshable::new(host_config);
379                                let client = match Self::raw_client(
380                                    state_builder.clone(),
381                                    host_config,
382                                    Some(i),
383                                ) {
384                                    Ok(state) => state,
385                                    Err(e) => {
386                                        errors.push(e);
387                                        continue;
388                                    }
389                                };
390                                ClientHandle { client, handle }
391                            }
392                        };
393
394                        clients.insert(
395                            host.clone(),
396                            make_client(
397                                client_handle.client.clone(),
398                                &state_builder.cache_manager.uncached().conjure_runtime,
399                            ),
400                        );
401                        new_client_handles.insert(host, client_handle);
402                    }
403
404                    client_handles = new_client_handles;
405                    match errors.pop() {
406                        Some(e) => {
407                            for e2 in errors {
408                                warn!("error reloading per-host clients", error: e2);
409                            }
410                            Err(e)
411                        }
412                        None => Ok(PerHostClients { clients }),
413                    }
414                }
415            })
416    }
417}
418
419#[cfg(not(target_arch = "wasm32"))]
420impl ClientFactory {
421    /// Returns the `Handle` to the tokio `Runtime` to be used by blocking clients.
422    ///
423    /// This has no effect on async clients.
424    ///
425    /// Defaults to a `conjure-runtime` internal `Runtime`.
426    #[inline]
427    pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
428        self.0.cache_manager.uncached_mut().blocking_handle = Some(blocking_handle);
429        self
430    }
431
432    /// Returns the configured blocking handle.
433    #[inline]
434    pub fn get_blocking_handle(&self) -> Option<&Handle> {
435        self.0.cache_manager.uncached().blocking_handle.as_ref()
436    }
437
438    /// Creates a new blocking client for the specified service.
439    ///
440    /// The client's configuration will automatically refresh to track changes in the factory's [`ServicesConfig`].
441    ///
442    /// If no configuration is present for the specified service in the [`ServicesConfig`], the client will
443    /// immediately return an error for all requests.
444    ///
445    /// The method can return any type implementing the `conjure-http` [`Service`] trait. This notably includes all
446    /// Conjure-generated client types as well as the `conjure-runtime` [`blocking::Client`] itself.
447    pub fn blocking_client<T>(&self, service: &str) -> Result<T, Error>
448    where
449        T: Service<blocking::Client>,
450    {
451        self.blocking_client_inner(service)
452            .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
453    }
454
455    fn blocking_client_inner(&self, service: &str) -> Result<blocking::Client, Error> {
456        self.client_inner(service).map(|client| blocking::Client {
457            client,
458            handle: self.0.cache_manager.uncached().blocking_handle.clone(),
459        })
460    }
461
462    /// Creates a refreshable collection of blocking clients, each corresponding to a separate
463    /// replica of the service.
464    ///
465    /// # Note
466    ///
467    /// The client type `T` is assumed to be stateless - each instance will be recreated on every
468    /// refresh.
469    pub fn blocking_per_host_clients<T>(
470        &self,
471        service: &str,
472    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
473    where
474        T: Service<blocking::Client> + 'static + Sync + Send,
475    {
476        self.per_host_clients_inner(service, {
477            let handle = self.0.cache_manager.uncached().blocking_handle.clone();
478            move |client, runtime| {
479                T::new(
480                    blocking::Client {
481                        client,
482                        handle: handle.clone(),
483                    },
484                    runtime,
485                )
486            }
487        })
488    }
489}
490
// Bookkeeping for one host in a per-host client collection.
struct ClientHandle {
    // The raw client created for the host; cloned into each typed client handed out.
    client: Client,
    // Handle used to push an updated single-host configuration to `client`.
    handle: RefreshHandle<ServiceConfig, Error>,
}
495
// A snapshot of a factory's settings for one service, used to build client state each time the
// service's configuration changes.
struct StateBuilder {
    // Name of the service the state is built for.
    service: String,
    // The remaining fields are copies of the factory's settings of the same names.
    user_agent: UserAgent,
    client_qos: ClientQos,
    server_qos: ServerQos,
    service_error: ServiceError,
    idempotency: Idempotency,
    node_selection_strategy: NodeSelectionStrategy,
    // Supplies the metrics registries and the cache consulted by `build`.
    cache_manager: CacheManager,
}
506
507impl StateBuilder {
508    fn build(
509        &self,
510        config: &ServiceConfig,
511        override_host_index: Option<usize>,
512    ) -> Result<Arc<Cached<CachedConfig, ClientState>>, Error> {
513        let mut builder = Client::builder()
514            .service(&self.service)
515            .user_agent(self.user_agent.clone())
516            .from_config(config)
517            .client_qos(self.client_qos)
518            .server_qos(self.server_qos)
519            .service_error(self.service_error)
520            .idempotency(self.idempotency)
521            .node_selection_strategy(self.node_selection_strategy);
522
523        if let Some(metrics) = self.cache_manager.uncached().metrics.clone() {
524            builder = builder.metrics(metrics);
525        }
526
527        if let Some(host_metrics) = self.cache_manager.uncached().host_metrics.clone() {
528            builder = builder.host_metrics(host_metrics);
529        }
530
531        if let Some(override_host_index) = override_host_index {
532            builder = builder.override_host_index(override_host_index);
533        }
534
535        self.cache_manager
536            .cache
537            .get(&builder, Builder::cached_config, ClientState::new)
538    }
539}