etcd/
client.rs

1//! Contains the etcd client. All API calls are made via the client.
2
3use futures::stream::futures_unordered;
4use futures::{Future, IntoFuture, Stream};
5use http::header::{HeaderMap, HeaderValue};
6use hyper::client::connect::{Connect, HttpConnector};
7use hyper::{Client as Hyper, StatusCode, Uri};
8#[cfg(feature = "tls")]
9use hyper_tls::HttpsConnector;
10use log::error;
11use serde::de::DeserializeOwned;
12use serde_derive::{Deserialize, Serialize};
13use serde_json;
14
15use crate::error::{ApiError, Error};
16use crate::http::HttpClient;
17use crate::version::VersionInfo;
18
19// header! {
20//     /// The `X-Etcd-Cluster-Id` header.
21//     (XEtcdClusterId, "X-Etcd-Cluster-Id") => [String]
22// }
23const XETCD_CLUSTER_ID: &str = "X-Etcd-Cluster-Id";
24
25// header! {
26//     /// The `X-Etcd-Index` HTTP header.
27//     (XEtcdIndex, "X-Etcd-Index") => [u64]
28// }
29const XETCD_INDEX: &str = "X-Etcd-Index";
30
31// header! {
32//     /// The `X-Raft-Index` HTTP header.
33//     (XRaftIndex, "X-Raft-Index") => [u64]
34// }
35const XRAFT_INDEX: &str = "X-Raft-Index";
36
37// header! {
38//     /// The `X-Raft-Term` HTTP header.
39//     (XRaftTerm, "X-Raft-Term") => [u64]
40// }
41const XRAFT_TERM: &str = "X-Raft-Term";
42
43/// API client for etcd.
44///
45/// All API calls require a client.
46#[derive(Clone, Debug)]
47pub struct Client<C>
48where
49    C: Clone + Connect + Sync + 'static,
50{
51    endpoints: Vec<Uri>,
52    http_client: HttpClient<C>,
53}
54
55/// A username and password to use for HTTP basic authentication.
56#[derive(Clone, Debug, Eq, Hash, PartialEq)]
57pub struct BasicAuth {
58    /// The username to use for authentication.
59    pub username: String,
60    /// The password to use for authentication.
61    pub password: String,
62}
63
64/// A value returned by the health check API endpoint to indicate a healthy cluster member.
65#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
66pub struct Health {
67    /// The health status of the cluster member.
68    pub health: String,
69}
70
71impl Client<HttpConnector> {
72    /// Constructs a new client using the HTTP protocol.
73    ///
74    /// # Parameters
75    ///
76    /// * handle: A handle to the event loop.
77    /// * endpoints: URLs for one or more cluster members. When making an API call, the client will
78    /// make the call to each member in order until it receives a successful respponse.
79    /// * basic_auth: Credentials for HTTP basic authentication.
80    ///
81    /// # Errors
82    ///
83    /// Fails if no endpoints are provided or if any of the endpoints is an invalid URL.
84    pub fn new(
85        endpoints: &[&str],
86        basic_auth: Option<BasicAuth>,
87    ) -> Result<Client<HttpConnector>, Error> {
88        let hyper = Hyper::builder().keep_alive(true).build_http();
89
90        Client::custom(hyper, endpoints, basic_auth)
91    }
92}
93
94#[cfg(feature = "tls")]
95impl Client<HttpsConnector<HttpConnector>> {
96    /// Constructs a new client using the HTTPS protocol.
97    ///
98    /// # Parameters
99    ///
100    /// * handle: A handle to the event loop.
101    /// * endpoints: URLs for one or more cluster members. When making an API call, the client will
102    /// make the call to each member in order until it receives a successful respponse.
103    /// * basic_auth: Credentials for HTTP basic authentication.
104    ///
105    /// # Errors
106    ///
107    /// Fails if no endpoints are provided or if any of the endpoints is an invalid URL.
108    pub fn https(
109        endpoints: &[&str],
110        basic_auth: Option<BasicAuth>,
111    ) -> Result<Client<HttpsConnector<HttpConnector>>, Error> {
112        let connector = HttpsConnector::new(4)?;
113        let hyper = Hyper::builder().keep_alive(true).build(connector);
114
115        Client::custom(hyper, endpoints, basic_auth)
116    }
117}
118
119impl<C> Client<C>
120where
121    C: Clone + Connect + Sync + 'static,
122{
123    /// Constructs a new client using the provided `hyper::Client`.
124    ///
125    /// This method allows the user to configure the details of the underlying HTTP client to their
126    /// liking. It is also necessary when using X.509 client certificate authentication.
127    ///
128    /// # Parameters
129    ///
130    /// * hyper: A fully configured `hyper::Client`.
131    /// * endpoints: URLs for one or more cluster members. When making an API call, the client will
132    /// make the call to each member in order until it receives a successful respponse.
133    /// * basic_auth: Credentials for HTTP basic authentication.
134    ///
135    /// # Errors
136    ///
137    /// Fails if no endpoints are provided or if any of the endpoints is an invalid URL.
138    ///
139    /// # Examples
140    ///
141    /// Configuring the client to authenticate with both HTTP basic auth and an X.509 client
142    /// certificate:
143    ///
144    /// ```no_run
145    /// use std::fs::File;
146    /// use std::io::Read;
147    ///
148    /// use futures::Future;
149    /// use hyper::client::HttpConnector;
150    /// use hyper_tls::HttpsConnector;
151    /// use native_tls::{Certificate, TlsConnector, Identity};
152    /// use tokio::runtime::Runtime;
153    ///
154    /// use etcd::{Client, kv};
155    ///
156    /// fn main() {
157    ///     let mut ca_cert_file = File::open("ca.der").unwrap();
158    ///     let mut ca_cert_buffer = Vec::new();
159    ///     ca_cert_file.read_to_end(&mut ca_cert_buffer).unwrap();
160    ///
161    ///     let mut pkcs12_file = File::open("/source/tests/ssl/client.p12").unwrap();
162    ///     let mut pkcs12_buffer = Vec::new();
163    ///     pkcs12_file.read_to_end(&mut pkcs12_buffer).unwrap();
164    ///
165    ///     let mut builder = TlsConnector::builder();
166    ///     builder.add_root_certificate(Certificate::from_der(&ca_cert_buffer).unwrap());
167    ///     builder.identity(Identity::from_pkcs12(&pkcs12_buffer, "secret").unwrap());
168    ///
169    ///     let tls_connector = builder.build().unwrap();
170    ///
171    ///     let mut http_connector = HttpConnector::new(4);
172    ///     http_connector.enforce_http(false);
173    ///     let https_connector = HttpsConnector::from((http_connector, tls_connector));
174    ///
175    ///     let hyper = hyper::Client::builder().build(https_connector);
176    ///
177    ///     let client = Client::custom(hyper, &["https://etcd.example.com:2379"], None).unwrap();
178    ///
179    ///     let work = kv::set(&client, "/foo", "bar", None).and_then(move |_| {
180    ///         let get_request = kv::get(&client, "/foo", kv::GetOptions::default());
181    ///
182    ///         get_request.and_then(|response| {
183    ///             let value = response.data.node.value.unwrap();
184    ///
185    ///             assert_eq!(value, "bar".to_string());
186    ///
187    ///             Ok(())
188    ///         })
189    ///     });
190    ///
191    ///     assert!(Runtime::new().unwrap().block_on(work).is_ok());
192    /// }
193    /// ```
194    pub fn custom(
195        hyper: Hyper<C>,
196        endpoints: &[&str],
197        basic_auth: Option<BasicAuth>,
198    ) -> Result<Client<C>, Error> {
199        if endpoints.len() < 1 {
200            return Err(Error::NoEndpoints);
201        }
202
203        let mut uri_endpoints = Vec::with_capacity(endpoints.len());
204
205        for endpoint in endpoints {
206            uri_endpoints.push(endpoint.parse()?);
207        }
208
209        Ok(Client {
210            endpoints: uri_endpoints,
211            http_client: HttpClient::new(hyper, basic_auth),
212        })
213    }
214
215    /// Lets other internal code access the `HttpClient`.
216    pub(crate) fn http_client(&self) -> &HttpClient<C> {
217        &self.http_client
218    }
219
220    /// Lets other internal code access the cluster endpoints.
221    pub(crate) fn endpoints(&self) -> &[Uri] {
222        &self.endpoints
223    }
224
225    /// Runs a basic health check against each etcd member.
226    pub fn health(&self) -> impl Stream<Item = Response<Health>, Error = Error> + Send {
227        let futures = self.endpoints.iter().map(|endpoint| {
228            let url = build_url(&endpoint, "health");
229            let uri = url.parse().map_err(Error::from).into_future();
230            let cloned_client = self.http_client.clone();
231            let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from));
232            response.and_then(|response| {
233                let status = response.status();
234                let cluster_info = ClusterInfo::from(response.headers());
235                let body = response.into_body().concat2().map_err(Error::from);
236
237                body.and_then(move |ref body| {
238                    if status == StatusCode::OK {
239                        match serde_json::from_slice::<Health>(body) {
240                            Ok(data) => Ok(Response { data, cluster_info }),
241                            Err(error) => Err(Error::Serialization(error)),
242                        }
243                    } else {
244                        match serde_json::from_slice::<ApiError>(body) {
245                            Ok(error) => Err(Error::Api(error)),
246                            Err(error) => Err(Error::Serialization(error)),
247                        }
248                    }
249                })
250            })
251        });
252
253        futures_unordered(futures)
254    }
255
256    /// Returns version information from each etcd cluster member the client was initialized with.
257    pub fn versions(&self) -> impl Stream<Item = Response<VersionInfo>, Error = Error> + Send {
258        let futures = self.endpoints.iter().map(|endpoint| {
259            let url = build_url(&endpoint, "version");
260            let uri = url.parse().map_err(Error::from).into_future();
261            let cloned_client = self.http_client.clone();
262            let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from));
263            response.and_then(|response| {
264                let status = response.status();
265                let cluster_info = ClusterInfo::from(response.headers());
266                let body = response.into_body().concat2().map_err(Error::from);
267
268                body.and_then(move |ref body| {
269                    if status == StatusCode::OK {
270                        match serde_json::from_slice::<VersionInfo>(body) {
271                            Ok(data) => Ok(Response { data, cluster_info }),
272                            Err(error) => Err(Error::Serialization(error)),
273                        }
274                    } else {
275                        match serde_json::from_slice::<ApiError>(body) {
276                            Ok(error) => Err(Error::Api(error)),
277                            Err(error) => Err(Error::Serialization(error)),
278                        }
279                    }
280                })
281            })
282        });
283
284        futures_unordered(futures)
285    }
286
287    /// Lets other internal code make basic HTTP requests.
288    pub(crate) fn request<U, T>(
289        &self,
290        uri: U,
291    ) -> impl Future<Item = Response<T>, Error = Error> + Send
292    where
293        U: Future<Item = Uri, Error = Error> + Send,
294        T: DeserializeOwned + Send + 'static,
295    {
296        let http_client = self.http_client.clone();
297        let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
298        response.and_then(|response| {
299            let status = response.status();
300            let cluster_info = ClusterInfo::from(response.headers());
301            let body = response.into_body().concat2().map_err(Error::from);
302
303            body.and_then(move |body| {
304                if status == StatusCode::OK {
305                    match serde_json::from_slice::<T>(&body) {
306                        Ok(data) => Ok(Response { data, cluster_info }),
307                        Err(error) => Err(Error::Serialization(error)),
308                    }
309                } else {
310                    match serde_json::from_slice::<ApiError>(&body) {
311                        Ok(error) => Err(Error::Api(error)),
312                        Err(error) => Err(Error::Serialization(error)),
313                    }
314                }
315            })
316        })
317    }
318}
319
320/// A wrapper type returned by all API calls.
321///
322/// Contains the primary data of the response along with information about the cluster extracted
323/// from the HTTP response headers.
324#[derive(Clone, Debug)]
325pub struct Response<T> {
326    /// Information about the state of the cluster.
327    pub cluster_info: ClusterInfo,
328    /// The primary data of the response.
329    pub data: T,
330}
331
332/// Information about the state of the etcd cluster from an API response's HTTP headers.
333#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
334pub struct ClusterInfo {
335    /// An internal identifier for the cluster.
336    pub cluster_id: Option<String>,
337    /// A unique, monotonically-incrementing integer created for each change to etcd.
338    pub etcd_index: Option<u64>,
339    /// A unique, monotonically-incrementing integer used by the Raft protocol.
340    pub raft_index: Option<u64>,
341    /// The current Raft election term.
342    pub raft_term: Option<u64>,
343}
344
345impl<'a> From<&'a HeaderMap<HeaderValue>> for ClusterInfo {
346    fn from(headers: &'a HeaderMap<HeaderValue>) -> Self {
347        let cluster_id = headers.get(XETCD_CLUSTER_ID).and_then(|v| {
348            match String::from_utf8(v.as_bytes().to_vec()) {
349                Ok(s) => Some(s),
350                Err(e) => {
351                    error!("{} header decode error: {:?}", XETCD_CLUSTER_ID, e);
352                    None
353                }
354            }
355        });
356
357        let etcd_index = headers.get(XETCD_INDEX).and_then(|v| {
358            match String::from_utf8(v.as_bytes().to_vec())
359                .map_err(|e| format!("{:?}", e))
360                .and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
361            {
362                Ok(i) => Some(i),
363                Err(e) => {
364                    error!("{} header decode error: {}", XETCD_INDEX, e);
365                    None
366                }
367            }
368        });
369
370        let raft_index = headers.get(XRAFT_INDEX).and_then(|v| {
371            match String::from_utf8(v.as_bytes().to_vec())
372                .map_err(|e| format!("{:?}", e))
373                .and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
374            {
375                Ok(i) => Some(i),
376                Err(e) => {
377                    error!("{} header decode error: {}", XRAFT_INDEX, e);
378                    None
379                }
380            }
381        });
382
383        let raft_term = headers.get(XRAFT_TERM).and_then(|v| {
384            match String::from_utf8(v.as_bytes().to_vec())
385                .map_err(|e| format!("{:?}", e))
386                .and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
387            {
388                Ok(i) => Some(i),
389                Err(e) => {
390                    error!("{} header decode error: {}", XRAFT_TERM, e);
391                    None
392                }
393            }
394        });
395
396        ClusterInfo {
397            cluster_id: cluster_id,
398            etcd_index: etcd_index,
399            raft_index: raft_index,
400            raft_term: raft_term,
401        }
402    }
403}
404
405/// Constructs the full URL for the versions API call.
406fn build_url(endpoint: &Uri, path: &str) -> String {
407    format!("{}{}", endpoint, path)
408}