1use crate::builder::{CachedConfig, UncachedConfig};
16use crate::config::{ServiceConfig, ServicesConfig};
17use crate::raw::{DefaultRawClient, DefaultRawClientBuilder};
18use crate::weak_cache::{Cached, WeakCache};
19use crate::{blocking, Builder, ClientState, Host, PerHostClients};
20use crate::{
21 Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos,
22 ServiceError, UserAgent,
23};
24use arc_swap::ArcSwap;
25use conjure_error::Error;
26use conjure_http::client::{AsyncService, Service};
27use conjure_runtime_config::service_config;
28use refreshable::{RefreshHandle, Refreshable};
29use std::borrow::Borrow;
30use std::collections::HashMap;
31use std::sync::Arc;
32use tokio::runtime::Handle;
33use witchcraft_log::warn;
34use witchcraft_metrics::MetricRegistry;
35
/// Capacity handed to every [`WeakCache`] of client states created by the factory.
const STATE_CACHE_CAPACITY: usize = 10_000;
37
38#[derive(Clone)]
40pub struct ClientFactory<T = Complete>(T);
41
/// The first builder stage: no configuration has been provided yet.
pub struct ConfigStage(());
44
45pub struct UserAgentStage {
47 config: Arc<Refreshable<ServicesConfig, Error>>,
48}
49
50#[derive(Clone)]
51struct CacheManager {
52 uncached_inner: UncachedConfig<DefaultRawClientBuilder>,
53 cache: WeakCache<CachedConfig, ClientState<DefaultRawClient>>,
54}
55
56impl CacheManager {
57 fn uncached(&self) -> &UncachedConfig<DefaultRawClientBuilder> {
58 &self.uncached_inner
59 }
60
61 fn uncached_mut(&mut self) -> &mut UncachedConfig<DefaultRawClientBuilder> {
62 self.cache = WeakCache::new(STATE_CACHE_CAPACITY);
63 &mut self.uncached_inner
64 }
65}
66
67#[derive(Clone)]
69pub struct Complete {
70 config: Arc<Refreshable<ServicesConfig, Error>>,
71 user_agent: UserAgent,
72 client_qos: ClientQos,
73 server_qos: ServerQos,
74 service_error: ServiceError,
75 idempotency: Idempotency,
76 node_selection_strategy: NodeSelectionStrategy,
77 cache_manager: CacheManager,
78}
79
80impl Default for ClientFactory<ConfigStage> {
81 #[inline]
82 fn default() -> Self {
83 ClientFactory::builder()
84 }
85}
86
87impl ClientFactory<ConfigStage> {
88 #[inline]
90 pub fn builder() -> Self {
91 ClientFactory(ConfigStage(()))
92 }
93
94 #[inline]
96 pub fn config(
97 self,
98 config: Refreshable<ServicesConfig, Error>,
99 ) -> ClientFactory<UserAgentStage> {
100 ClientFactory(UserAgentStage {
101 config: Arc::new(config),
102 })
103 }
104}
105
106impl ClientFactory<UserAgentStage> {
107 #[inline]
109 pub fn user_agent(self, user_agent: UserAgent) -> ClientFactory {
110 ClientFactory(Complete {
111 config: self.0.config,
112 user_agent,
113 client_qos: ClientQos::Enabled,
114 server_qos: ServerQos::AutomaticRetry,
115 service_error: ServiceError::WrapInNewError,
116 idempotency: Idempotency::ByMethod,
117 node_selection_strategy: NodeSelectionStrategy::PinUntilError,
118 cache_manager: CacheManager {
119 uncached_inner: UncachedConfig {
120 metrics: None,
121 host_metrics: None,
122 blocking_handle: None,
123 raw_client_builder: DefaultRawClientBuilder,
124 },
125 cache: WeakCache::new(STATE_CACHE_CAPACITY),
126 },
127 })
128 }
129}
130
131impl ClientFactory {
132 #[inline]
134 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
135 self.0.user_agent = user_agent;
136 self
137 }
138
139 #[inline]
141 pub fn get_user_agent(&self) -> &UserAgent {
142 &self.0.user_agent
143 }
144
145 #[inline]
149 pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
150 self.0.client_qos = client_qos;
151 self
152 }
153
154 #[inline]
156 pub fn get_client_qos(&self) -> ClientQos {
157 self.0.client_qos
158 }
159
160 #[inline]
164 pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
165 self.0.server_qos = server_qos;
166 self
167 }
168
169 #[inline]
171 pub fn get_server_qos(&self) -> ServerQos {
172 self.0.server_qos
173 }
174
175 #[inline]
179 pub fn service_error(mut self, service_error: ServiceError) -> Self {
180 self.0.service_error = service_error;
181 self
182 }
183
184 #[inline]
186 pub fn get_service_error(&self) -> ServiceError {
187 self.0.service_error
188 }
189
190 #[inline]
196 pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
197 self.0.idempotency = idempotency;
198 self
199 }
200
201 #[inline]
203 pub fn get_idempotency(&self) -> Idempotency {
204 self.0.idempotency
205 }
206
207 #[inline]
211 pub fn node_selection_strategy(
212 mut self,
213 node_selection_strategy: NodeSelectionStrategy,
214 ) -> Self {
215 self.0.node_selection_strategy = node_selection_strategy;
216 self
217 }
218
219 #[inline]
221 pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
222 self.0.node_selection_strategy
223 }
224
225 #[inline]
229 pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
230 self.0.cache_manager.uncached_mut().metrics = Some(metrics);
231 self
232 }
233
234 #[inline]
236 pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
237 self.0.cache_manager.uncached().metrics.as_ref()
238 }
239
240 #[inline]
244 pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
245 self.0.cache_manager.uncached_mut().host_metrics = Some(host_metrics);
246 self
247 }
248
249 #[inline]
251 pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
252 self.0.cache_manager.uncached().host_metrics.as_ref()
253 }
254
255 #[inline]
261 pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
262 self.0.cache_manager.uncached_mut().blocking_handle = Some(blocking_handle);
263 self
264 }
265
266 #[inline]
268 pub fn get_blocking_handle(&self) -> Option<&Handle> {
269 self.0.cache_manager.uncached().blocking_handle.as_ref()
270 }
271
272 fn state_builder(&self, service: &str) -> StateBuilder {
273 StateBuilder {
274 service: service.to_string(),
275 user_agent: self.0.user_agent.clone(),
276 client_qos: self.0.client_qos,
277 server_qos: self.0.server_qos,
278 service_error: self.0.service_error,
279 idempotency: self.0.idempotency,
280 node_selection_strategy: self.0.node_selection_strategy,
281 cache_manager: self.0.cache_manager.clone(),
282 }
283 }
284
285 pub fn client<T>(&self, service: &str) -> Result<T, Error>
295 where
296 T: AsyncService<Client>,
297 {
298 self.client_inner(service).map(T::new)
299 }
300
301 fn client_inner(&self, service: &str) -> Result<Client, Error> {
302 let service_config = self.0.config.map({
303 let service = service.to_string();
304 move |c| c.merged_service(&service).unwrap_or_default()
305 });
306 let state_builder = self.state_builder(service);
307 Self::raw_client(state_builder, service_config, None)
308 }
309
310 fn raw_client<T>(
311 state_builder: T,
312 service_config: Refreshable<ServiceConfig, Error>,
313 override_host_index: Option<usize>,
314 ) -> Result<Client, Error>
315 where
316 T: Borrow<StateBuilder> + 'static + Sync + Send,
317 {
318 let state = state_builder
319 .borrow()
320 .build(&service_config.get(), override_host_index)?;
321 let state = Arc::new(ArcSwap::new(state));
322
323 let subscription = service_config.try_subscribe({
324 let state = state.clone();
325 move |config| {
326 let new_state = state_builder.borrow().build(config, override_host_index)?;
327 state.store(new_state);
328 Ok(())
329 }
330 })?;
331
332 Ok(Client::new(state, Some(subscription)))
333 }
334
335 pub fn blocking_client<T>(&self, service: &str) -> Result<T, Error>
345 where
346 T: Service<blocking::Client>,
347 {
348 self.blocking_client_inner(service).map(T::new)
349 }
350
351 fn blocking_client_inner(&self, service: &str) -> Result<blocking::Client, Error> {
352 self.client_inner(service).map(|client| blocking::Client {
353 client,
354 handle: self.0.cache_manager.uncached().blocking_handle.clone(),
355 })
356 }
357
358 pub fn per_host_clients<T>(
366 &self,
367 service: &str,
368 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
369 where
370 T: AsyncService<Client> + 'static + Sync + Send,
371 {
372 self.per_host_clients_inner(service, T::new)
373 }
374
375 pub fn blocking_per_host_clients<T>(
383 &self,
384 service: &str,
385 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
386 where
387 T: Service<blocking::Client> + 'static + Sync + Send,
388 {
389 self.per_host_clients_inner(service, {
390 let handle = self.0.cache_manager.uncached().blocking_handle.clone();
391 move |client| {
392 T::new(blocking::Client {
393 client,
394 handle: handle.clone(),
395 })
396 }
397 })
398 }
399
400 fn per_host_clients_inner<T>(
401 &self,
402 service: &str,
403 make_client: impl Fn(Client) -> T + 'static + Sync + Send,
404 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
405 where
406 T: 'static + Sync + Send,
407 {
408 let state_builder = Arc::new(self.state_builder(service));
409
410 self.0
411 .config
412 .map({
413 let service = service.to_string();
414 move |c| c.merged_service(&service).unwrap_or_default()
415 })
416 .try_map({
417 let mut client_handles = HashMap::<Host, ClientHandle>::new();
418 move |c| {
419 let mut new_client_handles = HashMap::new();
420 let mut clients = HashMap::new();
421 let mut errors = vec![];
422
423 for (i, uri) in c.uris().iter().enumerate() {
424 let host = Host { uri: uri.clone() };
425 let host_config = service_config::Builder::from(c.clone())
426 .uris([uri.clone()])
427 .build();
428
429 let client_handle = match client_handles.remove(&host) {
430 Some(mut client_handle) => {
431 if let Err(es) = client_handle.handle.refresh(host_config) {
432 errors.extend(es);
433 }
434 client_handle
435 }
436 None => {
437 let (host_config, handle) = Refreshable::new(host_config);
438 let client = match Self::raw_client(
439 state_builder.clone(),
440 host_config,
441 Some(i),
442 ) {
443 Ok(state) => state,
444 Err(e) => {
445 errors.push(e);
446 continue;
447 }
448 };
449 ClientHandle { client, handle }
450 }
451 };
452
453 clients.insert(host.clone(), make_client(client_handle.client.clone()));
454 new_client_handles.insert(host, client_handle);
455 }
456
457 client_handles = new_client_handles;
458 match errors.pop() {
459 Some(e) => {
460 for e2 in errors {
461 warn!("error reloading per-host clients", error: e2);
462 }
463 Err(e)
464 }
465 None => Ok(PerHostClients { clients }),
466 }
467 }
468 })
469 }
470}
471
472struct ClientHandle {
473 client: Client,
474 handle: RefreshHandle<ServiceConfig, Error>,
475}
476
477struct StateBuilder {
478 service: String,
479 user_agent: UserAgent,
480 client_qos: ClientQos,
481 server_qos: ServerQos,
482 service_error: ServiceError,
483 idempotency: Idempotency,
484 node_selection_strategy: NodeSelectionStrategy,
485 cache_manager: CacheManager,
486}
487
488impl StateBuilder {
489 fn build(
490 &self,
491 config: &ServiceConfig,
492 override_host_index: Option<usize>,
493 ) -> Result<Arc<Cached<CachedConfig, ClientState<DefaultRawClient>>>, Error> {
494 let mut builder = Client::builder()
495 .service(&self.service)
496 .user_agent(self.user_agent.clone())
497 .from_config(config)
498 .client_qos(self.client_qos)
499 .server_qos(self.server_qos)
500 .service_error(self.service_error)
501 .idempotency(self.idempotency)
502 .node_selection_strategy(self.node_selection_strategy);
503
504 if let Some(metrics) = self.cache_manager.uncached().metrics.clone() {
505 builder = builder.metrics(metrics);
506 }
507
508 if let Some(host_metrics) = self.cache_manager.uncached().host_metrics.clone() {
509 builder = builder.host_metrics(host_metrics);
510 }
511
512 if let Some(override_host_index) = override_host_index {
513 builder = builder.override_host_index(override_host_index);
514 }
515
516 self.cache_manager
517 .cache
518 .get(&builder, Builder::cached_config, ClientState::new)
519 }
520}