etcd/client.rs
1//! Contains the etcd client. All API calls are made via the client.
2
3use futures::stream::futures_unordered;
4use futures::{Future, IntoFuture, Stream};
5use http::header::{HeaderMap, HeaderValue};
6use hyper::client::connect::{Connect, HttpConnector};
7use hyper::{Client as Hyper, StatusCode, Uri};
8#[cfg(feature = "tls")]
9use hyper_tls::HttpsConnector;
10use log::error;
11use serde::de::DeserializeOwned;
12use serde_derive::{Deserialize, Serialize};
13use serde_json;
14
15use crate::error::{ApiError, Error};
16use crate::http::HttpClient;
17use crate::version::VersionInfo;
18
/// Name of the `X-Etcd-Cluster-Id` HTTP response header.
///
/// Its value is read as a string into `ClusterInfo::cluster_id`.
const XETCD_CLUSTER_ID: &str = "X-Etcd-Cluster-Id";

/// Name of the `X-Etcd-Index` HTTP response header.
///
/// Its value is parsed as a `u64` into `ClusterInfo::etcd_index`.
const XETCD_INDEX: &str = "X-Etcd-Index";

/// Name of the `X-Raft-Index` HTTP response header.
///
/// Its value is parsed as a `u64` into `ClusterInfo::raft_index`.
const XRAFT_INDEX: &str = "X-Raft-Index";

