1#[cfg(not(target_arch = "wasm32"))]
16use crate::blocking;
17use crate::builder::{CachedConfig, UncachedConfig};
18use crate::config::{ServiceConfig, ServicesConfig};
19use crate::weak_cache::{Cached, WeakCache};
20use crate::{Builder, ClientState, Host, PerHostClients};
21use crate::{
22 Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos,
23 ServiceError, UserAgent,
24};
25use arc_swap::ArcSwap;
26use conjure_error::Error;
27#[cfg(not(target_arch = "wasm32"))]
28use conjure_http::client::Service;
29use conjure_http::client::{AsyncService, ConjureRuntime};
30use conjure_runtime_config::service_config;
31use refreshable::{RefreshHandle, Refreshable};
32use std::borrow::Borrow;
33use std::collections::HashMap;
34use std::sync::Arc;
35#[cfg(not(target_arch = "wasm32"))]
36use tokio::runtime::Handle;
37use witchcraft_log::warn;
38use witchcraft_metrics::MetricRegistry;
39
/// Capacity handed to `WeakCache::new` whenever a client state cache is
/// created or replaced.
const STATE_CACHE_CAPACITY: usize = 10_000;
41
42#[derive(Clone)]
44pub struct ClientFactory<T = Complete>(T);
45
/// The first stage of the `ClientFactory` builder: no configuration has been
/// provided yet.
///
/// The private unit field keeps the type from being constructed outside this
/// module.
pub struct ConfigStage(());
48
49pub struct UserAgentStage {
51 config: Arc<Refreshable<ServicesConfig, Error>>,
52}
53
54#[derive(Clone)]
55struct CacheManager {
56 uncached_inner: UncachedConfig,
57 cache: WeakCache<CachedConfig, ClientState>,
58}
59
60impl CacheManager {
61 fn uncached(&self) -> &UncachedConfig {
62 &self.uncached_inner
63 }
64
65 fn uncached_mut(&mut self) -> &mut UncachedConfig {
66 self.cache = WeakCache::new(STATE_CACHE_CAPACITY);
67 &mut self.uncached_inner
68 }
69}
70
71#[derive(Clone)]
73pub struct Complete {
74 config: Arc<Refreshable<ServicesConfig, Error>>,
75 user_agent: UserAgent,
76 client_qos: ClientQos,
77 server_qos: ServerQos,
78 service_error: ServiceError,
79 idempotency: Idempotency,
80 node_selection_strategy: NodeSelectionStrategy,
81 cache_manager: CacheManager,
82}
83
84impl Default for ClientFactory<ConfigStage> {
85 #[inline]
86 fn default() -> Self {
87 ClientFactory::builder()
88 }
89}
90
91impl ClientFactory<ConfigStage> {
92 #[inline]
94 pub fn builder() -> Self {
95 ClientFactory(ConfigStage(()))
96 }
97
98 #[inline]
100 pub fn config(
101 self,
102 config: Refreshable<ServicesConfig, Error>,
103 ) -> ClientFactory<UserAgentStage> {
104 ClientFactory(UserAgentStage {
105 config: Arc::new(config),
106 })
107 }
108}
109
110impl ClientFactory<UserAgentStage> {
111 #[inline]
113 pub fn user_agent(self, user_agent: UserAgent) -> ClientFactory {
114 ClientFactory(Complete {
115 config: self.0.config,
116 user_agent,
117 client_qos: ClientQos::Enabled,
118 server_qos: ServerQos::AutomaticRetry,
119 service_error: ServiceError::WrapInNewError,
120 idempotency: Idempotency::ByMethod,
121 node_selection_strategy: NodeSelectionStrategy::PinUntilError,
122 cache_manager: CacheManager {
123 uncached_inner: UncachedConfig {
124 metrics: None,
125 host_metrics: None,
126 #[cfg(not(target_arch = "wasm32"))]
127 blocking_handle: None,
128 conjure_runtime: Arc::new(ConjureRuntime::new()),
129 },
130 cache: WeakCache::new(STATE_CACHE_CAPACITY),
131 },
132 })
133 }
134}
135
136impl ClientFactory {
137 #[inline]
139 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
140 self.0.user_agent = user_agent;
141 self
142 }
143
144 #[inline]
146 pub fn get_user_agent(&self) -> &UserAgent {
147 &self.0.user_agent
148 }
149
150 #[inline]
154 pub fn client_qos(mut self, client_qos: ClientQos) -> Self {
155 self.0.client_qos = client_qos;
156 self
157 }
158
159 #[inline]
161 pub fn get_client_qos(&self) -> ClientQos {
162 self.0.client_qos
163 }
164
165 #[inline]
169 pub fn server_qos(mut self, server_qos: ServerQos) -> Self {
170 self.0.server_qos = server_qos;
171 self
172 }
173
174 #[inline]
176 pub fn get_server_qos(&self) -> ServerQos {
177 self.0.server_qos
178 }
179
180 #[inline]
184 pub fn service_error(mut self, service_error: ServiceError) -> Self {
185 self.0.service_error = service_error;
186 self
187 }
188
189 #[inline]
191 pub fn get_service_error(&self) -> ServiceError {
192 self.0.service_error
193 }
194
195 #[inline]
201 pub fn idempotency(mut self, idempotency: Idempotency) -> Self {
202 self.0.idempotency = idempotency;
203 self
204 }
205
206 #[inline]
208 pub fn get_idempotency(&self) -> Idempotency {
209 self.0.idempotency
210 }
211
212 #[inline]
216 pub fn node_selection_strategy(
217 mut self,
218 node_selection_strategy: NodeSelectionStrategy,
219 ) -> Self {
220 self.0.node_selection_strategy = node_selection_strategy;
221 self
222 }
223
224 #[inline]
226 pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy {
227 self.0.node_selection_strategy
228 }
229
230 #[inline]
234 pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
235 self.0.cache_manager.uncached_mut().metrics = Some(metrics);
236 self
237 }
238
239 #[inline]
241 pub fn get_metrics(&self) -> Option<&Arc<MetricRegistry>> {
242 self.0.cache_manager.uncached().metrics.as_ref()
243 }
244
245 #[inline]
249 pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
250 self.0.cache_manager.uncached_mut().host_metrics = Some(host_metrics);
251 self
252 }
253
254 #[inline]
256 pub fn get_host_metrics(&self) -> Option<&Arc<HostMetricsRegistry>> {
257 self.0.cache_manager.uncached().host_metrics.as_ref()
258 }
259
260 #[inline]
264 pub fn conjure_runtime(mut self, conjure_runtime: Arc<ConjureRuntime>) -> Self {
265 self.0.cache_manager.uncached_mut().conjure_runtime = conjure_runtime;
266 self
267 }
268
269 pub fn get_conjure_runtime(&self) -> &Arc<ConjureRuntime> {
271 &self.0.cache_manager.uncached().conjure_runtime
272 }
273
274 fn state_builder(&self, service: &str) -> StateBuilder {
275 StateBuilder {
276 service: service.to_string(),
277 user_agent: self.0.user_agent.clone(),
278 client_qos: self.0.client_qos,
279 server_qos: self.0.server_qos,
280 service_error: self.0.service_error,
281 idempotency: self.0.idempotency,
282 node_selection_strategy: self.0.node_selection_strategy,
283 cache_manager: self.0.cache_manager.clone(),
284 }
285 }
286
287 pub fn client<T>(&self, service: &str) -> Result<T, Error>
297 where
298 T: AsyncService<Client>,
299 {
300 self.client_inner(service)
301 .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
302 }
303
304 fn client_inner(&self, service: &str) -> Result<Client, Error> {
305 let service_config = self.0.config.map({
306 let service = service.to_string();
307 move |c| c.merged_service(&service).unwrap_or_default()
308 });
309 let state_builder = self.state_builder(service);
310 Self::raw_client(state_builder, service_config, None)
311 }
312
313 fn raw_client<T>(
314 state_builder: T,
315 service_config: Refreshable<ServiceConfig, Error>,
316 override_host_index: Option<usize>,
317 ) -> Result<Client, Error>
318 where
319 T: Borrow<StateBuilder> + 'static + Sync + Send,
320 {
321 let state = state_builder
322 .borrow()
323 .build(&service_config.get(), override_host_index)?;
324 let state = Arc::new(ArcSwap::new(state));
325
326 let subscription = service_config.try_subscribe({
327 let state = state.clone();
328 move |config| {
329 let new_state = state_builder.borrow().build(config, override_host_index)?;
330 state.store(new_state);
331 Ok(())
332 }
333 })?;
334
335 Ok(Client::new(state, Some(subscription)))
336 }
337
338 pub fn per_host_clients<T>(
346 &self,
347 service: &str,
348 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
349 where
350 T: AsyncService<Client> + 'static + Sync + Send,
351 {
352 self.per_host_clients_inner(service, T::new)
353 }
354
355 fn per_host_clients_inner<T>(
356 &self,
357 service: &str,
358 make_client: impl Fn(Client, &Arc<ConjureRuntime>) -> T + 'static + Sync + Send,
359 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
360 where
361 T: 'static + Sync + Send,
362 {
363 let state_builder = Arc::new(self.state_builder(service));
364
365 self.0
366 .config
367 .map({
368 let service = service.to_string();
369 move |c| c.merged_service(&service).unwrap_or_default()
370 })
371 .try_map({
372 let mut client_handles = HashMap::<Host, ClientHandle>::new();
373 move |c| {
374 let mut new_client_handles = HashMap::new();
375 let mut clients = HashMap::new();
376 let mut errors = vec![];
377
378 for (i, uri) in c.uris().iter().enumerate() {
379 let host = Host { uri: uri.clone() };
380 let host_config = service_config::Builder::from(c.clone())
381 .uris([uri.clone()])
382 .build();
383
384 let client_handle = match client_handles.remove(&host) {
385 Some(mut client_handle) => {
386 if let Err(es) = client_handle.handle.refresh(host_config) {
387 errors.extend(es);
388 }
389 client_handle
390 }
391 None => {
392 let (host_config, handle) = Refreshable::new(host_config);
393 let client = match Self::raw_client(
394 state_builder.clone(),
395 host_config,
396 Some(i),
397 ) {
398 Ok(state) => state,
399 Err(e) => {
400 errors.push(e);
401 continue;
402 }
403 };
404 ClientHandle { client, handle }
405 }
406 };
407
408 clients.insert(
409 host.clone(),
410 make_client(
411 client_handle.client.clone(),
412 &state_builder.cache_manager.uncached().conjure_runtime,
413 ),
414 );
415 new_client_handles.insert(host, client_handle);
416 }
417
418 client_handles = new_client_handles;
419 match errors.pop() {
420 Some(e) => {
421 for e2 in errors {
422 warn!("error reloading per-host clients", error: e2);
423 }
424 Err(e)
425 }
426 None => Ok(PerHostClients { clients }),
427 }
428 }
429 })
430 }
431}
432
433#[cfg(not(target_arch = "wasm32"))]
434impl ClientFactory {
435 #[inline]
441 pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
442 self.0.cache_manager.uncached_mut().blocking_handle = Some(blocking_handle);
443 self
444 }
445
446 #[inline]
448 pub fn get_blocking_handle(&self) -> Option<&Handle> {
449 self.0.cache_manager.uncached().blocking_handle.as_ref()
450 }
451
452 pub fn blocking_client<T>(&self, service: &str) -> Result<T, Error>
462 where
463 T: Service<blocking::Client>,
464 {
465 self.blocking_client_inner(service)
466 .map(|c| T::new(c, &self.0.cache_manager.uncached().conjure_runtime))
467 }
468
469 fn blocking_client_inner(&self, service: &str) -> Result<blocking::Client, Error> {
470 self.client_inner(service).map(|client| blocking::Client {
471 client,
472 handle: self.0.cache_manager.uncached().blocking_handle.clone(),
473 })
474 }
475
476 pub fn blocking_per_host_clients<T>(
484 &self,
485 service: &str,
486 ) -> Result<Refreshable<PerHostClients<T>, Error>, Error>
487 where
488 T: Service<blocking::Client> + 'static + Sync + Send,
489 {
490 self.per_host_clients_inner(service, {
491 let handle = self.0.cache_manager.uncached().blocking_handle.clone();
492 move |client, runtime| {
493 T::new(
494 blocking::Client {
495 client,
496 handle: handle.clone(),
497 },
498 runtime,
499 )
500 }
501 })
502 }
503}
504
505struct ClientHandle {
506 client: Client,
507 handle: RefreshHandle<ServiceConfig, Error>,
508}
509
510struct StateBuilder {
511 service: String,
512 user_agent: UserAgent,
513 client_qos: ClientQos,
514 server_qos: ServerQos,
515 service_error: ServiceError,
516 idempotency: Idempotency,
517 node_selection_strategy: NodeSelectionStrategy,
518 cache_manager: CacheManager,
519}
520
521impl StateBuilder {
522 fn build(
523 &self,
524 config: &ServiceConfig,
525 override_host_index: Option<usize>,
526 ) -> Result<Arc<Cached<CachedConfig, ClientState>>, Error> {
527 let mut builder = Client::builder()
528 .service(&self.service)
529 .user_agent(self.user_agent.clone())
530 .from_config(config)
531 .client_qos(self.client_qos)
532 .server_qos(self.server_qos)
533 .service_error(self.service_error)
534 .idempotency(self.idempotency)
535 .node_selection_strategy(self.node_selection_strategy)
536 .conjure_runtime(self.cache_manager.uncached().conjure_runtime.clone());
537
538 if let Some(metrics) = self.cache_manager.uncached().metrics.clone() {
539 builder = builder.metrics(metrics);
540 }
541
542 if let Some(host_metrics) = self.cache_manager.uncached().host_metrics.clone() {
543 builder = builder.host_metrics(host_metrics);
544 }
545
546 if let Some(override_host_index) = override_host_index {
547 builder = builder.override_host_index(override_host_index);
548 }
549
550 self.cache_manager
551 .cache
552 .get(&builder, Builder::cached_config, ClientState::new)
553 }
554}