1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Copyright 2020 Palantir Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::node_selector::NodeSelector;
use crate::proxy::{ProxyConfig, ProxyConnector};
use crate::{
    send, Agent, HostMetricsRegistry, HyperBody, Request, RequestBuilder, Response, UserAgent,
};
use arc_swap::ArcSwap;
use conjure_error::Error;
use conjure_runtime_config::ServiceConfig;
use hyper::client::HttpConnector;
use hyper::header::HeaderValue;
use hyper::Method;
use hyper_openssl::HttpsConnector;
use openssl::ssl::{SslConnector, SslMethod};
use std::sync::{Arc, Weak};
use std::time::Duration;
use witchcraft_log::info;
use witchcraft_metrics::{Meter, MetricId, MetricRegistry, Timer};

// This is pretty arbitrary - I just grabbed it from some Cloudflare blog post.
const TCP_KEEPALIVE: Duration = Duration::from_secs(3 * 60);

pub(crate) struct ClientState {
    pub(crate) client: hyper::Client<HttpsConnector<ProxyConnector<HttpConnector>>, HyperBody>,
    pub(crate) node_selector: NodeSelector,
    pub(crate) max_num_retries: u32,
    pub(crate) backoff_slot_size: Duration,
    pub(crate) request_timeout: Duration,
    pub(crate) proxy: ProxyConfig,
}

impl ClientState {
    fn from_config(
        service: &str,
        host_metrics: &HostMetricsRegistry,
        service_config: &ServiceConfig,
    ) -> Result<ClientState, Error> {
        let mut connector = HttpConnector::new();
        connector.enforce_http(false);
        connector.set_nodelay(true);
        connector.set_keepalive(Some(TCP_KEEPALIVE));
        connector.set_connect_timeout(Some(service_config.connect_timeout()));

        let proxy = ProxyConfig::from_config(service_config.proxy())?;
        let connector = ProxyConnector::new(connector, &proxy);

        let mut ssl = SslConnector::builder(SslMethod::tls()).map_err(Error::internal_safe)?;
        ssl.set_alpn_protos(b"\x02h2\x08http/1.1")
            .map_err(Error::internal_safe)?;

        if let Some(ca_file) = service_config.security().ca_file() {
            ssl.set_ca_file(ca_file).map_err(Error::internal_safe)?;
        }

        let connector =
            HttpsConnector::with_connector(connector, ssl).map_err(Error::internal_safe)?;

        let client = hyper::Client::builder().build(connector);

        let node_selector = NodeSelector::new(service, host_metrics, service_config);

        Ok(ClientState {
            client,
            node_selector,
            max_num_retries: service_config.max_num_retries(),
            backoff_slot_size: service_config.backoff_slot_size(),
            request_timeout: service_config.request_timeout(),
            proxy,
        })
    }
}

pub(crate) struct SharedClient {
    pub(crate) service: String,
    pub(crate) user_agent: HeaderValue,
    pub(crate) state: ArcSwap<ClientState>,
    pub(crate) host_metrics: Arc<HostMetricsRegistry>,
    pub(crate) response_timer: Arc<Timer>,
    pub(crate) error_meter: Arc<Meter>,
}

/// An asynchronous HTTP client to a remote service.
///
/// It implements the Conjure `AsyncClient` trait, but also offers a "raw" request interface for use with services that
/// don't provide Conjure service definitions.
#[derive(Clone)]
pub struct Client {
    pub(crate) shared: Arc<SharedClient>,
    assume_idempotent: bool,
    propagate_qos_errors: bool,
    propagate_service_errors: bool,
}

impl Client {
    /// Creates a new client.
    ///
    /// The user agent is extended with an agent identifying the name and version of this crate.
    pub fn new(
        service: &str,
        mut user_agent: UserAgent,
        host_metrics: &Arc<HostMetricsRegistry>,
        metrics: &Arc<MetricRegistry>,
        config: &ServiceConfig,
    ) -> Result<Client, Error> {
        user_agent.push_agent(Agent::new("conjure-runtime", env!("CARGO_PKG_VERSION")));

        let state = ClientState::from_config(service, host_metrics, config)?;

        let response_timer = metrics
            .timer(MetricId::new("client.response").with_tag("service-name", service.to_string()));
        let error_meter = metrics.meter(
            MetricId::new("client.response.error")
                .with_tag("service-name", service.to_string())
                .with_tag("reason", "IOException"),
        );

        Ok(Client {
            shared: Arc::new(SharedClient {
                service: service.to_string(),
                user_agent: HeaderValue::from_str(&user_agent.to_string()).unwrap(),
                state: ArcSwap::new(Arc::new(state)),
                host_metrics: host_metrics.clone(),
                response_timer,
                error_meter,
            }),
            assume_idempotent: false,
            propagate_qos_errors: false,
            propagate_service_errors: false,
        })
    }

