conjure_runtime/
client.rs1use crate::builder::CachedConfig;
15use crate::raw::{BuildRawClient, DefaultRawClient, RawBody, Service};
16use crate::service::gzip::{DecodedBody, GzipLayer};
17use crate::service::http_error::HttpErrorLayer;
18use crate::service::map_error::MapErrorLayer;
19use crate::service::metrics::MetricsLayer;
20use crate::service::node::{NodeMetricsLayer, NodeSelectorLayer, NodeUriLayer};
21use crate::service::proxy::{ProxyConfig, ProxyLayer};
22use crate::service::response_body::ResponseBodyLayer;
23use crate::service::retry::RetryLayer;
24use crate::service::root_span::RootSpanLayer;
25use crate::service::trace_propagation::TracePropagationLayer;
26use crate::service::user_agent::UserAgentLayer;
27use crate::service::wait_for_spans::{WaitForSpansBody, WaitForSpansLayer};
28use crate::service::{Identity, Layer, ServiceBuilder, Stack};
29use crate::weak_cache::Cached;
30use crate::{builder, BodyWriter, Builder, ResponseBody};
31use arc_swap::ArcSwap;
32use bytes::Bytes;
33use conjure_error::Error;
34use conjure_http::client::{AsyncClient, AsyncRequestBody, AsyncService};
35use conjure_runtime_config::ServiceConfig;
36use http::{Request, Response};
37use refreshable::Subscription;
38use std::error;
39use std::sync::Arc;
40
/// Expands a comma-separated list of layer types into a right-nested `Stack`
/// type terminated by `Identity`.
///
/// `layers!(A, B, C)` expands to `Stack<A, Stack<B, Stack<C, Identity>>>`, and
/// an empty list expands to `Identity`. A trailing comma after the last layer
/// is accepted but not required.
macro_rules! layers {
    // Nothing left: terminate the chain.
    () => { Identity };
    // Final layer, with or without a trailing comma.
    ($layer:ty $(,)?) => { Stack<$layer, Identity> };
    // Peel off the first layer and recurse on the remaining tokens.
    ($layer:ty, $($rem:tt)+) => { Stack<$layer, layers!($($rem)+)> };
}
45
46type BaseLayer = layers!(
47 ResponseBodyLayer,
48 MetricsLayer,
49 RootSpanLayer,
50 RetryLayer,
51 HttpErrorLayer,
52 WaitForSpansLayer,
53 NodeSelectorLayer,
54 NodeUriLayer,
55 NodeMetricsLayer,
56 ProxyLayer,
57 TracePropagationLayer,
58 UserAgentLayer,
59 GzipLayer,
60 MapErrorLayer,
61);
62
63type BaseService<T> = <BaseLayer as Layer<T>>::Service;
64
65pub(crate) type BaseBody<B> = WaitForSpansBody<DecodedBody<B>>;
66
67pub(crate) struct ClientState<T> {
68 service: BaseService<T>,
69}
70
71impl<T> ClientState<T> {
72 pub(crate) fn new<U>(builder: &Builder<builder::Complete<U>>) -> Result<ClientState<T>, Error>
73 where
74 U: BuildRawClient<RawClient = T>,
75 {
76 let client = builder.get_raw_client_builder().build_raw_client(builder)?;
77
78 let proxy = ProxyConfig::from_config(builder.get_proxy())?;
79
80 let service = ServiceBuilder::new()
81 .layer(ResponseBodyLayer)
82 .layer(MetricsLayer::new(builder))
83 .layer(RootSpanLayer)
84 .layer(RetryLayer::new(builder))
85 .layer(HttpErrorLayer::new(builder))
86 .layer(WaitForSpansLayer)
87 .layer(NodeSelectorLayer::new(builder)?)
88 .layer(NodeUriLayer)
89 .layer(NodeMetricsLayer)
90 .layer(ProxyLayer::new(&proxy))
91 .layer(TracePropagationLayer)
92 .layer(UserAgentLayer::new(builder))
93 .layer(GzipLayer)
94 .layer(MapErrorLayer)
95 .service(client);
96
97 Ok(ClientState { service })
98 }
99}
100
101pub struct Client<T = DefaultRawClient> {
106 state: Arc<ArcSwap<Cached<CachedConfig, ClientState<T>>>>,
107 subscription: Option<Arc<Subscription<ServiceConfig, Error>>>,
108}
109
110impl<T> Clone for Client<T> {
111 fn clone(&self) -> Self {
112 Client {
113 state: self.state.clone(),
114 subscription: self.subscription.clone(),
115 }
116 }
117}
118
119impl Client {
120 #[inline]
122 pub fn builder() -> Builder<builder::ServiceStage> {
123 Builder::new()
124 }
125}
126
127impl<T> Client<T> {
128 pub(crate) fn new(
129 state: Arc<ArcSwap<Cached<CachedConfig, ClientState<T>>>>,
130 subscription: Option<Subscription<ServiceConfig, Error>>,
131 ) -> Client<T> {
132 Client {
133 state,
134 subscription: subscription.map(Arc::new),
135 }
136 }
137}
138
139impl<T> AsyncService<Client<T>> for Client<T> {
140 fn new(client: Client<T>) -> Self {
141 client
142 }
143}
144
145impl<T, B> AsyncClient for Client<T>
146where
147 T: Service<http::Request<RawBody>, Response = http::Response<B>> + 'static + Sync + Send,
148 T::Error: Into<Box<dyn error::Error + Sync + Send>>,
149 B: http_body::Body<Data = Bytes> + 'static + Send,
150 B::Error: Into<Box<dyn error::Error + Sync + Send>>,
151{
152 type BodyWriter = BodyWriter;
153
154 type ResponseBody = ResponseBody<B>;
155
156 async fn send(
157 &self,
158 request: Request<AsyncRequestBody<'_, Self::BodyWriter>>,
159 ) -> Result<Response<Self::ResponseBody>, Error> {
160 self.state.load().service.call(request).await
161 }
162}