conjure_runtime/
client_factory.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! The client factory.
15#[cfg(not(target_arch = "wasm32"))]
16use crate::blocking;
17use crate::builder::{CachedConfig, UncachedConfig};
18use crate::config::{ServiceConfig, ServicesConfig};
19use crate::weak_cache::{Cached, WeakCache};
20use crate::{Builder, ClientState, Host, PerHostClients};
21use crate::{
22    Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos,
23    ServiceError, UserAgent,
24};
25use arc_swap::ArcSwap;
26use conjure_error::Error;
27#[cfg(not(target_arch = "wasm32"))]
28use conjure_http::client::Service;
29use conjure_http::client::{AsyncService, ConjureRuntime};
30use conjure_runtime_config::service_config;
31use refreshable::{RefreshHandle, Refreshable};
32use std::borrow::Borrow;
33use std::collections::HashMap;
34use std::sync::Arc;
35#[cfg(not(target_arch = "wasm32"))]
36use tokio::runtime::Handle;
37use witchcraft_log::warn;
38use witchcraft_metrics::MetricRegistry;
39
40const STATE_CACHE_CAPACITY: usize = 10_000;
41
42/// A factory type which can create clients that will live-reload in response to configuration updates.
43#[derive(Clone)]
44pub struct ClientFactory<T = Complete>(T);
45
46/// The config builder stage.
47pub struct ConfigStage(());
48
49/// The user agent builder stage.
50pub struct UserAgentStage {
51    config: Arc<Refreshable<ServicesConfig, Error>>,
52}
53
54#[derive(Clone)]
55struct CacheManager {
56    uncached_inner: UncachedConfig,
57    cache: WeakCache<CachedConfig, ClientState>,
58}
59
60impl CacheManager {
61    fn uncached(&self) -> &UncachedConfig {
62        &self.uncached_inner
63    }
64
65    fn uncached_mut(&mut self) -> &mut UncachedConfig {
66        self.cache = WeakCache::new(STATE_CACHE_CAPACITY);
67        &mut self.uncached_inner
68    }
69}
70
71/// The complete builder stage.
72#[derive(Clone)]
73pub struct Complete {
74    config: Arc<Refreshable<ServicesConfig, Error>>,
75    user_agent: UserAgent,
76    client_qos: ClientQos,
77    server_qos: ServerQos,
78    service_error: ServiceError,
79    idempotency: Idempotency,
80    node_selection_strategy: NodeSelectionStrategy,
81    cache_manager: CacheManager,
82}
83
84impl Default for ClientFactory<ConfigStage> {
85    #[inline]
86    fn default() -> Self {
87        ClientFactory::builder()
88    }
89}
90
91impl ClientFactory<ConfigStage> {
92    /// Creates a new builder to construct a client factory.
93    #[inline]
94    pub fn builder() -> Self {
95        ClientFactory(ConfigStage(()))
96    }
97
98    /// Sets the refreshable configuration used for service clients.
99    #[inline]
100    pub fn config(
101        self,
102        config: Refreshable<ServicesConfig, Error>,
103    ) -> ClientFactory<UserAgentStage> {
104        ClientFactory(UserAgentStage {
105            config: Arc::new(config),
106        })
107    }
108}
109
110impl ClientFactory<UserAgentStage> {
111    /// Sets the user agent sent by clients.
112    #[inline]
113    pub fn user_agent(self, user_agent: UserAgent) -> ClientFactory {
114        ClientFactory(Complete {
115            config: self.0.config,
116            user_agent,
117            client_qos: ClientQos::Enabled,
118            server_qos: ServerQos::AutomaticRetry,
119            service_error: ServiceError::WrapInNewError,
120            idempotency: Idempotency::ByMethod,
121            node_selection_strategy: NodeSelectionStrategy::PinUntilError,
122            cache_manager: CacheManager {
123                uncached_inner: UncachedConfig {
124                    metrics: None,
125                    host_metrics: None,
126                    #[cfg(not(target_arch = "wasm32"))]
127                    blocking_handle: None,
128                    conjure_runtime: Arc::new(ConjureRuntime::new()),
129                },
130                cache: WeakCache::new(STATE_CACHE_CAPACITY),
131            },
132        })
133    }
134}
135
136impl ClientFactory {
137    /// Sets the user agent sent by clients.
138    #[inline]
139    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
140        self.0.user_agent = user_agent;
141        self
142    }
143
144    /// Returns the configured user agent.
145    #[inline]
146    pub fn get_user_agent(&self) -> &UserAgent {
147        &self.0.user_agent
148    }
149
150    /// Sets clients' rate limiting behavior.
151    ///
152    /// Defaults to `ClientQos::Enabled`.
153    #[inline]
154    pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
155        self.0.client_qos = client_qos;
156        self
157    }
158
159    /// Returns the configured rate limiting behavior
160    #[inline]
161    pub fn get_client_qos(&self) -> ClientQos {
162        self.0.client_qos
163    }
164
165    /// Sets clients' behavior in response to a QoS error from the server.
166    ///
167    /// Defaults to `ServerQos::AutomaticRetry`.
168    #[inline]
169    pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
170        self.0.server_qos = server_qos;
171        self
172    }
173
174    /// Returns the configured QoS behavior.
175    #[inline]
176    pub fn get_server_qos(&self) -> ServerQos {
177        self.0.server_qos
178    }
179
180    /// Sets clients' behavior in response to a service error from the server.
181    ///
182    /// Defaults to `ServiceError::WrapInNewError`.
183    #[inline]
184    pub fn service_error(mut self, service_error: ServiceError) -> Self {
185        self.0.service_error = service_error;
186        self
187    }
188
189    /// Returns the configured service error behavior.
190    #[inline]
191    pub fn get_service_error(&self) -> ServiceError {
192        self.0.service_error
193    }
194
195    /// Sets clients' behavior to determine if a request is idempotent or not.
196    ///
197    /// Only idempotent requests will be retried.
198    ///
199    /// Defaults to `Idempotency::ByMethod`.
200    #[inline]
201    pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
202        self.0.idempotency = idempotency;
203        self
204    }
205
206    /// Returns the configured idempotency behavior.
207    #[inline]
208    pub fn get_idempotency(&self) -> Idempotency {
209        self.0.idempotency
210    }
211
212    /// Sets the clients' strategy for selecting a node for a request.
213    ///
214    /// Defaults to `NodeSelectionStrategy::PinUntilError`.
215    #[inline]
216    pub fn node_selection_strategy(
217        mut self,
218        node_selection_strategy: NodeSelectionStrategy,
219    ) -> Self {
220        self.0.node_selection_strategy = node_selection_strategy;
221        self
222    }
223
224    /// Returns the configured node selection strategy.
225    #[inline]
226    pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
227        self.0.node_selection_strategy
228    }
229
230    /// Sets the metric registry used to register client metrics.
231    ///
232    /// Defaults to no registry.
233    #[inline]
234    pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
235        self.0.cache_manager.uncached_mut().metrics = Some(metrics);
236        self
237    }
238
239    /// Returns the configured metrics registry.
240    #[inline]
241    pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
242        self.0.cache_manager.uncached().metrics.as_ref()
243    }
244
245    /// Sets the host metrics registry used to track host performance.
246    ///
247    /// Defaults to no registry.
248    #[inline]
249    pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
250        self.0.cache_manager.uncached_mut().host_metrics = Some(host_metrics);
251        self
252    }
253
254    /// Returns the configured host metrics registry.
255    #[inline]
256    pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
257        self.0.cache_manager.uncached().host_metrics.as_ref()
258    }
259
260    /// Sets the Conjure runtime used to configure request and response encodings.
261    ///
262    /// Defaults to `ConjureRuntime::default()`.
263    #[inline]
264    pub fn conjure_runtime(mut self, conjure_runtime: Arc<ConjureRuntime>) -> Self {
265        self.0.cache_manager.uncached_mut().conjure_runtime = conjure_runtime;
266        self
267    }
268
269    /// Returns the configured Conjure runtime.
270    pub fn get_conjure_runtime(&self) -> &Arc<ConjureRuntime> {
271        &self.0.cache_manager.uncached().conjure_runtime
272    }
273
274    fn state_builder(&self, service: &str) -> StateBuilder {
275        StateBuilder {
276            service: service.to_string(),
277            user_agent: self.0.user_agent.clone(),
278            client_qos: self.0.client_qos,
279            server_qos: self.0.server_qos,
280            service_error: self.0.service_error,
281            idempotency: self.0.idempotency,
282            node_selection_strategy: self.0.node_selection_strategy,
283            cache_manager: self.0.cache_manager.clone(),
284        }
285    }
286
287    /// Creates a new client for the specified service.
288    ///
289    /// The client's configuration will automatically refresh to track changes in the factory's [`ServicesConfig`].
290    ///
291    /// If no configuration is present for the specified service in the [`ServicesConfig`], the client will
292    /// immediately return an error for all requests.
293    ///
294    /// The method can return any type implementing the `conjure-http` [`AsyncService`] trait. This notably includes all
295    /// Conjure-generated client types as well as the `conjure-runtime` [`Client`] itself.
296    pub fn client<T>(&self, service: &str) -> Result<T, Error>
297    where
298        T: AsyncService<Client>,
299    {
300        self.client_inner(service)
301            .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
302    }
303
304    fn client_inner(&self, service: &str) -> Result<Client, Error> {
305        let service_config = self.0.config.map({
306            let service = service.to_string();
307            move |c| c.merged_service(&service).unwrap_or_default()
308        });
309        let state_builder = self.state_builder(service);
310        Self::raw_client(state_builder, service_config, None)
311    }
312
313    fn raw_client<T>(
314        state_builder: T,
315        service_config: Refreshable<ServiceConfig, Error>,
316        override_host_index: Option<usize>,
317    ) -> Result<Client, Error>
318    where
319        T: Borrow<StateBuilder> + 'static + Sync + Send,
320    {
321        let state = state_builder
322            .borrow()
323            .build(&service_config.get(), override_host_index)?;
324        let state = Arc::new(ArcSwap::new(state));
325
326        let subscription = service_config.try_subscribe({
327            let state = state.clone();
328            move |config| {
329                let new_state = state_builder.borrow().build(config, override_host_index)?;
330                state.store(new_state);
331                Ok(())
332            }
333        })?;
334
335        Ok(Client::new(state, Some(subscription)))
336    }
337
338    /// Creates a refreshable collection of clients, each corresponding to a separate replica of the
339    /// service.
340    ///
341    /// # Note
342    ///
343    /// The client type `T` is assumed to be stateless - each instance will be recreated on every
344    /// refresh.
345    pub fn per_host_clients<T>(
346        &self,
347        service: &str,
348    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
349    where
350        T: AsyncService<Client> + 'static + Sync + Send,
351    {
352        self.per_host_clients_inner(service, T::new)
353    }
354
355    fn per_host_clients_inner<T>(
356        &self,
357        service: &str,
358        make_client: impl Fn(Client, &Arc<ConjureRuntime>) -> T + 'static + Sync + Send,
359    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
360    where
361        T: 'static + Sync + Send,
362    {
363        let state_builder = Arc::new(self.state_builder(service));
364
365        self.0
366            .config
367            .map({
368                let service = service.to_string();
369                move |c| c.merged_service(&service).unwrap_or_default()
370            })
371            .try_map({
372                let mut client_handles = HashMap::<Host, ClientHandle>::new();
373                move |c| {
374                    let mut new_client_handles = HashMap::new();
375                    let mut clients = HashMap::new();
376                    let mut errors = vec![];
377
378                    for (i, uri) in c.uris().iter().enumerate() {
379                        let host = Host { uri: uri.clone() };
380                        let host_config = service_config::Builder::from(c.clone())
381                            .uris([uri.clone()])
382                            .build();
383
384                        let client_handle = match client_handles.remove(&host) {
385                            Some(mut client_handle) => {
386                                if let Err(es) = client_handle.handle.refresh(host_config) {
387                                    errors.extend(es);
388                                }
389                                client_handle
390                            }
391                            None => {
392                                let (host_config, handle) = Refreshable::new(host_config);
393                                let client = match Self::raw_client(
394                                    state_builder.clone(),
395                                    host_config,
396                                    Some(i),
397                                ) {
398                                    Ok(state) => state,
399                                    Err(e) => {
400                                        errors.push(e);
401                                        continue;
402                                    }
403                                };
404                                ClientHandle { client, handle }
405                            }
406                        };
407
408                        clients.insert(
409                            host.clone(),
410                            make_client(
411                                client_handle.client.clone(),
412                                &state_builder.cache_manager.uncached().conjure_runtime,
413                            ),
414                        );
415                        new_client_handles.insert(host, client_handle);
416                    }
417
418                    client_handles = new_client_handles;
419                    match errors.pop() {
420                        Some(e) => {
421                            for e2 in errors {
422                                warn!("error reloading per-host clients", error: e2);
423                            }
424                            Err(e)
425                        }
426                        None => Ok(PerHostClients { clients }),
427                    }
428                }
429            })
430    }
431}
432
433#[cfg(not(target_arch = "wasm32"))]
434impl ClientFactory {
435    /// Returns the `Handle` to the tokio `Runtime` to be used by blocking clients.
436    ///
437    /// This has no effect on async clients.
438    ///
439    /// Defaults to a `conjure-runtime` internal `Runtime`.
440    #[inline]
441    pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
442        self.0.cache_manager.uncached_mut().blocking_handle = Some(blocking_handle);
443        self
444    }
445
446    /// Returns the configured blocking handle.
447    #[inline]
448    pub fn get_blocking_handle(&self) -> Option<&Handle> {
449        self.0.cache_manager.uncached().blocking_handle.as_ref()
450    }
451
452    /// Creates a new blocking client for the specified service.
453    ///
454    /// The client's configuration will automatically refresh to track changes in the factory's [`ServicesConfig`].
455    ///
456    /// If no configuration is present for the specified service in the [`ServicesConfig`], the client will
457    /// immediately return an error for all requests.
458    ///
459    /// The method can return any type implementing the `conjure-http` [`Service`] trait. This notably includes all
460    /// Conjure-generated client types as well as the `conjure-runtime` [`blocking::Client`] itself.
461    pub fn blocking_client<T>(&self, service: &str) -> Result<T, Error>
462    where
463        T: Service<blocking::Client>,
464    {
465        self.blocking_client_inner(service)
466            .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
467    }
468
469    fn blocking_client_inner(&self, service: &str) -> Result<blocking::Client, Error> {
470        self.client_inner(service).map(|client| blocking::Client {
471            client,
472            handle: self.0.cache_manager.uncached().blocking_handle.clone(),
473        })
474    }
475
476    /// Creates a refreshable collection of blocking clients, each corresponding to a separate
477    /// replica of the service.
478    ///
479    /// # Note
480    ///
481    /// The client type `T` is assumed to be stateless - each instance will be recreated on every
482    /// refresh.
483    pub fn blocking_per_host_clients<T>(
484        &self,
485        service: &str,
486    ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
487    where
488        T: Service<blocking::Client> + 'static + Sync + Send,
489    {
490        self.per_host_clients_inner(service, {
491            let handle = self.0.cache_manager.uncached().blocking_handle.clone();
492            move |client, runtime| {
493                T::new(
494                    blocking::Client {
495                        client,
496                        handle: handle.clone(),
497                    },
498                    runtime,
499                )
500            }
501        })
502    }
503}
504
505struct ClientHandle {
506    client: Client,
507    handle: RefreshHandle<ServiceConfig, Error>,
508}
509
510struct StateBuilder {
511    service: String,
512    user_agent: UserAgent,
513    client_qos: ClientQos,
514    server_qos: ServerQos,
515    service_error: ServiceError,
516    idempotency: Idempotency,
517    node_selection_strategy: NodeSelectionStrategy,
518    cache_manager: CacheManager,
519}
520
521impl StateBuilder {
522    fn build(
523        &self,
524        config: &ServiceConfig,
525        override_host_index: Option<usize>,
526    ) -> Result<Arc<Cached<CachedConfig, ClientState>>, Error> {
527        let mut builder = Client::builder()
528            .service(&self.service)
529            .user_agent(self.user_agent.clone())
530            .from_config(config)
531            .client_qos(self.client_qos)
532            .server_qos(self.server_qos)
533            .service_error(self.service_error)
534            .idempotency(self.idempotency)
535            .node_selection_strategy(self.node_selection_strategy)
536            .conjure_runtime(self.cache_manager.uncached().conjure_runtime.clone());
537
538        if let Some(metrics) = self.cache_manager.uncached().metrics.clone() {
539            builder = builder.metrics(metrics);
540        }
541
542        if let Some(host_metrics) = self.cache_manager.uncached().host_metrics.clone() {
543            builder = builder.host_metrics(host_metrics);
544        }
545
546        if let Some(override_host_index) = override_host_index {
547            builder = builder.override_host_index(override_host_index);
548        }
549
550        self.cache_manager
551            .cache
552            .get(&builder, Builder::cached_config, ClientState::new)
553    }
554}