    /// Configures the client to assume that all requests are idempotent.
    ///
    /// Idempotent operations can be rerun without changing the result of the operation, which allows the client to
    /// safely retry failed requests. By default, GET, HEAD, PUT, and DELETE requests are assumed to be idempotent, but
    /// this method can be used to override that behavior.
    pub fn set_assume_idempotent(&mut self, assume_idempotent: bool) {
        self.assume_idempotent = assume_idempotent;
    }

    /// Returns true if the client is configured to assume all requests are idempotent.
    pub fn assume_idempotent(&self) -> bool {
        self.assume_idempotent
    }

    /// Configures transparent propagation of QoS errors (i.e. 429 and 503 responses).
    ///
    /// By default, the client will automatically retry in response to QoS errors, but if this option is enabled it will
    /// instead immediately return an error which will cause the same response. This is designed for contexts where one
    /// service is proxying a request to another and the developer wants to avoid nested retry loops.
    pub fn set_propagate_qos_errors(&mut self, propagate_qos_errors: bool) {
        self.propagate_qos_errors = propagate_qos_errors;
    }

    /// Returns true if the client will propagate QoS errors.
    pub fn propagate_qos_errors(&self) -> bool {
        self.propagate_qos_errors
    }

    /// Configures transparent propagation of service errors.
    ///
    /// By default, the client will turn service errors returned by the remote server into an internal server error, but
    /// if this option is enabled it will instead return the same service error it received. This is designed for
    /// contexts where one service is proxying a request to another and the developer wants the upstream client to see
    /// downstream errors.
    pub fn set_propagate_service_errors(&mut self, propagate_service_errors: bool) {
        self.propagate_service_errors = propagate_service_errors;
    }

    /// Returns true if the client will propagate service errors.
    pub fn propagate_service_errors(&self) -> bool {
        self.propagate_service_errors
    }

    /// Returns a new request builder.
    ///
    /// The `pattern` argument is a template for the request path. The `param` method on the builder is used to fill
    /// in the parameters in the pattern with dynamic values.
    pub fn request(&self, method: Method, pattern: &'static str) -> RequestBuilder<'_> {
        RequestBuilder::new(self, method, pattern)
    }

    /// Returns a new builder for a GET request.
    pub fn get(&self, pattern: &'static str) -> RequestBuilder<'_> {
        self.request(Method::GET, pattern)
    }

    /// Returns a new builder for a POST request.
    pub fn post(&self, pattern: &'static str) -> RequestBuilder<'_> {
        self.request(Method::POST, pattern)
    }

    /// Returns a new builder for a PUT request.
    pub fn put(&self, pattern: &'static str) -> RequestBuilder<'_> {
        self.request(Method::PUT, pattern)
    }

    /// Returns a new builder for a DELETE request.
    pub fn delete(&self, pattern: &'static str) -> RequestBuilder<'_> {
        self.request(Method::DELETE, pattern)
    }

    /// Returns a new builder for a PATCH request.
    pub fn patch(&self, pattern: &'static str) -> RequestBuilder<'_> {
        self.request(Method::PATCH, pattern)
    }

    /// Returns a new handle which can be used to dynamically refresh the client's configuration.
    pub fn refresh_handle(&self) -> RefreshHandle {
        RefreshHandle(Arc::downgrade(&self.shared))
    }

    pub(crate) async fn send(&self, request: Request<'_>) -> Result<Response, Error> {
        send::send(self, request).await
    }
}

/// A handle used to update the configuration of a `Client`.
pub struct RefreshHandle(Weak<SharedClient>);

impl RefreshHandle {
    /// Refreshes the client's configuration with a new one.
    ///
    /// If the client has already dropped, this is a no-op.
    pub fn refresh(&self, config: &ServiceConfig) -> Result<(), Error> {
        let client = match self.0.upgrade() {
            Some(client) => client,
            None => return Ok(()),
        };

        let state = ClientState::from_config(&client.service, &client.host_metrics, config)?;
        client.state.store(Arc::new(state));
        info!("reloaded client", safe: { service: client.service });

        Ok(())
    }

    /// Returns `true` if the client associated with the handle has dropped.
    pub fn has_dropped(&self) -> bool {
        // FIXME use strong_count when it stabilizes
        self.0.upgrade().is_none()
    }
}