1#[cfg(not(target_arch = "wasm32"))]
16use crate::blocking;
17use crate::builder::{CachedConfig, UncachedConfig};
18use crate::config::{ServiceConfig, ServicesConfig};
19use crate::weak_cache::{Cached, WeakCache};
20use crate::{Builder, ClientState, Host, PerHostClients};
21use crate::{
22 Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos,
23 ServiceError, UserAgent,
24};
25use arc_swap::ArcSwap;
26use conjure_error::Error;
27#[cfg(not(target_arch = "wasm32"))]
28use conjure_http::client::Service;
29use conjure_http::client::{AsyncService, ConjureRuntime};
30use conjure_runtime_config::service_config;
31use refreshable::{RefreshHandle, Refreshable};
32use std::borrow::Borrow;
33use std::collections::HashMap;
34use std::sync::Arc;
35#[cfg(not(target_arch = "wasm32"))]
36use tokio::runtime::Handle;
37use witchcraft_log::warn;
38use witchcraft_metrics::MetricRegistry;
39
/// Capacity handed to `WeakCache::new` whenever this file creates a client-state cache.
const STATE_CACHE_CAPACITY: usize = 10_000;
41
42#[derive(Clone)]
44pub struct ClientFactory<T = Complete>(T);
45
/// The first builder stage: nothing has been configured yet and a services configuration is
/// expected next.
pub struct ConfigStage(());
48
49pub struct UserAgentStage {
51 config: Arc<Refreshable<ServicesConfig, Error>>,
52}
53
54#[derive(Clone)]
55struct CacheManager {
56 uncached_inner: UncachedConfig,
57 cache: WeakCache<CachedConfig, ClientState>,
58}
59
60impl CacheManager {
61 fn uncached(&self) -> &UncachedConfig {
62 &self.uncached_inner
63 }
64
65 fn uncached_mut(&mut self) -> &mut UncachedConfig {
66 self.cache = WeakCache::new(STATE_CACHE_CAPACITY);
67 &mut self.uncached_inner
68 }
69}
70
71#[derive(Clone)]
73pub struct Complete {
74 config: Arc<Refreshable<ServicesConfig, Error>>,
75 user_agent: UserAgent,
76 client_qos: ClientQos,
77 server_qos: ServerQos,
78 service_error: ServiceError,
79 idempotency: Idempotency,
80 node_selection_strategy: NodeSelectionStrategy,
81 cache_manager: CacheManager,
82}
83
84impl Default for ClientFactory<ConfigStage> {
85 #[inline]
86 fn default() -> Self {
87 ClientFactory::builder()
88 }
89}
90
91impl ClientFactory<ConfigStage> {
92 #[inline]
94 pub fn builder() -> Self {
95 ClientFactory(ConfigStage(()))
96 }
97
98 #[inline]
100 pub fn config(
101 self,
102 config: Refreshable<ServicesConfig, Error>,
103 ) -> ClientFactory<UserAgentStage> {
104 ClientFactory(UserAgentStage {
105 config: Arc::new(config),
106 })
107 }
108}
109
110impl ClientFactory<UserAgentStage> {
111 #[inline]
113 pub fn user_agent(self, user_agent: UserAgent) -> ClientFactory {
114 ClientFactory(Complete {
115 config: self.0.config,
116 user_agent,
117 client_qos: ClientQos::Enabled,
118 server_qos: ServerQos::AutomaticRetry,
119 service_error: ServiceError::WrapInNewError,
120 idempotency: Idempotency::ByMethod,
121 node_selection_strategy: NodeSelectionStrategy::PinUntilError,
122 cache_manager: CacheManager {
123 uncached_inner: UncachedConfig {
124 metrics: None,
125 host_metrics: None,
126 #[cfg(not(target_arch = "wasm32"))]
127 blocking_handle: None,
128 conjure_runtime: Arc::new(ConjureRuntime::new()),
129 },
130 cache: WeakCache::new(STATE_CACHE_CAPACITY),
131 },
132 })
133 }
134}
135
136impl ClientFactory {
137 #[inline]
139 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
140 self.0.user_agent = user_agent;
141 self
142 }
143
144 #[inline]
146 pub fn get_user_agent(&self) -> &UserAgent {
147 &self.0.user_agent
148 }
149
150 #[inline]
154 pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
155 self.0.client_qos = client_qos;
156 self
157 }
158
159 #[inline]
161 pub fn get_client_qos(&self) -> ClientQos {
162 self.0.client_qos
163 }
164
165 #[inline]
169 pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
170 self.0.server_qos = server_qos;
171 self
172 }
173
174 #[inline]
176 pub fn get_server_qos(&self) -> ServerQos {
177 self.0.server_qos
178 }
179
180 #[inline]
184 pub fn service_error(mut self, service_error: ServiceError) -> Self {
185 self.0.service_error = service_error;
186 self
187 }
188
189 #[inline]
191 pub fn get_service_error(&self) -> ServiceError {
192 self.0.service_error
193 }
194
195 #[inline]
201 pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
202 self.0.idempotency = idempotency;
203 self
204 }
205
206 #[inline]
208 pub fn get_idempotency(&self) -> Idempotency {
209 self.0.idempotency
210 }
211
212 #[inline]
216 pub fn node_selection_strategy(
217 mut self,
218 node_selection_strategy: NodeSelectionStrategy,
219 ) -> Self {
220 self.0.node_selection_strategy = node_selection_strategy;
221 self
222 }
223
224 #[inline]
226 pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
227 self.0.node_selection_strategy
228 }
229
230 #[inline]
234 pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
235 self.0.cache_manager.uncached_mut().metrics = Some(metrics);
236 self
237 }
238
239 #[inline]
241 pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
242 self.0.cache_manager.uncached().metrics.as_ref()
243 }
244
245 #[inline]
249 pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
250 self.0.cache_manager.uncached_mut().host_metrics = Some(host_metrics);
251 self
252 }
253
254 #[inline]
256 pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
257 self.0.cache_manager.uncached().host_metrics.as_ref()
258 }
259
260 fn state_builder(&self, service: &str) -> StateBuilder {
261 StateBuilder {
262 service: service.to_string(),
263 user_agent: self.0.user_agent.clone(),
264 client_qos: self.0.client_qos,
265 server_qos: self.0.server_qos,
266 service_error: self.0.service_error,
267 idempotency: self.0.idempotency,
268 node_selection_strategy: self.0.node_selection_strategy,
269 cache_manager: self.0.cache_manager.clone(),
270 }
271 }
272
273 pub fn client<T>(&self, service: &str) -> Result<T, Error>
283 where
284 T: AsyncService<Client>,
285 {
286 self.client_inner(service)
287 .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
288 }
289
290 fn client_inner(&self, service: &str) -> Result<Client, Error> {
291 let service_config = self.0.config.map({
292 let service = service.to_string();
293 move |c| c.merged_service(&service).unwrap_or_default()
294 });
295 let state_builder = self.state_builder(service);
296 Self::raw_client(state_builder, service_config, None)
297 }
298
299 fn raw_client<T>(
300 state_builder: T,
301 service_config: Refreshable<ServiceConfig, Error>,
302 override_host_index: Option<usize>,
303 ) -> Result<Client, Error>
304 where
305 T: Borrow<StateBuilder> + 'static + Sync + Send,
306 {
307 let state = state_builder
308 .borrow()
309 .build(&service_config.get(), override_host_index)?;
310 let state = Arc::new(ArcSwap::new(state));
311
312 let subscription = service_config.try_subscribe({
313 let state = state.clone();
314 move |config| {
315 let new_state = state_builder.borrow().build(config, override_host_index)?;
316 state.store(new_state);
317 Ok(())
318 }
319 })?;
320
321 Ok(Client::new(state, Some(subscription)))
322 }
323
324 pub fn per_host_clients<T>(
332 &self,
333 service: &str,
334 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
335 where
336 T: AsyncService<Client> + 'static + Sync + Send,
337 {
338 self.per_host_clients_inner(service, T::new)
339 }
340
341 fn per_host_clients_inner<T>(
342 &self,
343 service: &str,
344 make_client: impl Fn(Client, &Arc<ConjureRuntime>) -> T + 'static + Sync + Send,
345 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
346 where
347 T: 'static + Sync + Send,
348 {
349 let state_builder = Arc::new(self.state_builder(service));
350
351 self.0
352 .config
353 .map({
354 let service = service.to_string();
355 move |c| c.merged_service(&service).unwrap_or_default()
356 })
357 .try_map({
358 let mut client_handles = HashMap::<Host, ClientHandle>::new();
359 move |c| {
360 let mut new_client_handles = HashMap::new();
361 let mut clients = HashMap::new();
362 let mut errors = vec![];
363
364 for (i, uri) in c.uris().iter().enumerate() {
365 let host = Host { uri: uri.clone() };
366 let host_config = service_config::Builder::from(c.clone())
367 .uris([uri.clone()])
368 .build();
369
370 let client_handle = match client_handles.remove(&host) {
371 Some(mut client_handle) => {
372 if let Err(es) = client_handle.handle.refresh(host_config) {
373 errors.extend(es);
374 }
375 client_handle
376 }
377 None => {
378 let (host_config, handle) = Refreshable::new(host_config);
379 let client = match Self::raw_client(
380 state_builder.clone(),
381 host_config,
382 Some(i),
383 ) {
384 Ok(state) => state,
385 Err(e) => {
386 errors.push(e);
387 continue;
388 }
389 };
390 ClientHandle { client, handle }
391 }
392 };
393
394 clients.insert(
395 host.clone(),
396 make_client(
397 client_handle.client.clone(),
398 &state_builder.cache_manager.uncached().conjure_runtime,
399 ),
400 );
401 new_client_handles.insert(host, client_handle);
402 }
403
404 client_handles = new_client_handles;
405 match errors.pop() {
406 Some(e) => {
407 for e2 in errors {
408 warn!("error reloading per-host clients", error: e2);
409 }
410 Err(e)
411 }
412 None => Ok(PerHostClients { clients }),
413 }
414 }
415 })
416 }
417}
418
419#[cfg(not(target_arch = "wasm32"))]
420impl ClientFactory {
421 #[inline]
427 pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
428 self.0.cache_manager.uncached_mut().blocking_handle = Some(blocking_handle);
429 self
430 }
431
432 #[inline]
434 pub fn get_blocking_handle(&self) -> Option<&Handle> {
435 self.0.cache_manager.uncached().blocking_handle.as_ref()
436 }
437
438 pub fn blocking_client<T>(&self, service: &str) -> Result<T, Error>
448 where
449 T: Service<blocking::Client>,
450 {
451 self.blocking_client_inner(service)
452 .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
453 }
454
455 fn blocking_client_inner(&self, service: &str) -> Result<blocking::Client, Error> {
456 self.client_inner(service).map(|client| blocking::Client {
457 client,
458 handle: self.0.cache_manager.uncached().blocking_handle.clone(),
459 })
460 }
461
462 pub fn blocking_per_host_clients<T>(
470 &self,
471 service: &str,
472 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
473 where
474 T: Service<blocking::Client> + 'static + Sync + Send,
475 {
476 self.per_host_clients_inner(service, {
477 let handle = self.0.cache_manager.uncached().blocking_handle.clone();
478 move |client, runtime| {
479 T::new(
480 blocking::Client {
481 client,
482 handle: handle.clone(),
483 },
484 runtime,
485 )
486 }
487 })
488 }
489}
490
491struct ClientHandle {
492 client: Client,
493 handle: RefreshHandle<ServiceConfig, Error>,
494}
495
496struct StateBuilder {
497 service: String,
498 user_agent: UserAgent,
499 client_qos: ClientQos,
500 server_qos: ServerQos,
501 service_error: ServiceError,
502 idempotency: Idempotency,
503 node_selection_strategy: NodeSelectionStrategy,
504 cache_manager: CacheManager,
505}
506
507impl StateBuilder {
508 fn build(
509 &self,
510 config: &ServiceConfig,
511 override_host_index: Option<usize>,
512 ) -> Result<Arc<Cached<CachedConfig, ClientState>>, Error> {
513 let mut builder = Client::builder()
514 .service(&self.service)
515 .user_agent(self.user_agent.clone())
516 .from_config(config)
517 .client_qos(self.client_qos)
518 .server_qos(self.server_qos)
519 .service_error(self.service_error)
520 .idempotency(self.idempotency)
521 .node_selection_strategy(self.node_selection_strategy);
522
523 if let Some(metrics) = self.cache_manager.uncached().metrics.clone() {
524 builder = builder.metrics(metrics);
525 }
526
527 if let Some(host_metrics) = self.cache_manager.uncached().host_metrics.clone() {
528 builder = builder.host_metrics(host_metrics);
529 }
530
531 if let Some(override_host_index) = override_host_index {
532 builder = builder.override_host_index(override_host_index);
533 }
534
535 self.cache_manager
536 .cache
537 .get(&builder, Builder::cached_config, ClientState::new)
538 }
539}