/// Name of the `X-Raft-Term` HTTP response header.
///
/// Its value is parsed as a `u64` into `ClusterInfo::raft_term`.
const XRAFT_TERM: &str = "X-Raft-Term";
42
43/// API client for etcd.
44///
45/// All API calls require a client.
46#[derive(Clone, Debug)]
47pub struct Client<C>
48where
49 C: Clone + Connect + Sync + 'static,
50{
51 endpoints: Vec<Uri>,
52 http_client: HttpClient<C>,
53}
54
/// Credentials for HTTP basic authentication: a username paired with a password.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct BasicAuth {
    /// Username sent when authenticating.
    pub username: String,
    /// Password sent when authenticating.
    pub password: String,
}
63
64/// A value returned by the health check API endpoint to indicate a healthy cluster member.
65#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
66pub struct Health {
67 /// The health status of the cluster member.
68 pub health: String,
69}
70
71impl Client<HttpConnector> {
72 /// Constructs a new client using the HTTP protocol.
73 ///
74 /// # Parameters
75 ///
76 /// * handle: A handle to the event loop.
77 /// * endpoints: URLs for one or more cluster members. When making an API call, the client will
78 /// make the call to each member in order until it receives a successful respponse.
79 /// * basic_auth: Credentials for HTTP basic authentication.
80 ///
81 /// # Errors
82 ///
83 /// Fails if no endpoints are provided or if any of the endpoints is an invalid URL.
84 pub fn new(
85 endpoints: &[&str],
86 basic_auth: Option<BasicAuth>,
87 ) -> Result<Client<HttpConnector>, Error> {
88 let hyper = Hyper::builder().keep_alive(true).build_http();
89
90 Client::custom(hyper, endpoints, basic_auth)
91 }
92}
93
#[cfg(feature = "tls")]
impl Client<HttpsConnector<HttpConnector>> {
    /// Constructs a new client that talks to etcd over HTTPS.
    ///
    /// The underlying `hyper::Client` is built with keep-alive enabled on top of a default
    /// `HttpsConnector`, and then handed to `Client::custom`.
    ///
    /// # Parameters
    ///
    /// * endpoints: URLs for one or more cluster members. When making an API call, the client will
    ///   make the call to each member in order until it receives a successful response.
    /// * basic_auth: Credentials for HTTP basic authentication.
    ///
    /// # Errors
    ///
    /// Fails if the HTTPS connector cannot be created, if no endpoints are provided, or if any of
    /// the endpoints is an invalid URL.
    pub fn https(endpoints: &[&str], basic_auth: Option<BasicAuth>) -> Result<Self, Error> {
        // NOTE(review): `4` is passed straight to `HttpsConnector::new`; presumably the number of
        // DNS resolver threads -- confirm against the hyper-tls version in use.
        let https_connector = HttpsConnector::new(4)?;

        Self::custom(
            Hyper::builder().keep_alive(true).build(https_connector),
            endpoints,
            basic_auth,
        )
    }
}
118
119impl<C> Client<C>
120where
121 C: Clone + Connect + Sync + 'static,
122{
123 /// Constructs a new client using the provided `hyper::Client`.
124 ///
125 /// This method allows the user to configure the details of the underlying HTTP client to their
126 /// liking. It is also necessary when using X.509 client certificate authentication.
127 ///
128 /// # Parameters
129 ///
130 /// * hyper: A fully configured `hyper::Client`.
131 /// * endpoints: URLs for one or more cluster members. When making an API call, the client will
132 /// make the call to each member in order until it receives a successful respponse.
133 /// * basic_auth: Credentials for HTTP basic authentication.
134 ///
135 /// # Errors
136 ///
137 /// Fails if no endpoints are provided or if any of the endpoints is an invalid URL.
138 ///
139 /// # Examples
140 ///
141 /// Configuring the client to authenticate with both HTTP basic auth and an X.509 client
142 /// certificate:
143 ///
144 /// ```no_run
145 /// use std::fs::File;
146 /// use std::io::Read;
147 ///
148 /// use futures::Future;
149 /// use hyper::client::HttpConnector;
150 /// use hyper_tls::HttpsConnector;
151 /// use native_tls::{Certificate, TlsConnector, Identity};
152 /// use tokio::runtime::Runtime;
153 ///
154 /// use etcd::{Client, kv};
155 ///
156 /// fn main() {
157 /// let mut ca_cert_file = File::open("ca.der").unwrap();
158 /// let mut ca_cert_buffer = Vec::new();
159 /// ca_cert_file.read_to_end(&mut ca_cert_buffer).unwrap();
160 ///
161 /// let mut pkcs12_file = File::open("/source/tests/ssl/client.p12").unwrap();
162 /// let mut pkcs12_buffer = Vec::new();
163 /// pkcs12_file.read_to_end(&mut pkcs12_buffer).unwrap();
164 ///
165 /// let mut builder = TlsConnector::builder();
166 /// builder.add_root_certificate(Certificate::from_der(&ca_cert_buffer).unwrap());
167 /// builder.identity(Identity::from_pkcs12(&pkcs12_buffer, "secret").unwrap());
168 ///
169 /// let tls_connector = builder.build().unwrap();
170 ///
171 /// let mut http_connector = HttpConnector::new(4);
172 /// http_connector.enforce_http(false);
173 /// let https_connector = HttpsConnector::from((http_connector, tls_connector));
174 ///
175 /// let hyper = hyper::Client::builder().build(https_connector);
176 ///
177 /// let client = Client::custom(hyper, &["https://etcd.example.com:2379"], None).unwrap();
178 ///
179 /// let work = kv::set(&client, "/foo", "bar", None).and_then(move |_| {
180 /// let get_request = kv::get(&client, "/foo", kv::GetOptions::default());
181 ///
182 /// get_request.and_then(|response| {
183 /// let value = response.data.node.value.unwrap();
184 ///
185 /// assert_eq!(value, "bar".to_string());
186 ///
187 /// Ok(())
188 /// })
189 /// });
190 ///
191 /// assert!(Runtime::new().unwrap().block_on(work).is_ok());
192 /// }
193 /// ```
194 pub fn custom(
195 hyper: Hyper<C>,
196 endpoints: &[&str],
197 basic_auth: Option<BasicAuth>,
198 ) -> Result<Client<C>, Error> {
199 if endpoints.len() < 1 {
200 return Err(Error::NoEndpoints);
201 }
202
203 let mut uri_endpoints = Vec::with_capacity(endpoints.len());
204
205 for endpoint in endpoints {
206 uri_endpoints.push(endpoint.parse()?);
207 }
208
209 Ok(Client {
210 endpoints: uri_endpoints,
211 http_client: HttpClient::new(hyper, basic_auth),
212 })
213 }
214
215 /// Lets other internal code access the `HttpClient`.
216 pub(crate) fn http_client(&self) -> &HttpClient<C> {
217 &self.http_client
218 }
219
220 /// Lets other internal code access the cluster endpoints.
221 pub(crate) fn endpoints(&self) -> &[Uri] {
222 &self.endpoints
223 }
224
225 /// Runs a basic health check against each etcd member.
226 pub fn health(&self) -> impl Stream<Item = Response<Health>, Error = Error> + Send {
227 let futures = self.endpoints.iter().map(|endpoint| {
228 let url = build_url(&endpoint, "health");
229 let uri = url.parse().map_err(Error::from).into_future();
230 let cloned_client = self.http_client.clone();
231 let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from));
232 response.and_then(|response| {
233 let status = response.status();
234 let cluster_info = ClusterInfo::from(response.headers());
235 let body = response.into_body().concat2().map_err(Error::from);
236
237 body.and_then(move |ref body| {
238 if status == StatusCode::OK {
239 match serde_json::from_slice::<Health>(body) {
240 Ok(data) => Ok(Response { data, cluster_info }),
241 Err(error) => Err(Error::Serialization(error)),
242 }
243 } else {
244 match serde_json::from_slice::<ApiError>(body) {
245 Ok(error) => Err(Error::Api(error)),
246 Err(error) => Err(Error::Serialization(error)),
247 }
248 }
249 })
250 })
251 });
252
253 futures_unordered(futures)
254 }
255
256 /// Returns version information from each etcd cluster member the client was initialized with.
257 pub fn versions(&self) -> impl Stream<Item = Response<VersionInfo>, Error = Error> + Send {
258 let futures = self.endpoints.iter().map(|endpoint| {
259 let url = build_url(&endpoint, "version");
260 let uri = url.parse().map_err(Error::from).into_future();
261 let cloned_client = self.http_client.clone();
262 let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from));
263 response.and_then(|response| {
264 let status = response.status();
265 let cluster_info = ClusterInfo::from(response.headers());
266 let body = response.into_body().concat2().map_err(Error::from);
267
268 body.and_then(move |ref body| {
269 if status == StatusCode::OK {
270 match serde_json::from_slice::<VersionInfo>(body) {
271 Ok(data) => Ok(Response { data, cluster_info }),
272 Err(error) => Err(Error::Serialization(error)),
273 }
274 } else {
275 match serde_json::from_slice::<ApiError>(body) {
276 Ok(error) => Err(Error::Api(error)),
277 Err(error) => Err(Error::Serialization(error)),
278 }
279 }
280 })
281 })
282 });
283
284 futures_unordered(futures)
285 }
286
287 /// Lets other internal code make basic HTTP requests.
288 pub(crate) fn request<U, T>(
289 &self,
290 uri: U,
291 ) -> impl Future<Item = Response<T>, Error = Error> + Send
292 where
293 U: Future<Item = Uri, Error = Error> + Send,
294 T: DeserializeOwned + Send + 'static,
295 {
296 let http_client = self.http_client.clone();
297 let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
298 response.and_then(|response| {
299 let status = response.status();
300 let cluster_info = ClusterInfo::from(response.headers());
301 let body = response.into_body().concat2().map_err(Error::from);
302
303 body.and_then(move |body| {
304 if status == StatusCode::OK {
305 match serde_json::from_slice::<T>(&body) {
306 Ok(data) => Ok(Response { data, cluster_info }),
307 Err(error) => Err(Error::Serialization(error)),
308 }
309 } else {
310 match serde_json::from_slice::<ApiError>(&body) {
311 Ok(error) => Err(Error::Api(error)),
312 Err(error) => Err(Error::Serialization(error)),
313 }
314 }
315 })
316 })
317 }
318}
319
320/// A wrapper type returned by all API calls.
321///
322/// Contains the primary data of the response along with information about the cluster extracted
323/// from the HTTP response headers.
324#[derive(Clone, Debug)]
325pub struct Response<T> {
326 /// Information about the state of the cluster.
327 pub cluster_info: ClusterInfo,
328 /// The primary data of the response.
329 pub data: T,
330}
331
332/// Information about the state of the etcd cluster from an API response's HTTP headers.
333#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
334pub struct ClusterInfo {
335 /// An internal identifier for the cluster.
336 pub cluster_id: Option<String>,
337 /// A unique, monotonically-incrementing integer created for each change to etcd.
338 pub etcd_index: Option<u64>,
339 /// A unique, monotonically-incrementing integer used by the Raft protocol.
340 pub raft_index: Option<u64>,
341 /// The current Raft election term.
342 pub raft_term: Option<u64>,
343}
344
345impl<'a> From<&'a HeaderMap<HeaderValue>> for ClusterInfo {
346 fn from(headers: &'a HeaderMap<HeaderValue>) -> Self {
347 let cluster_id = headers.get(XETCD_CLUSTER_ID).and_then(|v| {
348 match String::from_utf8(v.as_bytes().to_vec()) {
349 Ok(s) => Some(s),
350 Err(e) => {
351 error!("{} header decode error: {:?}", XETCD_CLUSTER_ID, e);
352 None
353 }
354 }
355 });
356
357 let etcd_index = headers.get(XETCD_INDEX).and_then(|v| {
358 match String::from_utf8(v.as_bytes().to_vec())
359 .map_err(|e| format!("{:?}", e))
360 .and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
361 {
362 Ok(i) => Some(i),
363 Err(e) => {
364 error!("{} header decode error: {}", XETCD_INDEX, e);
365 None
366 }
367 }
368 });
369
370 let raft_index = headers.get(XRAFT_INDEX).and_then(|v| {
371 match String::from_utf8(v.as_bytes().to_vec())
372 .map_err(|e| format!("{:?}", e))
373 .and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
374 {
375 Ok(i) => Some(i),
376 Err(e) => {
377 error!("{} header decode error: {}", XRAFT_INDEX, e);
378 None
379 }
380 }
381 });
382
383 let raft_term = headers.get(XRAFT_TERM).and_then(|v| {
384 match String::from_utf8(v.as_bytes().to_vec())
385 .map_err(|e| format!("{:?}", e))
386 .and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
387 {
388 Ok(i) => Some(i),
389 Err(e) => {
390 error!("{} header decode error: {}", XRAFT_TERM, e);
391 None
392 }
393 }
394 });
395
396 ClusterInfo {
397 cluster_id: cluster_id,
398 etcd_index: etcd_index,
399 raft_index: raft_index,
400 raft_term: raft_term,
401 }
402 }
403}
404
405/// Constructs the full URL for the versions API call.
406fn build_url(endpoint: &Uri, path: &str) -> String {
407 format!("{}{}", endpoint, path)
408}