conjure_runtime/
client.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::builder::CachedConfig;
15use crate::raw::{BuildRawClient, DefaultRawClient, RawBody, Service};
16use crate::service::gzip::{DecodedBody, GzipLayer};
17use crate::service::http_error::HttpErrorLayer;
18use crate::service::map_error::MapErrorLayer;
19use crate::service::metrics::MetricsLayer;
20use crate::service::node::{NodeMetricsLayer, NodeSelectorLayer, NodeUriLayer};
21use crate::service::proxy::{ProxyConfig, ProxyLayer};
22use crate::service::response_body::ResponseBodyLayer;
23use crate::service::retry::RetryLayer;
24use crate::service::root_span::RootSpanLayer;
25use crate::service::trace_propagation::TracePropagationLayer;
26use crate::service::user_agent::UserAgentLayer;
27use crate::service::wait_for_spans::{WaitForSpansBody, WaitForSpansLayer};
28use crate::service::{Identity, Layer, ServiceBuilder, Stack};
29use crate::weak_cache::Cached;
30use crate::{builder, BodyWriter, Builder, ResponseBody};
31use arc_swap::ArcSwap;
32use bytes::Bytes;
33use conjure_error::Error;
34use conjure_http::client::{AsyncClient, AsyncRequestBody, AsyncService};
35use conjure_runtime_config::ServiceConfig;
36use http::{Request, Response};
37use refreshable::Subscription;
38use std::error;
39use std::sync::Arc;
40
41macro_rules! layers {
42    () => { Identity };
43    ($layer:ty, $($rem:tt)*) => { Stack<$layer, layers!($($rem)*)> };
44}
45
46type BaseLayer = layers!(
47    ResponseBodyLayer,
48    MetricsLayer,
49    RootSpanLayer,
50    RetryLayer,
51    HttpErrorLayer,
52    WaitForSpansLayer,
53    NodeSelectorLayer,
54    NodeUriLayer,
55    NodeMetricsLayer,
56    ProxyLayer,
57    TracePropagationLayer,
58    UserAgentLayer,
59    GzipLayer,
60    MapErrorLayer,
61);
62
63type BaseService<T> = <BaseLayer as Layer<T>>::Service;
64
65pub(crate) type BaseBody<B> = WaitForSpansBody<DecodedBody<B>>;
66
67pub(crate) struct ClientState<T> {
68    service: BaseService<T>,
69}
70
71impl<T> ClientState<T> {
72    pub(crate) fn new<U>(builder: &Builder<builder::Complete<U>>) -> Result<ClientState<T>, Error>
73    where
74        U: BuildRawClient<RawClient = T>,
75    {
76        let client = builder.get_raw_client_builder().build_raw_client(builder)?;
77
78        let proxy = ProxyConfig::from_config(builder.get_proxy())?;
79
80        let service = ServiceBuilder::new()
81            .layer(ResponseBodyLayer)
82            .layer(MetricsLayer::new(builder))
83            .layer(RootSpanLayer)
84            .layer(RetryLayer::new(builder))
85            .layer(HttpErrorLayer::new(builder))
86            .layer(WaitForSpansLayer)
87            .layer(NodeSelectorLayer::new(builder)?)
88            .layer(NodeUriLayer)
89            .layer(NodeMetricsLayer)
90            .layer(ProxyLayer::new(&proxy))
91            .layer(TracePropagationLayer)
92            .layer(UserAgentLayer::new(builder))
93            .layer(GzipLayer)
94            .layer(MapErrorLayer)
95            .service(client);
96
97        Ok(ClientState { service })
98    }
99}
100
101/// An asynchronous HTTP client to a remote service.
102///
103/// It implements the Conjure `AsyncClient` trait, but also offers a "raw" request interface for use with services that
104/// don't provide Conjure service definitions.
105pub struct Client<T = DefaultRawClient> {
106    state: Arc<ArcSwap<Cached<CachedConfig, ClientState<T>>>>,
107    subscription: Option<Arc<Subscription<ServiceConfig, Error>>>,
108}
109
110impl<T> Clone for Client<T> {
111    fn clone(&self) -> Self {
112        Client {
113            state: self.state.clone(),
114            subscription: self.subscription.clone(),
115        }
116    }
117}
118
119impl Client {
120    /// Creates a new `Builder` for clients.
121    #[inline]
122    pub fn builder() -> Builder<builder::ServiceStage> {
123        Builder::new()
124    }
125}
126
127impl<T> Client<T> {
128    pub(crate) fn new(
129        state: Arc<ArcSwap<Cached<CachedConfig, ClientState<T>>>>,
130        subscription: Option<Subscription<ServiceConfig, Error>>,
131    ) -> Client<T> {
132        Client {
133            state,
134            subscription: subscription.map(Arc::new),
135        }
136    }
137}
138
139impl<T> AsyncService<Client<T>> for Client<T> {
140    fn new(client: Client<T>) -> Self {
141        client
142    }
143}
144
145impl<T, B> AsyncClient for Client<T>
146where
147    T: Service<http::Request<RawBody>, Response = http::Response<B>> + 'static + Sync + Send,
148    T::Error: Into<Box<dyn error::Error + Sync + Send>>,
149    B: http_body::Body<Data = Bytes> + 'static + Send,
150    B::Error: Into<Box<dyn error::Error + Sync + Send>>,
151{
152    type BodyWriter = BodyWriter;
153
154    type ResponseBody = ResponseBody<B>;
155
156    async fn send(
157        &self,
158        request: Request<AsyncRequestBody<'_, Self::BodyWriter>>,
159    ) -> Result<Response<Self::ResponseBody>, Error> {
160        self.state.load().service.call(request).await
161    }
162}